Building an LLM Chatbot with Django, Django Channels, DRF, and Langchain

Overview

Good to see you again, readers! In this walkthrough, I'd like to share my experience building a real-time chatbot using, well... you guessed it, Django. Sigh 😅, I know what you are thinking: "Another Django-based blog?" This time, to make the chatbot stream its responses live the way ChatGPT and similar chat applications do, I am going to use Django Channels, Django REST Framework (DRF), LangChain, and Huey. So let's get started.

What tools are we going to use?

  • Django: A high-level Python web framework that encourages rapid development and clean, pragmatic design.
  • Django Channels: Extends Django to handle WebSockets, chat protocols, IoT protocols, and more. It's required for real-time communication.
  • Django Rest Framework: A powerful and flexible toolkit for building Web APIs in Django. It's used to create RESTful APIs.
  • LangChain: A framework for building applications on top of large language models such as OpenAI's GPT models. It gives us a common interface for prompts, models, and chains.
  • Huey: A lightweight task queue for Python that allows you to offload time-consuming tasks to the background. We will use it to run the LangChain calls outside the WebSocket consumer.
  • Django Rest Framework Auth Token: Token-based authentication for Django Rest Framework. It's used to authenticate users in the API.

Setting up the Django Project

First, as per the usual protocol, let's install the required packages. I have been using Poetry to manage my Python packages, and it has been really helpful for handling dependencies and the virtual environment. So first create your project directory, and inside it create a pyproject.toml file with the following content.

[tool.poetry]
name = "LangDjangoChat"
version = "0.1.0"
description = "The backend code for chatbot built using django framework"
authors = ["ananthanandanan <ananthanandanan@gmail.com>"]
readme = "README.md"
package-mode = false

[tool.poetry.dependencies]
python = "^3.12"
Django = "^5.0.6"
djangorestframework = "^3.15.1"
django-filter = "^24.2"
django-cors-headers = "^4.3.1"
redis = "^5.0.6"
channels = {extras = ["daphne"], version ="^4.1.0"}
channels-redis = "^4.2.0"
huey = "^2.5.1"
gevent = "^24.2.1"
pyjwt = "^2.8.0"
environ = "^1.0"


[tool.poetry.group.dev.dependencies]
ruff = "^0.4.8"

[tool.poetry.group.llm.dependencies]
langchain = "^0.2.1"
langchain-openai = "^0.1.8"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Now, let's install the packages using the following command.

poetry install

Once that is done, a new virtual environment will be created and the packages will be installed. Now, let's create a new Django project using the following command.

poetry run django-admin startproject framework

This will create a new Django project named framework. Now cd into the framework directory and create a new Django app named chat using the following command.

poetry run python manage.py startapp chat

This will create a new Django app named chat. Now, let's add the required packages to the INSTALLED_APPS in the settings.py file.

INSTALLED_APPS = [
    ## websocket modules
    "daphne",
    "channels",
    "corsheaders", ## for cross origin requests
    ...
    ## installed app modules
    'chat',
    ## rest framework modules
    'rest_framework',
    'rest_framework.authtoken',
    ## huey modules
    'huey.contrib.djhuey',
]

Note: Make sure to add the daphne and channels packages to the INSTALLED_APPS at the top of the list. This is required for Django Channels to work properly.

Now, let's get some basics out of the way.

What is Django Channels?

Django Channels is a project that extends Django to handle WebSockets, chat protocols, IoT protocols, and more. It allows Django to handle more than just plain HTTP requests, including WebSockets and HTTP2. Django Channels is required for real-time communication in Django.

Actually, right now I want all of you to take a break and go through the Django Channels documentation. They have a great tutorial that you can follow along with, and it will give you a good understanding of how Django Channels works. Django Channels Documentation

This is what I did when I started working with Django Channels. But don't worry, I will be explaining the important parts of Django Channels in this blog as well.

Okay, I am guessing you are back from the documentation. Let me condense the information for you.

How Does Django Channels Work?

Just like how Django handles HTTP requests, Django Channels handles WebSockets and other protocols. It uses a combination of ASGI (Asynchronous Server Gateway Interface) and WebSockets to handle real-time communication. ASGI is a standard interface between web servers and Python applications that allows for asynchronous communication.
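To make the ASGI idea a bit more concrete, here is a minimal sketch of what an ASGI application looks like when stripped of any framework: a single coroutine that takes scope, receive, and send. The hello_app and call_app names are my own for illustration; call_app is a hypothetical stand-in for a real server such as Daphne, not something Channels provides.

```python
import asyncio


async def hello_app(scope, receive, send):
    """A bare ASGI application: one coroutine taking scope, receive, send."""
    if scope["type"] != "http":
        return  # this sketch only handles plain HTTP
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        }
    )
    await send({"type": "http.response.body", "body": b"hello"})


async def call_app(app, scope):
    """Hypothetical driver that plays the role of the server for one request."""
    sent = []

    async def receive():
        # The server would hand the request body to the app here.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # The server would write these messages to the socket.
        sent.append(message)

    await app(scope, receive, send)
    return sent


messages = asyncio.run(call_app(hello_app, {"type": "http", "path": "/"}))
print([m["type"] for m in messages])
```

Django and Channels build everything else (views, consumers, middleware) on top of this calling convention.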

What are Consumers?

In Django Channels, consumers are Python classes that handle WebSockets and other protocols. Consumers are similar to views in Django, but they handle WebSockets and other protocols instead of HTTP requests. Consumers can receive messages, send messages, and perform other actions based on the incoming messages. They are the heart of Django Channels and are responsible for handling real-time communication. But unlike views, consumers are long-running and asynchronous.

What are Channels?

In Django Channels, a channel is essentially a named mailbox that messages can be sent to. Each consumer instance gets its own unique channel name (available as self.channel_name), and that name is what routes a message to the right consumer. Channels are the backbone of Django Channels and are responsible for handling real-time communication.

So basically a user connects to a channel and the consumer listens to the channel for any incoming messages. When a message is received, the consumer processes the message and sends a response back to the user. This is how real-time communication works in Django Channels.

But what if we want a message to be sent to multiple users?

Channel Layers

To solve this, Django Channels provides a feature called channel layers. A channel layer lets different consumer instances, even ones running in different processes, send messages to each other. On top of that, a channel layer supports groups, which are named sets of channels. By adding several consumers' channels to the same group, you can broadcast one message to all of them at once. Channel layers are configured in settings.py, and you can define more than one if your project needs it.
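If groups still feel abstract, the following toy model may help. It is only an in-memory illustration I wrote for this explanation, not how Channels implements channel layers; the ToyChannelLayer class and new_channel method are hypothetical, while the group_add, group_discard, and group_send names mirror the methods we will call on the real channel layer later.

```python
import asyncio
from collections import defaultdict


class ToyChannelLayer:
    """Toy stand-in for a channel layer; NOT the Channels implementation."""

    def __init__(self):
        self.queues = {}  # channel name -> asyncio.Queue (the "mailbox")
        self.groups = defaultdict(set)  # group name -> set of channel names

    def new_channel(self, name):
        self.queues[name] = asyncio.Queue()
        return name

    async def group_add(self, group, channel):
        self.groups[group].add(channel)

    async def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    async def group_send(self, group, message):
        # Deliver a copy of the message to every channel in the group.
        for channel in list(self.groups[group]):
            await self.queues[channel].put(dict(message))

    async def receive(self, channel):
        return await self.queues[channel].get()


async def demo():
    layer = ToyChannelLayer()
    alice = layer.new_channel("alice")
    bob = layer.new_channel("bob")
    carol = layer.new_channel("carol")  # carol never joins the group
    await layer.group_add("thread-1", alice)
    await layer.group_add("thread-1", bob)
    await layer.group_send("thread-1", {"type": "chat.message", "message": "hi"})
    received = [await layer.receive(alice), await layer.receive(bob)]
    return received, layer.queues[carol].empty()


received, carol_empty = asyncio.run(demo())
print(received, carol_empty)
```

Both members of the group get the message, while the channel that never joined stays empty.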

Now that we have a basic understanding of Django Channels, let's move on to the next step.

Setting up Django Channels

If you have followed the Django Channels documentation, you would have seen that setting up Django Channels is pretty straightforward. You just need to add the required packages to the INSTALLED_APPS in the settings.py file and configure the routing in the routing.py file. But I will walk you through the process. Since we have already added the required packages to the INSTALLED_APPS, let's move on to the next step.

First, let's setup the asgi.py file. This file is used to configure the ASGI application for Django Channels.

"""
ASGI config for framework project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/
"""
import os

from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        # Just HTTP for now. (We can add other protocols later.)
    }
)

In the asgi.py file above, we are configuring the ASGI application for Django Channels. We are using the ProtocolTypeRouter to route HTTP requests to the Django ASGI application. This is the basic setup for Django Channels. We will be adding more configuration to the asgi.py file in the upcoming sections.

Next, set the ASGI_APPLICATION in the settings.py file.

ASGI_APPLICATION = "framework.asgi.application"

Adding the chat path to the urls

Now, let's add the chat path to the urls.py file in the framework directory.

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("chat/", include("chat.urls")),
]

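Once that is done, running the Django server with poetry run python manage.py runserver should show the ASGI/Daphne server starting up in the console. Keep in mind that chat/urls.py is only created later in this post, so until then the include("chat.urls") line will fail with an import error. Comment it out for now if you want to run the server at this point.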

Setting up Consumers and Routing

Now, let's create a new file named consumers.py in the chat app directory. This file will contain the consumers for the chatbot. Consumers are Python classes that handle WebSockets and other protocols in Django Channels.

import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()

    async def disconnect(self, close_code):
        pass

    async def receive(self, text_data):

        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        await self.send(text_data=json.dumps({
            'message': message
        }))

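A WebSocket consumer revolves around the following methods. connect(), disconnect(), and receive() are hooks that we override, while send() and accept() are provided by the base class: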

  • connect(): This method is called when the client connects to the consumer.
  • disconnect(): This method is called when the client disconnects from the consumer.
  • receive(): This method is called when the consumer receives a message from the client.
  • send(): This method is used to send a message to the client.
  • accept(): This method is used to accept the connection from the client.

In this case, we are using AsyncWebsocketConsumer which is an asynchronous consumer provided by Django Channels. This allows us to write asynchronous code in the consumer.

Note: The above code is just a skeleton for the consumer. We will be adding more functionality to the consumer in the upcoming sections.
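One thing the skeleton takes for granted is that every frame is valid JSON with a "message" key. As a minimal sketch of how that assumption could be relaxed, the helper below (extract_message is a hypothetical name of mine, not part of Channels) returns None instead of raising when the payload is unusable, so receive() could simply ignore bad frames.

```python
import json


def extract_message(text_data):
    """Return the 'message' string from a JSON frame, or None if unusable."""
    try:
        payload = json.loads(text_data)
    except (TypeError, json.JSONDecodeError):
        # Not a string at all (e.g. a binary frame) or not valid JSON.
        return None
    if not isinstance(payload, dict):
        return None
    message = payload.get("message")
    # Reject missing, non-string, and whitespace-only messages.
    if isinstance(message, str) and message.strip():
        return message
    return None


print(extract_message('{"message": "hello"}'))
print(extract_message("not json"))
```

Inside receive(), you could call this helper first and return early when it gives back None.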

Next, let's create a new file named routing.py in the chat app directory. This file will contain the routing configuration for the chatbot.

from django.urls import re_path
from . import consumers

## URL patterns for WebSocket routing.
websocket_urlpatterns = [
    re_path(r"ws/chat/(?P<thread_id>[\w-]+)/$", consumers.ChatConsumer.as_asgi()),
]

This file contains the URL patterns for WebSocket routing. In this case, we are using the re_path function to define a URL pattern for the chatbot. The URL pattern is ws/chat/<thread_id>/ where <thread_id> is a dynamic parameter that will be passed to the consumer.
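To see what the pattern accepts, here is a small standalone check using only the re module. It approximates the routing rather than reproducing it: I am assuming the leading slash has already been removed from the path before matching, and thread_id_from_path is a hypothetical helper for illustration.

```python
import re

# Same pattern string as in routing.py.
WS_PATTERN = re.compile(r"ws/chat/(?P<thread_id>[\w-]+)/$")


def thread_id_from_path(path):
    """Return the captured thread_id, or None if the path does not match."""
    match = WS_PATTERN.match(path)
    return match.group("thread_id") if match else None


print(thread_id_from_path("ws/chat/123e4567-e89b-12d3-a456-426614174000/"))
print(thread_id_from_path("ws/chat/no-trailing-slash"))
```

Since [\w-]+ allows letters, digits, underscores, and hyphens, a UUID works as a thread_id, but the trailing slash is mandatory.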

Adding the routing configuration to the asgi

For Channels to work, we need to add the routing configuration. We can basically follow the Django Channels documentation here. So update the asgi.py file to look like the following.

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()

from chat.routing import websocket_urlpatterns

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AllowedHostsOriginValidator(
            AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
        ),
    }
)

This code configures the ASGI application for Django Channels. It uses the ProtocolTypeRouter to route HTTP and WebSocket requests to the appropriate handlers. The AllowedHostsOriginValidator is used to validate the origin of WebSocket requests. The AuthMiddlewareStack is used to add authentication middleware to the WebSocket requests. The URLRouter is used to route WebSocket requests to the appropriate consumers.

From the Django Channels tutorial, if you still have the template files, you can use them to test the websockets.

Setting up Django Rest Framework

Now that we have set up the basics of Django Channels, let's switch for a while to the models, serializers, and views for Django Rest Framework, i.e. the REST API for the chatbot.

Chat Models

First, let's create a new file named models.py in the chat app directory. This file will contain the models for the chatbot.

from django.db import models
import uuid
from django.conf import settings


# Create your models here.
class ChatThread(models.Model):
    id = models.UUIDField(primary_key=True, editable=False, default=uuid.uuid4)
    name = models.TextField()
    author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    organization = models.TextField()

    class Meta:
        indexes = [
            models.Index(fields=["author", "organization"]),
        ]

    def __str__(self):
        return self.name


class ChatMessage(models.Model):
    SENDER_TYPES = [
        ("U", "User"),
        ("B", "Bot"),
    ]

    id = models.UUIDField(primary_key=True, editable=False, default=uuid.uuid4)
    thread = models.ForeignKey(ChatThread, on_delete=models.CASCADE)
    content = models.TextField()
    sender = models.CharField(max_length=1, choices=SENDER_TYPES, default="U")
    timestamp = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.content

This file contains the models for the chatbot. We have two models: ChatThread and ChatMessage. The ChatThread model represents a chat thread and has fields for the name, author, and organization. The ChatMessage model represents a chat message and has fields for the thread, content, sender, and timestamp.

Register the models in the admin.py file in the chat app directory.

from django.contrib import admin

from .models import ChatThread, ChatMessage

admin.site.register(ChatThread)
admin.site.register(ChatMessage)

Note: The ChatThread model has a foreign key to settings.AUTH_USER_MODEL, which we are going to point at a custom Members model. This allows us to associate users with chat threads. Let's create the Members model as well.

Members Model

I typically like to keep the user model in its own Django app. So let's create a new app named members using the following command.

poetry run python manage.py startapp members

After following the typical protocol of adding the app to the INSTALLED_APPS in the settings.py file, let's create the models.py file in the members app directory.

from django.contrib.auth.models import (
    AbstractBaseUser,
    BaseUserManager,
    PermissionsMixin,
)
from django.db import models
import uuid


class MemberManager(BaseUserManager):
    def create_user(self, email, username, organization, password=None, **extra_fields):
        if not email:
            raise ValueError("The Email field must be set")
        email = self.normalize_email(email)
        user = self.model(
            email=email, username=username, organization=organization, **extra_fields
        )
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(
        self, email, username, organization, password=None, **extra_fields
    ):
        extra_fields.setdefault("is_staff", True)
        extra_fields.setdefault("is_superuser", True)

        if extra_fields.get("is_staff") is not True:
            raise ValueError("Superuser must have is_staff=True.")
        if extra_fields.get("is_superuser") is not True:
            raise ValueError("Superuser must have is_superuser=True.")

        return self.create_user(email, username, organization, password, **extra_fields)


class Members(AbstractBaseUser, PermissionsMixin):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    email = models.EmailField(unique=True)
    username = models.CharField(max_length=150, unique=True)
    organization = models.CharField(max_length=255)
    is_active = models.BooleanField(default=True)
    is_staff = models.BooleanField(default=False)
    date_joined = models.DateTimeField(auto_now_add=True)

    objects = MemberManager()

    USERNAME_FIELD = "email"
    REQUIRED_FIELDS = ["username", "organization"]

    class Meta:
        verbose_name = "Member"
        verbose_name_plural = "Members"

    def __str__(self):
        return self.email

This file contains the Members model which represents the members of the chatbot. The Members model is a custom user model that extends the AbstractBaseUser and PermissionsMixin models provided by Django. The Members model has fields for the email, username, organization, is_active, is_staff, and date_joined.

Don't forget to register the models in the admin.py file in the members app directory.

from django.contrib import admin

from .models import Members

admin.site.register(Members)

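Don't forget to point AUTH_USER_MODEL at the Members model in the settings.py file. Ideally, do this before running your first migration, because switching the user model afterwards is painful.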

AUTH_USER_MODEL = "members.Members"

Chat Serializers

Now, let's create the serializers for the models. Create a new file named serializers.py in the chat app directory.

from rest_framework import serializers
from .models import ChatMessage, ChatThread


class ChatThreadSerializer(serializers.ModelSerializer):
    class Meta:
        model = ChatThread
        fields = ["id", "name", "author", "organization"]


class ChatMessageSerializer(serializers.ModelSerializer):
    class Meta:
        model = ChatMessage
        fields = ["id", "thread", "content", "sender", "timestamp"]

This file contains the serializers for the models. The ChatThreadSerializer and ChatMessageSerializer classes are used to serialize the ChatThread and ChatMessage models, respectively.

Chat views and Authentication

The next step is something I really wanted to set up: authentication for the REST API. I wanted to use the Django Rest Framework Auth Token for this, which is why I added rest_framework.authtoken to the INSTALLED_APPS in the settings.py file. We also need to add the authentication classes to the REST_FRAMEWORK settings in the settings.py file.

## Rest framework settings
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "rest_framework.authentication.TokenAuthentication",
    ],
    "DEFAULT_PERMISSION_CLASSES": [
        "rest_framework.permissions.IsAuthenticated",
    ],
}

This code configures the Django Rest Framework to use the TokenAuthentication class for authentication. The TokenAuthentication class uses token-based authentication to authenticate users in the API. The IsAuthenticated permission class is used to ensure that only authenticated users can access the API.
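From the client's point of view, TokenAuthentication expects the key in an Authorization header prefixed with the word Token. Here is a minimal sketch that builds (but does not send) such a request with the standard library. The URL and the token value are placeholders I made up, and authed_request is a hypothetical helper.

```python
from urllib.request import Request


def authed_request(url, token_key):
    """Build (but do not send) a request that carries a DRF token."""
    return Request(url, headers={"Authorization": f"Token {token_key}"})


# Placeholder URL and token, for illustration only.
req = authed_request("http://localhost:8000/chat/threads", "0123abcd")
print(req.get_header("Authorization"))
```

Any HTTP client works the same way, as long as the header has the form "Authorization: Token <key>".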

Now, let's create the views for the chatbot. Create a new file named views.py in the chat app directory.

from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated

from .models import ChatThread, ChatMessage
from .serializers import ChatThreadSerializer, ChatMessageSerializer

class ChatThreadViewSet(viewsets.ModelViewSet):
    serializer_class = ChatThreadSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        return ChatThread.objects.filter(author=self.request.user)


class ChatMessageViewSet(viewsets.ModelViewSet):
    serializer_class = ChatMessageSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        return ChatMessage.objects.filter(thread=self.kwargs["thread_id"])

This file contains the views for the chatbot. The ChatThreadViewSet and ChatMessageViewSet classes are used to create, retrieve, update, and delete chat threads and chat messages, respectively. The IsAuthenticated permission class is used to ensure that only authenticated users can access the views.

Chat URLs

Now, let's create the secure URLs for the chatbot. Create a new file named urls.py in the chat app directory.

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
from rest_framework.authtoken.views import obtain_auth_token

router = DefaultRouter(trailing_slash=False)

router.register(
    r"threads/(?P<thread_id>[^/.]+)/messages",
    views.ChatMessageViewSet,
    basename="chatmessage",
)
router.register("threads", views.ChatThreadViewSet, basename="chatthread")

urlpatterns = [
    path("", include(router.urls)),
    path("api-token-auth/", obtain_auth_token, name="api_token_auth"),
]

This file contains the secure URLs for the chatbot. The router is used to register the views and create the URLs for the chat threads and chat messages. The obtain_auth_token view is used to obtain an authentication token for the user.
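To get a token in the first place, the client posts its credentials to the api-token-auth/ endpoint and reads the "token" field from the JSON response. Below is a small sketch that only builds the request. One assumption worth flagging: because our Members model sets USERNAME_FIELD to "email", my understanding is that the email address goes into the "username" field of the payload. The base URL and the token_request helper are hypothetical.

```python
import json
from urllib.request import Request


def token_request(base_url, email, password):
    """Build (but do not send) the POST request for obtain_auth_token."""
    # Assumption: the email is sent as "username" since USERNAME_FIELD = "email".
    body = json.dumps({"username": email, "password": password}).encode()
    return Request(
        f"{base_url}/chat/api-token-auth/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = token_request("http://localhost:8000", "jane@example.com", "s3cret")
print(req.get_method(), req.full_url)
```

Sending it with urllib.request.urlopen(req) against a running server should return a body like {"token": "..."} for valid credentials.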

If you were able to follow along, you would have setup the basic REST API for the chatbot. Now, let's move on to the next step.

This next part is where I spent the most time racking my brain: creating a custom auth stack for Django Channels that uses the DRF TokenAuthentication. After reading through different posts on both DRF and Django Channels, I arrived at a decent solution.

Setting up Custom Auth Stack for Django Channels

Remember that we are creating an authenticated chatbot, which means that we would need to authenticate and get the user details from the Django Rest Framework. So let's create a custom authentication middleware for Django Channels.

Create a new file named middleware.py in the chat app directory.

from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from channels.auth import AuthMiddlewareStack


@database_sync_to_async
def get_user(token_key):
    """Get the user associated with the given token key.

    Args:
        token_key (str): The key of the token to retrieve.

    Returns:
        User: The user associated with the token, or an instance of
            `AnonymousUser` if the token is invalid.
    """
    try:
        token = Token.objects.get(key=token_key)
        return token.user
    except Token.DoesNotExist:
        return AnonymousUser()


class TokenAuthMiddleware(BaseMiddleware):
    """Custom token authentication middleware.

    This middleware checks for a token in the query string of the request and
    attempts to authenticate the user with the token. If the token is invalid,
    the user will be set to an instance of `AnonymousUser`.

    NOTE: This middleware should be placed before `AuthMiddlewareStack`. This
    is because `AuthMiddlewareStack` will attempt to authenticate the user
    based on the session data, which is not available for WebSocket
    connections. By placing this middleware first, we can ensure that the user
    is authenticated based on the token before `AuthMiddlewareStack` is
    invoked.

    Reference: https://www.django-rest-framework.org/api-guide/authentication/#tokenauthentication
    """

    async def __call__(self, scope, receive, send):
        try:
            token_key = dict(
                (x.split("=") for x in scope["query_string"].decode().split("&"))
            ).get("token", None)
        except ValueError:
            token_key = None
        scope["user"] = (
            AnonymousUser() if token_key is None else await get_user(token_key)
        )
        return await super().__call__(scope, receive, send)


def TokenAuthMiddlewareStack(inner):
    return TokenAuthMiddleware(AuthMiddlewareStack(inner))

So what the hell is happening here? The TokenAuthMiddleware class is a custom token authentication middleware for Django Channels. This middleware checks for a token in the query string of the request and attempts to authenticate the user with the token. If the token is invalid, the user will be set to an instance of AnonymousUser.

The get_user function is a helper function that retrieves the user associated with the given token key. If the token is valid, the user associated with the token is returned. If the token is invalid, an instance of AnonymousUser is returned.

The TokenAuthMiddlewareStack function is a helper that wraps the AuthMiddlewareStack in our TokenAuthMiddleware. Because TokenAuthMiddleware is the outermost layer, it runs first, which ensures that the user is authenticated based on the token before the AuthMiddlewareStack middleware is invoked.
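The middleware above parses the query string by hand with split("&") and split("="). As an alternative sketch, the standard library's parse_qs handles the edge cases (parameters without a value, URL-encoded characters) for us. The token_from_scope helper is a hypothetical name of mine; it only reads the scope and does not touch the database.

```python
from urllib.parse import parse_qs


def token_from_scope(scope):
    """Return the ?token=... value from an ASGI scope, or None if absent."""
    # ASGI delivers the query string as bytes, e.g. b"token=abc123".
    query = parse_qs(scope.get("query_string", b"").decode())
    values = query.get("token")
    return values[0] if values else None


print(token_from_scope({"query_string": b"token=abc123&x=1"}))
print(token_from_scope({"query_string": b""}))
```

In __call__, the result could be passed to get_user() exactly like token_key is now. A client would then connect to something like ws://localhost:8000/ws/chat/<thread_id>/?token=<key>.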

Now, let's add the custom authentication middleware to the ASGI application in the asgi.py file.

"""
ASGI config for framework project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/
"""

import os

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")

# Initialize Django before importing anything that touches models
# (chat.middleware imports the DRF Token model).
django_asgi_app = get_asgi_application()

from chat.middleware import TokenAuthMiddlewareStack  # noqa: E402
from chat.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AllowedHostsOriginValidator(
            TokenAuthMiddlewareStack(URLRouter(websocket_urlpatterns))
        ),
    }
)

Basically, instead of using the AuthMiddlewareStack directly, we are using the TokenAuthMiddlewareStack that we created in the middleware.py file. This means that the user will be authenticated based on the token before the AuthMiddlewareStack is invoked. And if you know about Channels, every application is called with three arguments: scope, receive, and send. The scope is a dictionary that contains information about the connection, such as the path, query string, and headers. The receive function is awaited to receive messages from the client. The send function is awaited to send messages to the client.
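To show how a middleware fits into the scope, receive, send picture, here is a framework-free sketch. AddUserMiddleware and whoami are hypothetical names, and the "user" is just a string here rather than a Django user object. The point is only the wrapping order: the outer layer adjusts the scope, then hands over to the inner application.

```python
import asyncio


class AddUserMiddleware:
    """Hypothetical pure-ASGI middleware: copies the scope and adds a user."""

    def __init__(self, inner, user):
        self.inner = inner
        self.user = user

    async def __call__(self, scope, receive, send):
        # Copy the scope so the change is only visible to inner layers.
        scope = dict(scope, user=self.user)
        return await self.inner(scope, receive, send)


async def whoami(scope, receive, send):
    """Innermost app: reports whatever user the middleware put in the scope."""
    await send({"type": "websocket.send", "text": scope["user"]})


async def run():
    sent = []

    async def receive():
        return {"type": "websocket.connect"}

    async def send(message):
        sent.append(message)

    app = AddUserMiddleware(whoami, user="jane@example.com")
    await app({"type": "websocket", "path": "/ws/chat/demo/"}, receive, send)
    return sent


sent = asyncio.run(run())
print(sent)
```

Our TokenAuthMiddleware follows the same shape, except that it looks the user up from the token before calling the inner application.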

Now, it's time to really shape up the chatbot.

Setting up LangChain

Before we move on to implementing the ChatConsumer, let's set up LangChain. LangChain is a framework that provides a simple interface for interacting with large language models like GPT-4. I have been using LangChain for a while now and it's been really helpful in building chatbots.

We had already installed the langchain and langchain-openai packages in the initial setup. Now, let's create a directory under chat called chatbot and create a new file named app.py.

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai.chat_models import AzureChatOpenAI
import os

os.environ["OPENAI_API_TYPE"] = "azure"
os.environ["AZURE_OPENAI_API_KEY"] = "YOUR_AZURE_OPENAI_API_KEY"
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<key>.openai.azure.com"
os.environ["OPENAI_API_VERSION"] = "2023-05-15"

llm = AzureChatOpenAI(
    deployment_name="gpt-35-turbo",
    model_name="gpt-35-turbo",
    temperature=0,
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are an AI Chatbot."),
        ("user", "{input}"),
    ]
)

chain = prompt | llm

This file contains the LangChain setup for the chatbot. The AzureChatOpenAI class is used to interact with the Azure OpenAI API. The llm object is created with the deployment name, model name, temperature, and streaming parameters. The prompt object is created with the chat prompt template. The chain object is created by combining the prompt and the llm object.
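Hardcoding the key in app.py is fine for a quick demo, but you probably don't want it in version control. As a minimal sketch of an alternative, the helper below reads a value from the environment and fails loudly when it is missing. require_env is a hypothetical helper of mine, not part of LangChain.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise if it is unset."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Example usage (commented out so this sketch runs without real credentials):
# api_key = require_env("AZURE_OPENAI_API_KEY")
# endpoint = require_env("AZURE_OPENAI_ENDPOINT")
```

With the variables exported in your shell (or loaded from a .env file), the os.environ[...] = "..." assignments in app.py are no longer needed for those two values.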

Running LangChain as a background task

To make the experience seamless, we need to run the LangChain API calls asynchronously. For that we will be using the Huey package. Huey is a lightweight task queue for Python that allows you to offload time-consuming tasks to the background.

Again, since we had already installed the huey package in the initial setup, let's begin by setting the HUEY settings in the settings.py file.

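## Huey settings
import logging  # needed for the loglevel below; keep it with the other imports at the top of settings.py
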
HUEY = {
    "huey_class": "huey.RedisHuey",
    "name": "framework",
    "connection": {"host": "localhost", "port": 6379, "db": 1},
    "immediate": False,
    "immediate_use_memory": False,
    "consumer": {
        "workers": 4,
        "worker_type": "greenlet",
        "loglevel": logging.DEBUG,
    },
}

Setup Gevent for Huey worker

Since we are using greenlet as the worker type, we need to set up gevent as well. Gevent is a coroutine-based Python networking library that uses greenlets to provide a high-level synchronous API on top of an event loop. To make it work with Django, we need to monkey-patch the standard library before anything else is imported. In the manage.py file, add the following code.

#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""

import os
import sys

# Apply monkey-patch if we are running the huey consumer.
if "run_huey" in sys.argv:
    from gevent import monkey

    monkey.patch_all()


def main():
    """Run administrative tasks."""
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    execute_from_command_line(sys.argv)


if __name__ == "__main__":
    main()

Before we set up the tasks.py file, let's finish setting up the ChatConsumer.

Channel Layer setup

As mentioned earlier, the channel layer is used to group channels together and send messages between them. In other words, it is an additional layer of abstraction that allows different consumer instances to exchange messages.

For that we need to setup CHANNEL_LAYERS in the settings.py file.

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

Note: Make sure to install Redis and run the Redis server before running the Django server. For my ease of use, I ran the Redis server using Docker.

Setting up the ChatConsumer

Before I show you the ChatConsumer, let me explain my thought process. We have the ChatThread and ChatMessage models, and the user flow would be:

  1. The user signs up and logs in, or simply logs in if already registered.
  2. The user creates a new chat thread, or joins an existing chat thread.
  3. The user sends a message to the chat thread.
  4. The LLM model processes the message and sends a response back to the user.
  5. These messages are stored in the ChatMessage model and the thread_id is also stored.
  6. In addition, since the data is stored, the user can view the chat history.
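For step 4, the ChatConsumer in this post distinguishes three event categories for a bot reply: stream_start, stream_chunk, and stream_end. As a rough sketch of that sequence, the generator below turns any iterable of text chunks into such events. The stream_events function is a hypothetical illustration of mine, and the chunks here are hardcoded rather than coming from the LLM.

```python
import uuid
from datetime import datetime, timezone


def stream_events(chunks):
    """Yield stream_start, one stream_chunk per piece, then stream_end."""
    stream_id = str(uuid.uuid4())  # ties all events of one reply together

    def event(category, **extra):
        return {
            "type": "chat.message",
            "category": category,
            "stream_id": stream_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **extra,
        }

    yield event("stream_start")
    parts = []
    for chunk in chunks:
        parts.append(chunk)
        yield event("stream_chunk", chunk=chunk)
    # The final event carries the full reply so it can be stored in one piece.
    yield event("stream_end", response="".join(parts))


events = list(stream_events(["Hel", "lo", "!"]))
print([e["category"] for e in events])
print(events[-1]["response"])
```

The client can append each chunk to the message identified by stream_id and treat stream_end as the signal that the reply is complete.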

With that in mind, let's update the ChatConsumer.

import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from .models import ChatThread, ChatMessage
from members.models import Members
from .tasks import handle_chat_message


class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.thread_id = self.scope["url_route"]["kwargs"]["thread_id"]
        self.user = self.scope["user"]

        if not self.user.is_authenticated:
            await self.close()
            return

        self.thread = await self.get_or_create_thread(self.thread_id, self.user)
        await self.channel_layer.group_add(self.thread_id, self.channel_name)
        await self.accept()
        await self.send_chat_logs()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.thread_id, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]

        chat_message = await self.save_message(self.thread, self.user, message)
        await self.channel_layer.group_send(
            self.thread_id,
            {
                "type": "chat.message",
                "message": chat_message.content,
                "sender": chat_message.sender,
                "timestamp": chat_message.timestamp.isoformat(),
                "category": "user_message",
            },
        )
        handle_chat_message(self.thread_id, message)

    async def chat_message(self, event):
        category = event.get("category")
        timestamp = event.get("timestamp")
        stream_id = event.get("stream_id")
        chunk = event.get("chunk")
        response = event.get("response")
        message = event.get("message")
        sender = event.get("sender", "Bot")  # Default sender to "Bot"

        if category == "stream_start":
            await self.send(
                text_data=json.dumps(
                    {
                        "message": "",
                        "sender": sender,
                        "timestamp": timestamp,
                        "stream_id": stream_id,
                        "category": "stream_start",
                    }
                )
            )
        elif category == "stream_chunk":
            await self.send(
                text_data=json.dumps(
                    {
                        "message": chunk,
                        "sender": sender,
                        "timestamp": timestamp,
                        "stream_id": stream_id,
                        "category": "stream_chunk",
                    }
                )
            )
        elif category == "stream_end":
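            # Caveat: every consumer in the group handles stream_end, so with several
            # tabs open on one thread the bot reply is saved once per connection.
            _ = await self.save_message(self.thread, "B", response)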
            await self.send(
                text_data=json.dumps(
                    {
                        "message": response,
                        "sender": sender,
                        "timestamp": timestamp,
                        "stream_id": stream_id,
                        "category": "stream_end",
                    }
                )
            )
        else:
            await self.send(
                text_data=json.dumps(
                    {
                        "message": message,
                        "sender": sender,
                        "timestamp": timestamp,
                        "category": category,
                    }
                )
            )

    async def send_chat_logs(self):
        messages = await self.get_chat_messages()
        for message in messages:
            await self.send(
                text_data=json.dumps(
                    {
                        "message": message.content,
                        "sender": message.sender,
                        "timestamp": message.timestamp.isoformat(),
                        "category": "chat_log",
                    }
                )
            )

    @database_sync_to_async
    def get_or_create_thread(self, thread_id, user):
        user = Members.objects.get(email=user.email)
        thread, created = ChatThread.objects.get_or_create(
            id=thread_id,
            defaults={
                "name": f"Chat {thread_id}",
                "author": user,
                "organization": user.organization,
            },
        )
        return thread

    @database_sync_to_async
    def save_message(self, thread, user, content):
        if isinstance(user, Members):
            chat_message = ChatMessage.objects.create(
                thread=thread, content=content, sender="U"
            )
        else:
            chat_message = ChatMessage.objects.create(
                thread=thread, content=content, sender="B"
            )
        return chat_message

    @database_sync_to_async
    def get_chat_messages(self):
        return list(
            ChatMessage.objects.filter(thread=self.thread).order_by("timestamp")
        )

What is happening in the ChatConsumer?

The ChatConsumer class extends the AsyncWebsocketConsumer class provided by Django Channels. The ChatConsumer class is responsible for handling WebSocket connections and messages for the chatbot. Let's break down the methods in the ChatConsumer class:

  1. The connect method is called when the client connects to the consumer. In this method, we get the thread_id and user from the scope, check if the user is authenticated, get or create the chat thread, add the channel to the group, and accept the connection. We also send the chat logs to the user.

  2. The disconnect method is called when the client disconnects from the consumer. In this method, we remove the channel from the group.

  3. The receive method is called when the consumer receives a message from the client. In this method, we save the message to the chat thread, send the message to the group, and handle the message using the handle_chat_message task. I will elaborate on this task in the next section.

  4. The chat_message method is called when the consumer receives a message from the group. In this method, we send the message to the user.

  5. The send_chat_logs method is used to send the chat logs to the user.

  6. The get_or_create_thread method is used to get or create the chat thread.

  7. The save_message method is used to save the message to the chat thread.

  8. The get_chat_messages method is used to get the chat messages from the chat thread.
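
One thing worth noting about the receive method: it assumes every incoming frame is valid JSON with a message key, so a malformed frame would raise an exception inside the consumer. Here is a minimal sketch of a more defensive parser. The helper name parse_client_message is hypothetical and is not part of the consumer shown earlier.

```python
import json
from typing import Optional


def parse_client_message(text_data: Optional[str]) -> Optional[str]:
    """Return the stripped message text, or None if the frame is unusable."""
    if not text_data:
        return None
    try:
        payload = json.loads(text_data)
    except json.JSONDecodeError:
        return None
    # The frontend sends an object like {"message": "..."}; reject anything else.
    if not isinstance(payload, dict):
        return None
    message = payload.get("message")
    if not isinstance(message, str) or not message.strip():
        return None
    return message.strip()
```

Inside receive, a None result could simply be ignored before anything is saved or queued.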

Note: The @database_sync_to_async decorator runs a synchronous ORM call in a worker thread, so it can be awaited from the async consumer without blocking the event loop. An event handler method such as chat_message is selected by the type key passed to group_send: Channels replaces the dot in "chat.message" with an underscore to find the method name. The category key is our own field and tells the frontend what kind of message it received. I will explain how the frontend uses it in the frontend section.
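
To make the mapping from the type key to a method concrete, here is a simplified sketch using only the standard library. It illustrates the naming convention and is not Channels' actual source; ToyConsumer, handler_name_for, and demo are made-up names.

```python
import asyncio


def handler_name_for(event: dict) -> str:
    """Map an event's "type" to a method name: "chat.message" -> "chat_message"."""
    name = event["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("event type must not resolve to a private method")
    return name


class ToyConsumer:
    """Stand-in for a consumer; it only records what it would send."""

    def __init__(self):
        self.sent = []

    async def chat_message(self, event: dict) -> None:
        self.sent.append((event["category"], event.get("message")))

    async def dispatch(self, event: dict) -> None:
        # Look up the handler by name and hand it the whole event dict.
        handler = getattr(self, handler_name_for(event))
        await handler(event)


def demo() -> list:
    consumer = ToyConsumer()
    asyncio.run(
        consumer.dispatch(
            {"type": "chat.message", "category": "user_message", "message": "hi"}
        )
    )
    return consumer.sent
```

The point is that type chooses which method runs, while category is just data that the method passes along.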

Setting up the Huey Task

Now, let's create the tasks.py file in the chat app directory.

from huey.contrib.djhuey import task
import logging
import uuid
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from chat.chatbot import app as chat_app
from django.utils import timezone


async def stream_response(thread_id, query):
    runnable = chat_app.chain
    stream_id = str(uuid.uuid4())
    channel_layer = get_channel_layer()

    def send(message):
        return channel_layer.group_send(thread_id, message)

    await send(
        {
            "type": "chat.message",
            "category": "stream_start",
            "stream_id": stream_id,
            "timestamp": timezone.now().isoformat(),  # Adding a unique timestamp
        }
    )

    response = ""
    async for chunk in runnable.astream({"input": query}):
        response += chunk.content
        logging.warning(f"Stream chunk: {chunk.content}")
        await send(
            {
                "type": "chat.message",
                "category": "stream_chunk",
                "stream_id": stream_id,
                "chunk": chunk.content,
                "timestamp": timezone.now().isoformat(),  # Adding a unique timestamp
            }
        )
    logging.warning(f"Stream response: {response}")
    await send(
        {
            "type": "chat.message",
            "category": "stream_end",
            "stream_id": stream_id,
            "response": response,
            "timestamp": timezone.now().isoformat(),  # Adding a unique timestamp
        }
    )


@task()
def handle_chat_message(thread_id, query):
    logging.warning(f"Handling chat message {thread_id} {query}")

    async_to_sync(stream_response)(thread_id, query)

What is happening in the tasks.py file?

Here, I am using the category key to tell the frontend what kind of message it is receiving. The stream_start category marks the start of a stream, stream_chunk carries one chunk of it, and stream_end marks the end. The response key on the stream_end message holds the complete response from the LLM.

The stream_response function streams the LLM's response to the user. It takes the LangChain chain as runnable, generates a unique stream_id, and fetches the channel_layer; the small send helper wraps group_send for the thread's group. It first sends a stream_start message, then one stream_chunk message per chunk while accumulating the full text in the response variable, and finally a stream_end message carrying the complete response.
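
The order of events is easier to see without LangChain or Channels in the way. The sketch below replays the same start, chunk, end sequence using only the standard library. Here fake_llm_stream is a hypothetical stand-in for the chain's stream, and send just collects events in a list instead of calling group_send.

```python
import asyncio
import uuid
from typing import AsyncIterator, Awaitable, Callable


async def fake_llm_stream(query: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for the LLM stream: yields the query word by word."""
    for word in query.split():
        await asyncio.sleep(0)  # hand control back to the event loop, like real I/O
        yield word + " "


async def stream_events(query: str, send: Callable[[dict], Awaitable[None]]) -> str:
    """Emit stream_start, one stream_chunk per piece, then stream_end."""
    stream_id = str(uuid.uuid4())
    await send({"category": "stream_start", "stream_id": stream_id})
    response = ""
    async for piece in fake_llm_stream(query):
        response += piece
        await send({"category": "stream_chunk", "stream_id": stream_id, "chunk": piece})
    await send({"category": "stream_end", "stream_id": stream_id, "response": response})
    return response


def collect(query: str) -> list:
    """Run the stream and return every event in the order it was sent."""
    events = []

    async def send(event: dict) -> None:
        events.append(event)

    asyncio.run(stream_events(query, send))
    return events
```

Every event of one reply shares the same stream_id, which is what lets the frontend stitch the chunks back together.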

Note: The @task() decorator turns handle_chat_message into a Huey task, so calling it from the consumer only enqueues the work. The Huey consumer process then picks it up and runs stream_response.

The async_to_sync function is used to convert an asynchronous function to a synchronous function. This allows us to call asynchronous functions in synchronous code. We require this because the stream_response function is an asynchronous function, but the handle_chat_message task is a synchronous function.
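
The same bridge can be shown with nothing but the standard library. This sketch uses asyncio.run as a rough stand-in for async_to_sync; the real helper from asgiref does more bookkeeping around threads and event loops, so treat this as an illustration of the idea only. The function names are made up.

```python
import asyncio


async def stream_reply(query: str) -> str:
    """Pretend coroutine that would normally await the LLM and the channel layer."""
    await asyncio.sleep(0)
    return f"echo: {query}"


def handle_message_sync(query: str) -> str:
    """Plain synchronous function, like the body of a Huey task.

    It cannot use `await`, so it starts an event loop, runs the coroutine
    to completion, and returns the result.
    """
    return asyncio.run(stream_reply(query))
```

Calling handle_message_sync("hi") blocks until the coroutine finishes, which is how the task behaves from Huey's point of view.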

The get_channel_layer function returns the channel layer configured in CHANNEL_LAYERS, and its group_send method delivers a message to every consumer in the group.

Bringing all the pieces together in the views

Let's write the logic that brings together all the pieces we have set up so far. The views live in the views.py file in the chat app directory, but first we need a form for creating members.

Creating members creation form

In Django, we can build a form from a model using the ModelForm class. Let's create a form for the Members model by extending UserCreationForm, which is itself a ModelForm. Create a new file named forms.py in the members app directory.

from django import forms
from django.contrib.auth.forms import UserCreationForm
from .models import Members


class MembersCreationForm(UserCreationForm):
    class Meta:
        model = Members
        fields = ("username", "email", "organization", "password1", "password2")

This file contains the MembersCreationForm class, which is a form for creating new members. The form is based on the UserCreationForm class provided by Django. The form has fields for the username, email, organization, and password.

Register and Login Views

from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required
from django.shortcuts import redirect, render
from django.views import View
from members.forms import MembersCreationForm
from rest_framework.authtoken.models import Token


# TODO: Convert the register and login views to use Django REST Framework API views for more production-ready code
class RegisterView(View):
    def get(self, request):
        form = MembersCreationForm()
        return render(request, "chat/register.html", {"form": form})

    def post(self, request):
        form = MembersCreationForm(request.POST)
        if form.is_valid():
            user = form.save()
            # Create the API token up front so the chat page can use it right away.
            token, created = Token.objects.get_or_create(user=user)
            login(request, user)
            return redirect("chatroom")
        return render(request, "chat/register.html", {"form": form})


class LoginView(View):
    def get(self, request):
        return render(request, "chat/login.html")

    def post(self, request):
        email = request.POST["email"]
        password = request.POST["password"]
        user = authenticate(request, email=email, password=password)
        if user is not None:
            login(request, user)
            token, created = Token.objects.get_or_create(user=user)
            return redirect("chatroom")
        return render(request, "chat/login.html", {"error": "Invalid credentials"})


# Anonymous visitors are sent to the login page instead of failing on the token lookup.
@login_required(login_url="login")
def chat(request):
    token, created = Token.objects.get_or_create(user=request.user)
    return render(request, "chat/chat.html", {"token": token.key})

These views handle the browser side of the chatbot. RegisterView and LoginView register and log in users, respectively, and the chat function renders the chat room template with the user's authentication token.

Update the urls.py file in the chat app directory with the following code.

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from rest_framework.authtoken.views import obtain_auth_token
from . import views

router = DefaultRouter(trailing_slash=False)

router.register(
    "threads/(?P<thread_id>[^/.]+)/messages",
    views.ChatMessageViewSet,
    basename="chatmessage",
)
router.register("threads", views.ChatThreadViewSet, basename="chatthread")

urlpatterns = [
    path("", include(router.urls)),
    ## NOTE: API endpoint to get the auth token
    path("api-token-auth/", obtain_auth_token, name="api_token_auth"),
    path("register/", views.RegisterView.as_view(), name="register"),
    path("login/", views.LoginView.as_view(), name="login"),
    path("chatroom/", views.chat, name="chatroom"),
]
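
The regex in the first router.register call is what captures thread_id for the nested messages route; DRF then exposes it to the viewset through self.kwargs. The sketch below only approximates the list route the router would build with trailing_slash=False, and the exact pattern DRF generates may differ. MESSAGES_ROUTE and thread_id_from_path are hypothetical names.

```python
import re
from typing import Optional

# Approximation of the list route built from the prefix in urls.py;
# [^/.]+ means "one or more characters that are neither a slash nor a dot".
MESSAGES_ROUTE = re.compile(r"threads/(?P<thread_id>[^/.]+)/messages")


def thread_id_from_path(path: str) -> Optional[str]:
    """Return the captured thread_id, or None if the path does not match."""
    match = MESSAGES_ROUTE.fullmatch(path)
    return match.group("thread_id") if match else None
```

For example, "threads/42/messages" yields "42", while a path with a trailing slash or a dot inside the ID does not match at all.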

Setting up the Frontend

Now, let's set up the frontend for the chatbot. Create a new directory named templates in the chat app directory, and inside it create a directory named chat. In that chat directory, create three files: login.html, register.html, and chat.html.

Login page

<!DOCTYPE html>
<html>
  <head>
    <title>Login</title>
  </head>
  <body>
    <h2>Login</h2>
    <form method="post">
      {% csrf_token %}
      <input type="email" name="email" placeholder="Email" required />
      <input type="password" name="password" placeholder="Password" required />
      <button type="submit">Login</button>
    </form>
    {% if error %}
    <p style="color: red">{{ error }}</p>
    {% endif %}
  </body>
</html>

Register page

<!DOCTYPE html>
<html>
  <head>
    <title>Register</title>
  </head>
  <body>
    <h2>Register</h2>
    <form method="post">
      {% csrf_token %} {{ form.as_p }}
      <button type="submit">Register</button>
    </form>
  </body>
</html>

Chat page

<!DOCTYPE html>
<html>
  <head>
    <title>Chat</title>
  </head>
  <body>
    <h2>Chat Room</h2>
    <div id="thread-section">
      <input type="text" id="thread-id" placeholder="Thread ID" />
      <button onclick="joinThread()">Join Thread</button>
      <button onclick="createThread()">Create Thread</button>
    </div>
    <div id="chat-section" style="display: none">
      <div id="messages" style="border: 1px solid #ccc; height: 300px; overflow-y: scroll"></div>
      <input type="text" id="message-input" placeholder="Type a message" />
      <button onclick="sendMessage()">Send</button>
    </div>
    <script>
      let threadId
      let socket
      const token = '{{ token }}' // Get the token from the context
      const ongoingMessages = {} // Object to keep track of ongoing messages by stream_id

      function joinThread() {
        threadId = document.getElementById('thread-id').value
        if (threadId) {
          document.getElementById('thread-section').style.display = 'none'
          document.getElementById('chat-section').style.display = 'block'
          connectWebSocket()
        }
      }

      function createThread() {
        threadId = document.getElementById('thread-id').value || generateUUID() // Use input value or generate a new unique thread ID
        document.getElementById('thread-section').style.display = 'none'
        document.getElementById('chat-section').style.display = 'block'
        connectWebSocket()
      }

      function connectWebSocket() {
        socket = new WebSocket(`ws://${window.location.host}/ws/chat/${threadId}/?token=${token}`)

        socket.onmessage = function (event) {
          const data = JSON.parse(event.data)
          console.log('Received data:', data) // Debugging line
          const messages = document.getElementById('messages')

          // If this is a user message, display it immediately
          if (data.category === 'user_message') {
            const message = document.createElement('div')
            message.textContent = `${data.timestamp} - ${data.sender}: ${data.message}`
            messages.appendChild(message)
          }

          // Handle chat log messages
          if (data.category === 'chat_log') {
            const message = document.createElement('div')
            message.textContent = `${data.timestamp} - ${data.sender}: ${data.message}`
            messages.appendChild(message)
          }

          // If this is the start of a new stream, create a new message element
          if (data.category === 'stream_start') {
            ongoingMessages[data.stream_id] = document.createElement('div')
            ongoingMessages[data.stream_id].textContent = `${data.timestamp} - ${data.sender}: `
            messages.appendChild(ongoingMessages[data.stream_id])
          }

          // Append chunks to the ongoing message
          if (data.category === 'stream_chunk' && ongoingMessages[data.stream_id]) {
            ongoingMessages[data.stream_id].textContent += data.message
          }

          // Finalize the message when the stream ends
          if (data.category === 'stream_end') {
            delete ongoingMessages[data.stream_id]
          }

          messages.scrollTop = messages.scrollHeight // Scroll to bottom
        }

        socket.onclose = function (event) {
          console.error('WebSocket closed unexpectedly')
        }
      }

      function sendMessage() {
        const input = document.getElementById('message-input')
        const message = input.value
        socket.send(JSON.stringify({ message: message }))
        input.value = ''
      }

      function generateUUID() {
        return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
          const r = (Math.random() * 16) | 0,
            v = c === 'x' ? r : (r & 0x3) | 0x8
          return v.toString(16)
        })
      }
    </script>
  </body>
</html>

What is happening in the chat.html file?

This might look like a lot of code, but it's actually quite simple. The chat.html file contains the HTML and JavaScript code for the chat room. The thread-section div contains an input field for the thread ID and buttons to join or create a thread. The chat-section div contains the chat messages, an input field for the message, and a button to send it.

The joinThread function is called when the user clicks the join thread button. In this function, we get the thread ID from the input field, hide the thread section, show the chat section, and connect to the WebSocket.

The createThread function is called when the user clicks the create thread button. In this function, we get the thread ID from the input field or generate a new unique thread ID, hide the thread section, show the chat section, and connect to the WebSocket.
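
One caveat with free-form thread IDs: the consumer uses the thread ID as the Channels group name, and to my understanding Channels only accepts group names made of ASCII letters, digits, hyphens, underscores, and periods, with fewer than 100 characters. A generated UUID always fits, but something like "my thread" would not. Here is a sketch of a server-side check under that assumption; the helper and constant names are hypothetical.

```python
import re

# Assumed to mirror the rule Channels enforces for group names.
GROUP_NAME_RE = re.compile(r"[a-zA-Z0-9\-_.]+")
MAX_GROUP_NAME_LENGTH = 100


def is_valid_group_name(thread_id: str) -> bool:
    """Return True if thread_id looks safe to use as a Channels group name."""
    if len(thread_id) >= MAX_GROUP_NAME_LENGTH:
        return False
    return GROUP_NAME_RE.fullmatch(thread_id) is not None
```

A check like this could run in connect before group_add, closing the socket when the ID is rejected.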

The connectWebSocket function is used to connect to the WebSocket. In this function, we create a new WebSocket connection with the thread ID and token. We handle the onmessage and onclose events of the WebSocket.
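
On the server, that query string arrives in the connection's scope as raw bytes. Below is a sketch of how the token could be read from it with the standard library. The helper name is hypothetical, and the custom authentication middleware from earlier in this walkthrough may do this differently.

```python
from typing import Optional
from urllib.parse import parse_qs


def token_from_query_string(query_string: bytes) -> Optional[str]:
    """Pull the value of ?token=... out of a raw ASGI query string."""
    params = parse_qs(query_string.decode("utf-8"))
    values = params.get("token")  # parse_qs maps each key to a list of values
    return values[0] if values else None
```

A missing or empty token gives None, in which case the connection should be treated as anonymous. Keep in mind that tokens passed in a URL can show up in server logs.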

The core of the implementation lies in the onmessage handler, which runs whenever the WebSocket receives a message from the server. In it, we parse the message data, log it to the console, and display the message in the chat room. We handle the different categories of messages: user messages, chat log messages, stream start messages, stream chunk messages, and stream end messages. This is where the magic happens. Chunks are appended to the right message element based on their stream_id and category, so the reply appears to be typed out live like in a real chatbot.
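
If the JavaScript is hard to follow, the same bookkeeping can be modeled in a few lines of Python. This is only a sketch of the logic, not code the project uses: a list stands in for the messages div and a dict for ongoingMessages.

```python
def apply_event(transcript: list, ongoing: dict, event: dict) -> None:
    """Update the transcript in place for one incoming WebSocket event."""
    category = event.get("category")
    if category in ("user_message", "chat_log"):
        transcript.append(f"{event['sender']}: {event['message']}")
    elif category == "stream_start":
        # Reserve a line and remember where this stream's text lives.
        ongoing[event["stream_id"]] = len(transcript)
        transcript.append(f"{event['sender']}: ")
    elif category == "stream_chunk":
        index = ongoing.get(event["stream_id"])
        if index is not None:  # skip chunks of streams we never saw start
            transcript[index] += event["message"]
    elif category == "stream_end":
        ongoing.pop(event["stream_id"], None)


def replay(events: list) -> list:
    """Feed a sequence of events through apply_event and return the transcript."""
    transcript, ongoing = [], {}
    for event in events:
        apply_event(transcript, ongoing, event)
    return transcript
```

Because each chunk is routed by its stream_id, two replies streaming at the same time would still end up on separate lines.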

The sendMessage function is called when the user clicks the send button. In this function, we get the message from the input field, send the message to the server using the WebSocket, and clear the input field.

The generateUUID function generates a new unique thread ID. I took this function from ChatGPT, LOL.
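
For comparison, the Python standard library produces the same kind of identifier, and tasks.py already relies on it for stream_id. Here is a sketch of generating thread IDs on the server instead, with a hypothetical function name:

```python
import uuid


def new_thread_id() -> str:
    """Return a random version 4 UUID as a string, e.g. for a new chat thread."""
    return str(uuid.uuid4())
```

The result has the same 36-character shape as the template string in generateUUID.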

How to run the Chatbot

Now, let the fun begin. Run the Django server using the following command.

poetry run python manage.py runserver

Then on a new terminal, run the Huey consumer using the following command.

poetry run python manage.py run_huey

Now, open your browser and go to http://localhost:8000/register/. Register a new user and log in. Then go to http://localhost:8000/chatroom/. Enter a thread ID and click the create thread button. You should see an empty chat room. Now, open another browser tab and go to http://localhost:8000/chatroom/. Enter the same thread ID and click the join thread button. You should see the chat room in both tabs. Now, you can send messages in one tab and see the responses in the other tab. When you rejoin the chat room, you will see the chat history. And if you are not authenticated, you will be redirected to the login page.

Wow, now we have a chatbot that can interact with users in real time. This is a basic implementation of a chatbot using Django Channels, Django Rest Framework, LangChain, and Huey. You can extend this chatbot by adding more features. The frontend here is just HTML and JavaScript; you can use any frontend framework like React, Vue, or Angular to build a more interactive chatbot. The steps for wiring up the WebSocket in the frontend don't change much: you just need to connect to the WebSocket and handle the messages.

Conclusion

In this walkthrough, we covered quite a lot of ground. We started by setting up the Django project and creating the models, serializers, views, and URLs for the chatbot. We then created a custom authentication middleware for Django Channels and set up the channel layer for the chatbot. We also set up the LangChain API and Huey task for the chatbot. Finally, we created the frontend for the chatbot and brought together all the pieces to build a real-time chatbot.

I hope you found this walkthrough helpful and that you learned something new.

References

  1. Django Channels Documentation
  2. Django Rest Framework Documentation
  3. LangChain Documentation
  4. Huey Documentation
  5. Channels Auth Middleware
  6. Django Huey
  7. Channel Layer