- Published on
Building an LLM Chatbot with Django, Django Channels, DRF, and Langchain
- Authors
- Name
- K N Anantha nandanan
- @Ananthan2k
Overview
Good to see you again readers! In this walkthrough, I would like to share my experience and show how to implement a real time chatbot using well... you guessed it right using Django. Sigh 😅, I know what you are thinking, "Another Django based blog?" This time around for the chatbot to make it function just like ChatGPT or some of the typical live response streaming chat application, I am going to use Django Channels, Django RestFramework(DRF), LangChain and Huey. So let's get started.
What tools are we going to use?
- Django: A high-level Python web framework that encourages rapid development and clean, pragmatic design.
- Django Channels: Extends Django to handle WebSockets, chat protocols, IoT protocols, and more. It's required for real-time communication.
- Django Rest Framework: A powerful and flexible toolkit for building Web APIs in Django. It's used to create RESTful APIs.
- LangChain: A language model API that provides a simple interface to interact with large language models like GPT-3.
- Huey: A lightweight task queue for Python that allows you to offload time-consuming tasks to the background. We will be using to run the LangChain API calls asynchronously.
- Django Rest Framework Auth Token: Token-based authentication for Django Rest Framework. It's used to authenticate users in the API.
Setting up the Django Project
First, like typical protocol, let's install the required packages. I have been using poetry
for managing my python packages. It's been really helpful in managing the dependencies and virtual environment. So first create you project directory and create a pyproject.toml
file with the following content.
[tool.poetry]
name = "LangDjangoChat"
version = "0.1.0"
description = "The backend code for chatbot built using django framework"
authors = ["ananthanandanan <ananthanandanan@gmail.com>"]
readme = "README.md"
package-mode = false
[tool.poetry.dependencies]
python = "^3.12"
Django = "^5.0.6"
djangorestframework = "^3.15.1"
django-filter = "^24.2"
django-cors-headers = "^4.3.1"
redis = "^5.0.6"
channels = {extras = ["daphne"], version ="^4.1.0"}
channels-redis = "^4.2.0"
huey = "^2.5.1"
gevent = "^24.2.1"
pyjwt = "^2.8.0"
environ = "^1.0"
[tool.poetry.group.dev.dependencies]
ruff = "^0.4.8"
[tool.poetry.group.llm.dependencies]
langchain = "^0.2.1"
langchain-openai = "^0.1.8"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Now, let's install the packages using the following command.
poetry install
Once that is done, a new virtual environment will be created and the packages will be installed. Now, let's create a new Django project using the following command.
poetry run django-admin startproject framework
This will create a new Django project named framework
, cd
into the framework
directory. Now, let's create a new Django app named chat
using the following command.
poetry run python manage.py startapp chat
This will create a new Django app named chat
. Now, let's add the required packages to the INSTALLED_APPS
in the settings.py
file.
INSTALLED_APPS = [
## websocket modules
"daphne",
"channels",
"corsheaders", ## for cross origin requests
...
## installed app modules
'chat',
## rest framework modules
'rest_framework',
'rest_framework.authtoken',
## huey modules
'huey.contrib.djhuey',
]
Note: Make sure to add the daphne
and channels
packages to the INSTALLED_APPS
at the top of the list. This is required for Django Channels to work properly.
Now, let's gets some basics out of the way.
What is Django Channels?
Django Channels is a project that extends Django to handle WebSockets, chat protocols, IoT protocols, and more. It allows Django to handle more than just plain HTTP requests, including WebSockets and HTTP2. Django Channels is required for real-time communication in Django.
Actually, right now I want all of you guys to take a break and go through the Django Channels documentation. It's really helpful and you will get a good understanding of how Django Channels works. They have a great tutorial that you can follow along. Django Channels Documentation
This is what I did when I started working with Django Channels. It's really helpful and you will get a good understanding of how Django Channels works. But don't worry, I will be explaining the important parts of Django Channels in this blog as well.
Okay, I am guessing you are back from the documentation. Let's me condense the information for you.
How Django Channels Works?
Just like how Django handles HTTP requests, Django Channels handles WebSockets and other protocols. It uses a combination of ASGI (Asynchronous Server Gateway Interface) and WebSockets to handle real-time communication. ASGI is a standard interface between web servers and Python applications that allows for asynchronous communication.
What are Consumers?
In Django Channels, consumers are Python classes that handle WebSockets and other protocols. Consumers are similar to views in Django, but they handle WebSockets and other protocols instead of HTTP requests. Consumers can receive messages, send messages, and perform other actions based on the incoming messages. They are the heart of Django Channels and are responsible for handling real-time communication. But unlike views, consumers are long-running and asynchronous.
What are Channels?
In Django Channels, channels are communication channels that connect clients to consumers. Channels are used to send messages between clients and consumers. Each channel has a unique name and is used to route messages to the appropriate consumer. Channels are the backbone of Django Channels and are responsible for handling real-time communication.
So basically a user connects to a channel and the consumer listens to the channel for any incoming messages. When a message is received, the consumer processes the message and sends a response back to the user. This is how real-time communication works in Django Channels.
But what if we want message to be send to multiple users?
Channel Layers
To solve this situation, Django Channels provides a feature called channel layers. Channel layers are a way to group channels together and send messages between them. Channel layers allow you to send messages to multiple consumers at the same time. You can have multiple channel layers in Django Channels, each with its own set of channels. Channel layers are used to broadcast messages to multiple consumers and handle complex real-time communication scenarios.
Now that we have a basic understanding of Django Channels, let's move on to the next step.
Setting up Django Channels
If you have followed the Django Channels documentation, you would have seen that setting up Django Channels is pretty straightforward. You just need to add the required packages to the INSTALLED_APPS
in the settings.py
file and configure the routing in the routing.py
file. But I will walk you through the process. Since we have already added the required packages to the INSTALLED_APPS
, let's move on to the next step.
First, let's setup the asgi.py
file. This file is used to configure the ASGI application for Django Channels.
"""
ASGI config for framework project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/
"""
import os
from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
# Just HTTP for now. (We can add other protocols later.)
}
)
In the above, asgi.py
file, we are configuring the ASGI application for Django Channels. We are using the ProtocolTypeRouter
to route HTTP requests to the Django ASGI application. This is the basic setup for Django Channels. We will be adding more configuration to the asgi.py
file in the upcoming sections.
Next, set the ASGI_APPLICATION
in the settings.py
file.
ASGI_APPLICATION = "framework.asgi.application"
Adding the chat path to the urls
Now, let's add the chat path to the urls.py
file in the framework
directory.
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls),
path("chat/", include("chat.urls")),
]
Once, that is done. If you run the Django server using poetry run python manage.py runserver
, you would be able to see ASGI/Daphne server running on the console.
Setting up Consumers and Routing
Now, let's create a new file named consumers.py
in the chat
app directory. This file will contain the consumers for the chatbot. Consumers are Python classes that handle WebSockets and other protocols in Django Channels.
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, close_code):
pass
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.send(text_data=json.dumps({
'message': message
}))
For a consumer to work, it needs to have the following methods:
connect()
: This method is called when the client connects to the consumer.disconnect()
: This method is called when the client disconnects from the consumer.receive()
: This method is called when the consumer receives a message from the client.send()
: This method is used to send a message to the client.accept()
: This method is used to accept the connection from the client.
In this case, we are using AsyncWebsocketConsumer
which is an asynchronous consumer provided by Django Channels. This allows us to write asynchronous code in the consumer.
Note: The above code is just a skeleton for the consumer. We will be adding more functionality to the consumer in the upcoming sections.
Next, let's create a new file named routing.py
in the chat
app directory. This file will contain the routing configuration for the chatbot.
from django.urls import re_path
from . import consumers
## URL patterns for WebSocket routing.
websocket_urlpatterns = [
re_path(r"ws/chat/(?P<thread_id>[\w-]+)/$", consumers.ChatConsumer.as_asgi()),
]
This file contains the URL patterns for WebSocket routing. In this case, we are using the re_path
function to define a URL pattern for the chatbot. The URL pattern is ws/chat/<thread_id>/
where <thread_id>
is a dynamic parameter that will be passed to the consumer.
Adding the routing configuration to the asgi
For channels to work, we need to add the routing configuration. We can bascially follow the Django Channels documentation to add the routing configuration. So to the asgi.py
file add the following code.
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()
from chat.routing import websocket_urlpatterns
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
),
}
)
This code configures the ASGI application for Django Channels. It uses the ProtocolTypeRouter
to route HTTP and WebSocket requests to the appropriate handlers. The AllowedHostsOriginValidator
is used to validate the origin of WebSocket requests. The AuthMiddlewareStack
is used to add authentication middleware to the WebSocket requests. The URLRouter
is used to route WebSocket requests to the appropriate consumers.
From the Django Channels tutorial, if you still have the template files, you can use them to test the websockets.
Setting up Django Rest Framework
Now that we have setup the basic Django Channels, for a while let's switch to setting up the basic models
, serializers
and views
for the Django Rest Framework. i.e. the REST API for the chatbot.
Chat Models
First, let's create a new file named models.py
in the chat
app directory. This file will contain the models for the chatbot.
from django.db import models
import uuid
from django.conf import settings
# Create your models here.
class ChatThread(models.Model):
id = models.UUIDField(primary_key=True, editable=False, default=uuid.uuid4)
name = models.TextField()
author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
organization = models.TextField()
class Meta:
indexes = [
models.Index(fields=["author", "organization"]),
]
def __str__(self):
return self.name
class ChatMessage(models.Model):
SENDER_TYPES = [
("U", "User"),
("B", "Bot"),
]
id = models.UUIDField(primary_key=True, editable=False, default=uuid.uuid4)
thread = models.ForeignKey(ChatThread, on_delete=models.CASCADE)
content = models.TextField()
sender = models.CharField(max_length=1, choices=SENDER_TYPES, default="U")
timestamp = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.content
This file contains the models for the chatbot. We have two models: ChatThread
and ChatMessage
. The ChatThread
model represents a chat thread and has fields for the name, author, and organization. The ChatMessage
model represents a chat message and has fields for the thread, content, sender, and timestamp.
Register the models in the admin.py
file in the chat
app directory.
from django.contrib import admin
from .models import ChatThread, ChatMessage
admin.site.register(ChatThread)
admin.site.register(ChatMessage)
Note: The
ChatThread
model has a foreign key to theMembers
model, which represents the members of the chat thread. This allows us to associate users with chat threads. Let's create the members model as well.
Members Model
I typically would like to create a new django app for the members
model. So let's create a new app named members
using the following command.
poetry run python manage.py startapp members
After following the typical protocol of adding the app to the INSTALLED_APPS
in the settings.py
file, let's create the models.py
file in the members
app directory.
from django.contrib.auth.models import (
AbstractBaseUser,
BaseUserManager,
PermissionsMixin,
)
from django.db import models
import uuid
class MemberManager(BaseUserManager):
def create_user(self, email, username, organization, password=None, **extra_fields):
if not email:
raise ValueError("The Email field must be set")
email = self.normalize_email(email)
user = self.model(
email=email, username=username, organization=organization, **extra_fields
)
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(
self, email, username, organization, password=None, **extra_fields
):
extra_fields.setdefault("is_staff", True)
extra_fields.setdefault("is_superuser", True)
if extra_fields.get("is_staff") is not True:
raise ValueError("Superuser must have is_staff=True.")
if extra_fields.get("is_superuser") is not True:
raise ValueError("Superuser must have is_superuser=True.")
return self.create_user(email, username, organization, password, **extra_fields)
class Members(AbstractBaseUser, PermissionsMixin):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
email = models.EmailField(unique=True)
username = models.CharField(max_length=150, unique=True)
organization = models.CharField(max_length=255)
is_active = models.BooleanField(default=True)
is_staff = models.BooleanField(default=False)
date_joined = models.DateTimeField(auto_now_add=True)
objects = MemberManager()
USERNAME_FIELD = "email"
REQUIRED_FIELDS = ["username", "organization"]
class Meta:
verbose_name = "Member"
verbose_name_plural = "Members"
def __str__(self):
return self.email
This file contains the Members
model which represents the members of the chatbot. The Members
model is a custom user model that extends the AbstractBaseUser
and PermissionsMixin
models provided by Django. The Members
model has fields for the email, username, organization, is_active, is_staff, and date_joined.
Don't forget to register the models in the admin.py
file in the members
app directory.
from django.contrib import admin
from .models import Members
admin.site.register(Members)
Don't forget to add the members to AUTH_USER_MODEL
in the settings.py
file.
AUTH_USER_MODEL = "members.Members"
Chat Serializers
Now, let's create the serializers for the models. Create a new file named serializers.py
in the chat
app directory.
from rest_framework import serializers
from .models import ChatMessage, ChatThread
class ChatThreadSerializer(serializers.ModelSerializer):
class Meta:
model = ChatThread
fields = ["id", "name", "author", "organization"]
class ChatMessageSerializer(serializers.ModelSerializer):
class Meta:
model = ChatMessage
fields = ["id", "thread", "content", "sender", "timestamp"]
This file contains the serializers for the models. The ChatThreadSerializer
and ChatMessageSerializer
classes are used to serialize the ChatThread
and ChatMessage
models, respectively.
Chat views and Authentication
Next, step is something I really wanted to setup, which was the Authentication for the REST API. I wanted to use the Django Rest Framework Auth Token for authentication. So that is why I added the rest_framework.authtoken
to the INSTALLED_APPS
in the settings.py
file. We need to also add the authentication classes to the REST_FRAMEWORK
settings in the settings.py
file.
## Rest framework settings
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": [
"rest_framework.authentication.TokenAuthentication",
],
"DEFAULT_PERMISSION_CLASSES": [
"rest_framework.permissions.IsAuthenticated",
],
}
This code configures the Django Rest Framework to use the TokenAuthentication
class for authentication. The TokenAuthentication
class uses token-based authentication to authenticate users in the API. The IsAuthenticated
permission class is used to ensure that only authenticated users can access the API.
Now, let's create the views for the chatbot. Create a new file named views.py
in the chat
app directory.
from .models import ChatThread, ChatMessage
from .serializers import ChatThreadSerializer, ChatMessageSerializer
from rest_framework.permissions import IsAuthenticated
class ChatThreadViewSet(viewsets.ModelViewSet):
serializer_class = ChatThreadSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return ChatThread.objects.filter(author=self.request.user)
class ChatMessageViewSet(viewsets.ModelViewSet):
serializer_class = ChatMessageSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return ChatMessage.objects.filter(thread=self.kwargs["thread_id"])
This file contains the views for the chatbot. The ChatThreadViewSet
and ChatMessageViewSet
classes are used to create, retrieve, update, and delete chat threads and chat messages, respectively. The IsAuthenticated
permission class is used to ensure that only authenticated users can access the views.
Chat URLs
Now, let's create the secure URLs for the chatbot. Create a new file named urls.py
in the chat
app directory.
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
from rest_framework.authtoken.views import obtain_auth_token
router = DefaultRouter(trailing_slash=False)
(
router.register(
"threads/(?P<thread_id>[^/.]+)/messages",
views.ChatMessageViewSet,
basename="chatmessage",
),
)
(router.register("threads", views.ChatThreadViewSet, basename="chatthread"),)
urlpatterns = [
path("", include(router.urls)),
path("api-token-auth/", obtain_auth_token, name="api_token_auth"),
]
This file contains the secure URLs for the chatbot. The router
is used to register the views and create the URLs for the chat threads and chat messages. The obtain_auth_token
view is used to obtain an authentication token for the user.
If you were able to follow along, you would have setup the basic REST API for the chatbot. Now, let's move on to the next step.
This is where I spent the most time racking my head, which to create Custom Auth Stack for Django Channels using the DRF TokenAuthentication
, but after reading through different posts in both DRF and Django Channels I setup a decent solution.
Setting up Custom Auth Stack for Django Channels
Remember that we are creating an authenticated chatbot, which means that we would need to authenticate and get the user details from the Django Rest Framework. So let's create a custom authentication middleware for Django Channels.
Create a new file named middleware.py
in the chat
app directory.
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from channels.auth import AuthMiddlewareStack
@database_sync_to_async
def get_user(token_key):
"""Get the user associated with the given token key.
Args:
token_key (str): The key of the token to retrieve.
Returns:
User: The user associated with the token, or an instance of
`AnonymousUser` if the token is invalid.
"""
try:
token = Token.objects.get(key=token_key)
return token.user
except Token.DoesNotExist:
return AnonymousUser()
class TokenAuthMiddleware(BaseMiddleware):
"""Custom token authentication middleware.
This middleware checks for a token in the query string of the request and
attempts to authenticate the user with the token. If the token is invalid,
the user will be set to an instance of `AnonymousUser`.
NOTE: This middleware should be placed before `AuthMiddlewareStack`. This
is because `AuthMiddlewareStack` will attempt to authenticate the user
based on the session data, which is not available for WebSocket
connections. By placing this middleware first, we can ensure that the user
is authenticated based on the token before `AuthMiddlewareStack` is
invoked.
Reference: https://www.django-rest-framework.org/api-guide/authentication/#tokenauthentication
"""
async def __call__(self, scope, receive, send):
try:
token_key = dict(
(x.split("=") for x in scope["query_string"].decode().split("&"))
).get("token", None)
except ValueError:
token_key = None
scope["user"] = (
AnonymousUser() if token_key is None else await get_user(token_key)
)
return await super().__call__(scope, receive, send)
def TokenAuthMiddlewareStack(inner):
return TokenAuthMiddleware(AuthMiddlewareStack(inner))
So what the hell is happening here? The TokenAuthMiddleware
class is a custom token authentication middleware for Django Channels. This middleware checks for a token in the query string of the request and attempts to authenticate the user with the token. If the token is invalid, the user will be set to an instance of AnonymousUser
.
The get_user
function is a helper function that retrieves the user associated with the given token key. If the token is valid, the user associated with the token is returned. If the token is invalid, an instance of AnonymousUser
is returned.
The TokenAuthMiddlewareStack
function is a helper function that wraps the TokenAuthMiddleware
middleware with the AuthMiddlewareStack
middleware. This ensures that the user is authenticated based on the token before the AuthMiddlewareStack
middleware is invoked.
Now, let's add the custom authentication middleware to the ASGI application in the asgi.py
file.
"""
ASGI config for framework project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from chat.middleware import TokenAuthMiddlewareStack
from chat.routing import websocket_urlpatterns
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AllowedHostsOriginValidator(
TokenAuthMiddlewareStack(URLRouter(websocket_urlpatterns))
),
}
)
Basically instead of using the AuthMiddlewareStack
we are using the TokenAuthMiddlewareStack
which we created in the middleware.py
file. What this means that the user will be authenticated based on the token before the AuthMiddlewareStack
is invoked. And if you know about Channels, there are three attributes -> scope
, receive
, send
. The scope
is a dictionary that contains information about the connection, such as the path, query string, and headers. The receive
function is used to receive messages from the client. The send
function is used to send messages to the client.
Now, it's time to really shape up the chatbot.
Setting up LangChain
Before we move on to implementing the ChatConsumer
, let's setup the LangChain API. LangChain is a language model API that provides a simple interface to interact with large language models like GPT-4. I have been using LangChain for a while now and it's been really helpful in building chatbots
We had already installed the langchain
and langchain-openai
packages in the initial setup. Now, let's create a directory under chat called chatbot
and create a new file named app.py
.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai.chat_models import AzureChatOpenAI
import os
os.environ["OPEN_API_TYPE"] = "azure"
os.environ["AZURE_OPENAI_API_KEY"] = "YOUR_AZURE_OPENAI_API_KEY"
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<key>.openai.azure.com"
os.environ["OPENAI_API_VERSION"] = "2023-05-15"
llm = AzureChatOpenAI(
deployment_name="gpt-35-turbo",
model_name="gpt-35-turbo",
temperature=0,
streaming=True,
)
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a AI Chatbot."),
("user", "{input}"),
]
)
chain = prompt | llm
This file contains the LangChain setup for the chatbot. The AzureChatOpenAI
class is used to interact with the Azure OpenAI API. The llm
object is created with the deployment name, model name, temperature, and streaming parameters. The prompt
object is created with the chat prompt template. The chain
object is created by combining the prompt and the llm object.
Running LangChain as a background task
To make the experience seamless, we need to run the LangChain API calls asynchronously. For that we will be using the Huey
package. Huey is a lightweight task queue for Python that allows you to offload time-consuming tasks to the background.
Again, since we had already installed the huey
package in the initial setup, let's begin by setting the HUEY
settings in the settings.py
file.
## Huey settings
HUEY = {
"huey_class": "huey.RedisHuey",
"name": "framework",
"connection": {"host": "localhost", "port": 6379, "db": 1},
"immediate": False,
"immediate_use_memory": False,
"consumer": {
"workers": 4,
"worker_type": "greenlet",
"loglevel": logging.DEBUG,
},
}
Setup Gevent for Huey worker
Since we are using greenlet
as the worker type, we need to setup Gevent
as well. Gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of libev event loop. For Django we need to do a monkey patching to make it work. In the manage.py
file add the following code.
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
# Apply monkey-patch if we are running the huey consumer.
if "run_huey" in sys.argv:
from gevent import monkey
monkey.patch_all()
def main():
"""Run administrative tasks."""
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "framework.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == "__main__":
main()
Before we setup the tasks.py
let's completely setup the ChatConsumer
.
Channel Layer setup
The channel layer is used to group channels together and send messages between them. Which basically means that, it's using the channel layers are the additional layer of abstraction that allows us to send messages between different channels.
For that we need to setup CHANNEL_LAYERS
in the settings.py
file.
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("127.0.0.1", 6379)],
},
},
}
Note: Make sure to install Redis and run the Redis server before running the Django server. For my ease of use, I ran the Redis server using Docker.
Setting up the ChatConsumer
Before, I show you the ChatConsumer
, let me explain my thought process. So we have the ChatThread
and ChatMessage
models. The user flow would be:
- The user signs up and logs in. Or If the user is already registered, the user logs in.
- The user creates a new chat thread, or joins an existing chat thread.
- The user sends a message to the chat thread.
- The LLM model processes the message and sends a response back to the user.
- These messages are stored in the ChatMessage model and the thread_id is also stored.
- In addition, since the data is stored, the user can view the chat history.
With that in mind, let's update the ChatConsumer
.
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from .models import ChatThread, ChatMessage
from members.models import Members
from .tasks import handle_chat_message
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.thread_id = self.scope["url_route"]["kwargs"]["thread_id"]
self.user = self.scope["user"]
if not self.user.is_authenticated:
await self.close()
return
self.thread = await self.get_or_create_thread(self.thread_id, self.user)
await self.channel_layer.group_add(self.thread_id, self.channel_name)
await self.accept()
await self.send_chat_logs()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.thread_id, self.channel_name)
async def receive(self, text_data=None, bytes_data=None):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
chat_message = await self.save_message(self.thread, self.user, message)
await self.channel_layer.group_send(
self.thread_id,
{
"type": "chat.message",
"message": chat_message.content,
"sender": chat_message.sender,
"timestamp": chat_message.timestamp.isoformat(),
"category": "user_message",
},
)
handle_chat_message(self.thread_id, message)
async def chat_message(self, event):
category = event.get("category")
timestamp = event.get("timestamp")
stream_id = event.get("stream_id")
chunk = event.get("chunk")
response = event.get("response")
message = event.get("message")
sender = event.get("sender", "Bot") # Default sender to "Bot"
if category == "stream_start":
await self.send(
text_data=json.dumps(
{
"message": "",
"sender": sender,
"timestamp": timestamp,
"stream_id": stream_id,
"category": "stream_start",
}
)
)
elif category == "stream_chunk":
await self.send(
text_data=json.dumps(
{
"message": chunk,
"sender": sender,
"timestamp": timestamp,
"stream_id": stream_id,
"category": "stream_chunk",
}
)
)
elif category == "stream_end":
_ = await self.save_message(self.thread, "B", response)
await self.send(
text_data=json.dumps(
{
"message": response,
"sender": sender,
"timestamp": timestamp,
"stream_id": stream_id,
"category": "stream_end",
}
)
)
else:
await self.send(
text_data=json.dumps(
{
"message": message,
"sender": sender,
"timestamp": timestamp,
"category": category,
}
)
)
async def send_chat_logs(self):
messages = await self.get_chat_messages()
for message in messages:
await self.send(
text_data=json.dumps(
{
"message": message.content,
"sender": message.sender,
"timestamp": message.timestamp.isoformat(),
"category": "chat_log",
}
)
)
@database_sync_to_async
def get_or_create_thread(self, thread_id, user):
user = Members.objects.get(email=user.email)
thread, created = ChatThread.objects.get_or_create(
id=thread_id,
defaults={
"name": f"Chat {thread_id}",
"author": user,
"organization": user.organization,
},
)
return thread
@database_sync_to_async
def save_message(self, thread, user, content):
if isinstance(user, Members):
chat_message = ChatMessage.objects.create(
thread=thread, content=content, sender="U"
)
else:
chat_message = ChatMessage.objects.create(
thread=thread, content=content, sender="B"
)
return chat_message
@database_sync_to_async
def get_chat_messages(self):
return list(
ChatMessage.objects.filter(thread=self.thread).order_by("timestamp")
)
What is happening in the ChatConsumer
?
The ChatConsumer
class extends the AsyncWebsocketConsumer
class provided by Django Channels. The ChatConsumer
class is responsible for handling WebSocket connections and messages for the chatbot. Let's break down the methods in the ChatConsumer
class:
-
The
connect
method is called when the client connects to the consumer. In this method, we get the thread_id and user from the scope, check if the user is authenticated, get or create the chat thread, add the channel to the group, and accept the connection. We also send the chat logs to the user. -
The
disconnect
method is called when the client disconnects from the consumer. In this method, we remove the channel from the group. -
The
receive
method is called when the consumer receives a message from the client. In this method, we save the message to the chat thread, send the message to the group, and handle the message using thehandle_chat_message
task. I will elaborate on this task in the next section. -
The
chat_message
method is called when the consumer receives a message from the group. In this method, we send the message to the user. -
The
send_chat_logs
method is used to send the chat logs to the user. -
The
get_or_create_thread
method is used to get or create the chat thread. -
The
save_message
method is used to save the message to the chat thread. -
The
get_chat_messages
method is used to get the chat messages from the chat thread.
Note: The decorator
@database_sync_to_async
is used to convert synchronous database operations to asynchronous operations. This allows us to perform database operations asynchronously in the consumer. A event hanlder method likechat_message
is called using thetype
key in thegroup_send
method. Thecategory
key is used to determine the type of message. I will explain how this is implemented in the frontend in the next section.
Setting up the Huey Task
Now, let's create the tasks.py
file in the chat
app directory.
from huey.contrib.djhuey import task
import logging
import uuid
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from chat.chatbot import app as chat_app
from django.utils import timezone
import logging
async def stream_response(thread_id, query):
runnable = chat_app.chain
stream_id = str(uuid.uuid4())
channel_layer = get_channel_layer()
def send(message):
return channel_layer.group_send(thread_id, message)
await send(
{
"type": "chat.message",
"category": "stream_start",
"stream_id": stream_id,
"timestamp": timezone.now().isoformat(), # Adding a unique timestamp
}
)
response = ""
async for chunk in runnable.astream({"input": query}):
response += chunk.content
logging.warning(f"Stream chunk: {chunk.content}")
await send(
{
"type": "chat.message",
"category": "stream_chunk",
"stream_id": stream_id,
"chunk": chunk.content,
"timestamp": timezone.now().isoformat(), # Adding a unique timestamp
}
)
logging.warning(f"Stream response: {response}")
await send(
{
"type": "chat.message",
"category": "stream_end",
"stream_id": stream_id,
"response": response,
"timestamp": timezone.now().isoformat(), # Adding a unique timestamp
}
)
@task()
def handle_chat_message(thread_id, query):
logging.warning(f"Handling chat message {thread_id} {query}")
async_to_sync(stream_response)(thread_id, query)
What is happening in the tasks.py
file?
Here, I am using the category
key to determine the type of message. The stream_start
category is used to indicate the start of the stream. The stream_chunk
category is used to indicate a chunk of the stream. The stream_end
category is used to indicate the end of the stream. The response
key is used to store the response from the LLM model.
The stream_response
function is used to stream the response from the LLM model to the user. The runnable
object is created with the LangChain chain. The stream_id
is generated with a unique identifier. The channel_layer
is used to send messages to the group. The send
function is used to send messages to the group. The stream_start
message is sent to the group to indicate the start of the stream. The response is accumulated in the response
variable. The stream_chunk
message is sent to the group to indicate a chunk of the stream. The stream_end
message is sent to the group to indicate the end of the stream.
Note: The
@task()
decorator is used to create a Huey task. Thehandle_chat_message
task is used to handle the chat message. Thestream_response
function is called asynchronously in the task.The
async_to_sync
function is used to convert an asynchronous function to a synchronous function. This allows us to call asynchronous functions in synchronous code. We require this because thestream_response
function is an asynchronous function, but thehandle_chat_message
task is a synchronous function.the
get_channel_layer
function is used to get the channel layer. Thegroup_send
method is used to send messages to the group.
Bring together all the pieces into "View".
Let's write the logic, that will bring together all the pieces we have setup so far. In the views.py
file in the chat
app directory, add the following code.
Creating members creation form
In django, we can create a form using the ModelForm
class. Let's create a form for the Members
model. Create a new file named forms.py
in the members
app directory.
from django import forms
from django.contrib.auth.forms import UserCreationForm
from .models import Members
class MembersCreationForm(UserCreationForm):
class Meta:
model = Members
fields = ("username", "email", "organization", "password1", "password2")
This file contains the MembersCreationForm
class, which is a form for creating new members. The form is based on the UserCreationForm
class provided by Django. The form has fields for the username, email, organization, and password.
Register and Login Views
from django.views import View
from members.forms import MembersCreationForm
from rest_framework.authtoken.models import Token
## TODO: Convert the register and login views to use Django REST Framework API views for more production-ready code
class RegisterView(View):
def get(self, request):
form = MembersCreationForm()
return render(request, "chat/register.html", {"form": form})
def post(self, request):
form = MembersCreationForm(request.POST)
if form.is_valid():
user = form.save()
token, created = Token.objects.get_or_create(user=user)
login(request, user)
return redirect("chatroom")
return render(request, "chat/register.html", {"form": form})
class LoginView(View):
def get(self, request):
return render(request, "chat/login.html")
def post(self, request):
email = request.POST["email"]
password = request.POST["password"]
user = authenticate(request, email=email, password=password)
if user is not None:
login(request, user)
token, created = Token.objects.get_or_create(user=user)
return redirect("chatroom")
return render(request, "chat/login.html", {"error": "Invalid credentials"})
def chat(request):
token, created = Token.objects.get_or_create(user=request.user)
return render(request, "chat/chat.html", {"token": token.key})
This file contains the views for the chatbot. The RegisterView
and LoginView
classes are used to register and login users, respectively. The chat
function is used to render the chat room template with the authentication token.
Update the urls.py
file in the chat
app directory with the following code.
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from rest_framework.authtoken.views import obtain_auth_token
from . import views
router = DefaultRouter(trailing_slash=False)
(
router.register(
"threads/(?P<thread_id>[^/.]+)/messages",
views.ChatMessageViewSet,
basename="chatmessage",
),
)
(router.register("threads", views.ChatThreadViewSet, basename="chatthread"),)
urlpatterns = [
path("", include(router.urls)),
## NOTE: API endpoint to get the auth token
path("api-token-auth/", obtain_auth_token, name="api_token_auth"),
path("register/", views.RegisterView.as_view(), name="register"),
path("login/", views.LoginView.as_view(), name="login"),
path("chatroom/", views.chat, name="chatroom"),
]
Setting up the Frontend
Now, let's setup the frontend for the chatbot. Create a new directory named templates
in the chat
app directory. In the templates
directory, create a new directory named chat
. In the chat
directory, create a new file named login.html
, register.html
, and chat.html
.
Login page
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
<h2>Login</h2>
<form method="post">
{% csrf_token %}
<input type="email" name="email" placeholder="Email" required />
<input type="password" name="password" placeholder="Password" required />
<button type="submit">Login</button>
</form>
{% if error %}
<p style="color: red">{{ error }}</p>
{% endif %}
</body>
</html>
Register page
<!DOCTYPE html>
<html>
<head>
<title>Register</title>
</head>
<body>
<h2>Register</h2>
<form method="post">
{% csrf_token %} {{ form.as_p }}
<button type="submit">Register</button>
</form>
</body>
</html>
Chat page
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h2>Chat Room</h2>
<div id="thread-section">
<input type="text" id="thread-id" placeholder="Thread ID" />
<button onclick="joinThread()">Join Thread</button>
<button onclick="createThread()">Create Thread</button>
</div>
<div id="chat-section" style="display: none">
<div id="messages" style="border: 1px solid #ccc; height: 300px; overflow-y: scroll"></div>
<input type="text" id="message-input" placeholder="Type a message" />
<button onclick="sendMessage()">Send</button>
</div>
<script>
let threadId
let socket
const token = '{{ token }}' // Get the token from the context
const ongoingMessages = {} // Object to keep track of ongoing messages by stream_id
function joinThread() {
threadId = document.getElementById('thread-id').value
if (threadId) {
document.getElementById('thread-section').style.display = 'none'
document.getElementById('chat-section').style.display = 'block'
connectWebSocket()
}
}
function createThread() {
threadId = document.getElementById('thread-id').value || generateUUID() // Use input value or generate a new unique thread ID
document.getElementById('thread-section').style.display = 'none'
document.getElementById('chat-section').style.display = 'block'
connectWebSocket()
}
function connectWebSocket() {
socket = new WebSocket(`ws://${window.location.host}/ws/chat/${threadId}/?token=${token}`)
socket.onmessage = function (event) {
const data = JSON.parse(event.data)
console.log('Received data:', data) // Debugging line
const messages = document.getElementById('messages')
// If this is a user message, display it immediately
if (data.category === 'user_message') {
const message = document.createElement('div')
message.textContent = `${data.timestamp} - ${data.sender}: ${data.message}`
messages.appendChild(message)
}
// Handle chat log messages
if (data.category === 'chat_log') {
const message = document.createElement('div')
message.textContent = `${data.timestamp} - ${data.sender}: ${data.message}`
messages.appendChild(message)
}
// If this is the start of a new stream, create a new message element
if (data.category === 'stream_start') {
ongoingMessages[data.stream_id] = document.createElement('div')
ongoingMessages[data.stream_id].textContent = `${data.timestamp} - ${data.sender}: `
messages.appendChild(ongoingMessages[data.stream_id])
}
// Append chunks to the ongoing message
if (data.category === 'stream_chunk') {
ongoingMessages[data.stream_id].textContent += data.message
}
// Finalize the message when the stream ends
if (data.category === 'stream_end') {
delete ongoingMessages[data.stream_id]
}
messages.scrollTop = messages.scrollHeight // Scroll to bottom
}
socket.onclose = function (event) {
console.error('WebSocket closed unexpectedly')
}
}
function sendMessage() {
const input = document.getElementById('message-input')
const message = input.value
socket.send(JSON.stringify({ message: message }))
input.value = ''
}
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
const r = (Math.random() * 16) | 0,
v = c === 'x' ? r : (r & 0x3) | 0x8
return v.toString(16)
})
}
</script>
</body>
</html>
What is happening in the chat.html
file?
This might look like a lot of code, but it's actually quite simple. The chat.html
file contains the HTML and JavaScript code for the chat room. The thread-section
div contains input fields for the thread ID and buttons to join or create a thread. The chat-section
div contains the chat messages, input field for the message, and button to send the message.
The joinThread
function is called when the user clicks the join thread button. In this function, we get the thread ID from the input field, hide the thread section, show the chat section, and connect to the WebSocket.
The createThread
function is called when the user clicks the create thread button. In this function, we get the thread ID from the input field or generate a new unique thread ID, hide the thread section, show the chat section, and connect to the WebSocket.
The connectWebSocket
function is used to connect to the WebSocket. In this function, we create a new WebSocket connection with the thread ID and token. We handle the onmessage
and onclose
events of the WebSocket.
The implementation that make it all lies in the onmessage
event. The onmessage
event is called when the WebSocket receives a message from the server. In this event, we parse the message data, log the data to the console, and display the message in the chat room. We handle different categories of messages, such as user messages, chat log messages, stream start messages, stream chunk messages, and stream end messages. This is where the magic happens. We are handling the stream of messages and appending based on the stream_id
and category
of the message in such a way that it would feel like a real chatbot.
The sendMessage
function is called when the user clicks the send button. In this function, we get the message from the input field, send the message to the server using the WebSocket, and clear the input field.
The generateUUID
function is used to generate a new unique thread ID. This was a function took from ChatGPT LOL.
How to run the Chatbot
Now, let the fun begin. Run the Django server using the following command.
poetry run python manage.py runserver
Then on a new terminal, run the Huey consumer using the following command.
poetry run python manage.py run_huey
Now, open your browser and go to http://localhost:8000/register/
. Register a new user and login. Then go to http://localhost:8000/chatroom/
. Enter a thread ID and click the create thread button. You could see an empty chat room. Now, open another browser tab and go to http://localhost:8000/chatroom/
. Enter the same thread ID and click the join thread button. You could see the chat room in both tabs. Now, you can send messages in one tab and see the responses in the other tab. When you rejoin the chat room, you could see the chat history. And if you are not authenticated, you would be redirected to the login page.
Wow, now we have a chatbot that can interact with users in real-time. This is a basic implementation of a chatbot using Django Channels, Django Rest Framework, LangChain, and Huey. You can extend this chatbot by adding more features. The frontend here is just html and javascript, you can use any frontend framework like React, Vue, or Angular to build a more interactive chatbot. The steps to write the web socket in the frontend doesn't change much, you just need to connect to the websocket and handle the messages.
Conclusion
In this walkthrough, we covered quite a lot of ground. We started by setting up the Django project and creating the models, serializers, views, and URLs for the chatbot. We then created a custom authentication middleware for Django Channels and set up the channel layer for the chatbot. We also set up the LangChain API and Huey task for the chatbot. Finally, we created the frontend for the chatbot and brought together all the pieces to build a real-time chatbot.
I hope you found this walkthrough helpful and that you learned something new.