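Real-time communication has become a cornerstone of modern web applications. Whether you are building customer-support chat, a collaboration tool, or a social platform, WebSockets provide the bidirectional channel that instant updates require. This article shows how to build a robust real-time chat application using FastAPI's WebSocket support, with Django handling data persistence and business logic.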

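Why FastAPI + Django?
You may wonder why we combine two frameworks instead of using just one. The answer is that their strengths are complementary:
- FastAPI handles WebSocket connections with minimal overhead, asynchronous I/O, and high performance
- Django provides a mature ORM, an authentication system, and an admin interface for managing the application's data
This hybrid approach lets you use the best tool for each job: FastAPI for real-time connections, Django for everything else.
Architecture overview
The chat application has three main components:
- Django backend: handles user authentication, message storage, and the REST API endpoints
- FastAPI WebSocket server: manages real-time connections and message broadcasting
- Frontend client: connects to both services for a seamless user experience
Setting up Django
First, create a Django project containing the models the chat feature needs.
Install dependencies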
pip install django djangorestframework django-cors-headers psycopg2-binary
Create the chat models
# chat/models.py
from django.db import models
from django.contrib.auth.models import User


class ChatRoom(models.Model):
    name = models.CharField(max_length=255)
    slug = models.SlugField(unique=True)
    created_at = models.DateTimeField(auto_now_add=True)
    participants = models.ManyToManyField(User, related_name='chat_rooms')

    def __str__(self):
        return self.name


class Message(models.Model):
    room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name='messages')
    sender = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    timestamp = models.DateTimeField(auto_now_add=True)
    is_read = models.BooleanField(default=False)

    class Meta:
        ordering = ['timestamp']

    def __str__(self):
        return f"{self.sender.username}: {self.content[:50]}"
Create the Django REST API endpoints
# chat/serializers.py
from rest_framework import serializers
from .models import ChatRoom, Message
from django.contrib.auth.models import User


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['id', 'username', 'email']


class MessageSerializer(serializers.ModelSerializer):
    sender = UserSerializer(read_only=True)

    class Meta:
        model = Message
        fields = ['id', 'sender', 'content', 'timestamp', 'is_read']


class ChatRoomSerializer(serializers.ModelSerializer):
    participants = UserSerializer(many=True, read_only=True)
    last_message = serializers.SerializerMethodField()

    class Meta:
        model = ChatRoom
        fields = ['id', 'name', 'slug', 'created_at', 'participants', 'last_message']

    def get_last_message(self, obj):
        last_msg = obj.messages.last()
        if last_msg:
            return MessageSerializer(last_msg).data
        return None
# chat/views.py
from django.shortcuts import get_object_or_404
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from .models import ChatRoom, Message
from .serializers import ChatRoomSerializer, MessageSerializer


class ChatRoomViewSet(viewsets.ModelViewSet):
    queryset = ChatRoom.objects.all()
    serializer_class = ChatRoomSerializer
    permission_classes = [IsAuthenticated]
    lookup_field = 'slug'

    def get_queryset(self):
        # Only expose rooms the current user participates in
        return self.request.user.chat_rooms.all()

    @action(detail=True, methods=['get'])
    def messages(self, request, slug=None):
        room = self.get_object()
        messages = room.messages.all()
        serializer = MessageSerializer(messages, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def join(self, request, slug=None):
        # get_queryset() hides rooms the user has not joined yet, so
        # self.get_object() would return 404 here; look the room up directly.
        room = get_object_or_404(ChatRoom, slug=slug)
        room.participants.add(request.user)
        return Response({'status': 'joined'})
Configure URLs and settings
# chat/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import ChatRoomViewSet

router = DefaultRouter()
router.register(r'rooms', ChatRoomViewSet)

urlpatterns = [
    path('api/', include(router.urls)),
]
Update your Django settings to enable CORS and configure REST framework authentication (database settings are not shown here):
# settings.py
INSTALLED_APPS = [
    # ...
    'rest_framework',
    'rest_framework.authtoken',  # required by TokenAuthentication below
    'corsheaders',
    'chat',
]
MIDDLEWARE = [
    'corsheaders.middleware.CorsMiddleware',
    # ... other middleware
]
CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://localhost:8001",  # FastAPI server
]
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.TokenAuthentication',
    ],
}
Building the FastAPI WebSocket server
Next, create the FastAPI application that handles the WebSocket connections.
Install FastAPI dependencies
pip install fastapi uvicorn websockets python-multipart httpx
Create the WebSocket manager
# websocket_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, List, Optional
import json
import httpx
from datetime import datetime, timezone

app = FastAPI()

# Enable CORS (allow_origins=["*"] is for local development; restrict it in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# The connection manager tracks the open WebSocket connections per room
class ConnectionManager:
    def __init__(self):
        # Active connections keyed by room slug
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room_slug: str):
        await websocket.accept()
        if room_slug not in self.active_connections:
            self.active_connections[room_slug] = []
        self.active_connections[room_slug].append(websocket)

    def disconnect(self, websocket: WebSocket, room_slug: str):
        connections = self.active_connections.get(room_slug)
        if connections is None:
            return
        # The socket may already have been removed by broadcast() cleanup
        if websocket in connections:
            connections.remove(websocket)
        if not connections:
            del self.active_connections[room_slug]

    async def broadcast(self, room_slug: str, message: dict):
        if room_slug in self.active_connections:
            disconnected = []
            # Iterate over a copy: the list can change while we await a send
            for connection in list(self.active_connections[room_slug]):
                try:
                    await connection.send_json(message)
                except Exception:
                    disconnected.append(connection)
            # Clean up clients whose connection has dropped
            for conn in disconnected:
                self.disconnect(conn, room_slug)

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)


manager = ConnectionManager()

# Django API configuration
DJANGO_API_URL = "http://localhost:8000"


async def save_message_to_django(room_slug: str, user_id: int, content: str, token: str):
    """Save message to Django database via REST API"""
    # NOTE: the Django code shown earlier does not define POST /api/messages/;
    # an endpoint accepting this payload must exist for persistence to work.
    # user_id is currently unused in the request.
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{DJANGO_API_URL}/api/messages/",
                json={
                    "room_slug": room_slug,
                    "content": content,
                },
                headers={"Authorization": f"Token {token}"},
                timeout=5.0
            )
            return response.json() if response.status_code == 201 else None
        except Exception as e:
            print(f"Error saving message: {e}")
            return None


@app.websocket("/ws/chat/{room_slug}")
async def websocket_endpoint(
    websocket: WebSocket,
    room_slug: str,
    token: Optional[str] = None
):
    """
    WebSocket endpoint for real-time chat
    Query parameter 'token' should contain the user's auth token
    """
    if not token:
        # Closing before accept() rejects the handshake
        await websocket.close(code=4001, reason="Authentication required")
        return
    await manager.connect(websocket, room_slug)
    # Send a connection confirmation
    await manager.send_personal_message({
        "type": "connection",
        "message": "Connected to chat room",
        "room": room_slug
    }, websocket)
    try:
        while True:
            # Receive a message from the client
            data = await websocket.receive_text()
            message_data = json.loads(data)
            # Extract the message details
            content = message_data.get("content", "")
            user_id = message_data.get("user_id")
            username = message_data.get("username", "Anonymous")
            if not content.strip():
                continue
            # Save to the Django database
            saved_message = await save_message_to_django(
                room_slug, user_id, content, token
            )
            # Prepare the broadcast payload
            broadcast_data = {
                "type": "message",
                "id": saved_message.get("id") if saved_message else None,
                "content": content,
                "username": username,
                "user_id": user_id,
                # Timezone-aware so browsers in other time zones parse it correctly
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "room": room_slug
            }
            # Broadcast to every client in the room
            await manager.broadcast(room_slug, broadcast_data)
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_slug)
        # Notify the other users
        await manager.broadcast(room_slug, {
            "type": "user_left",
            "message": "User left the chat",
            "room": room_slug
        })
    except Exception as e:
        print(f"Error: {e}")
        manager.disconnect(websocket, room_slug)


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "active_rooms": len(manager.active_connections),
        "total_connections": sum(len(conns) for conns in manager.active_connections.values())
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)
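The receive loop above feeds the result of `json.loads` straight into `.get(...)`, so malformed JSON or a non-string `content` raises and ends the connection. Below is a minimal sketch of a validation step that could sit between `receive_text()` and the save/broadcast calls. The helper name `parse_client_message` and the 2000-character limit are assumptions for illustration, not part of the original code.

```python
import json
from typing import Optional

MAX_CONTENT_LENGTH = 2000  # hypothetical limit, not from the original article


def parse_client_message(raw: str) -> Optional[dict]:
    """Return a cleaned message dict, or None if the payload should be ignored."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not valid JSON
    if not isinstance(data, dict):
        return None  # e.g. a bare list or number
    content = data.get("content")
    if not isinstance(content, str) or not content.strip():
        return None  # missing, non-string, or blank content
    username = data.get("username")
    return {
        "content": content.strip()[:MAX_CONTENT_LENGTH],
        "user_id": data.get("user_id"),
        "username": username if isinstance(username, str) else "Anonymous",
    }
```

In the loop, a `None` result would simply `continue`, which keeps one bad frame from disconnecting the client.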
Frontend integration
Now create a simple HTML/JavaScript client that connects to the chat application.
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Chat</title>
    <style>
        * { margin: 0; padding: 0; box-sizing: border-box; }
        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
        }
        .chat-container {
            width: 90%;
            max-width: 800px;
            height: 600px;
            background: white;
            border-radius: 12px;
            box-shadow: 0 10px 40px rgba(0,0,0,0.2);
            display: flex;
            flex-direction: column;
        }
        .chat-header {
            background: #667eea;
            color: white;
            padding: 20px;
            border-radius: 12px 12px 0 0;
        }
        .chat-messages {
            flex: 1;
            overflow-y: auto;
            padding: 20px;
            background: #f7f7f7;
        }
        .message {
            margin-bottom: 15px;
            padding: 12px;
            border-radius: 8px;
            max-width: 70%;
        }
        .message.sent {
            background: #667eea;
            color: white;
            margin-left: auto;
            text-align: right;
        }
        .message.received {
            background: white;
            border: 1px solid #e0e0e0;
        }
        .message .username {
            font-weight: bold;
            font-size: 0.9em;
            margin-bottom: 5px;
        }
        .message .timestamp {
            font-size: 0.75em;
            opacity: 0.7;
            margin-top: 5px;
        }
        .chat-input {
            display: flex;
            padding: 20px;
            background: white;
            border-radius: 0 0 12px 12px;
            border-top: 1px solid #e0e0e0;
        }
        .chat-input input {
            flex: 1;
            padding: 12px;
            border: 1px solid #ddd;
            border-radius: 6px;
            font-size: 14px;
        }
        .chat-input button {
            margin-left: 10px;
            padding: 12px 24px;
            background: #667eea;
            color: white;
            border: none;
            border-radius: 6px;
            cursor: pointer;
            font-weight: bold;
        }
        .chat-input button:hover {
            background: #5568d3;
        }
        .status {
            padding: 8px;
            text-align: center;
            font-size: 0.85em;
            background: #f0f0f0;
        }
        .status.connected { background: #d4edda; color: #155724; }
        .status.disconnected { background: #f8d7da; color: #721c24; }
    </style>
</head>
<body>
    <div class="chat-container">
        <div class="chat-header">
            <h2>Real-Time Chat Room</h2>
            <small id="roomName"></small>
        </div>
        <div class="status" id="status">Connecting...</div>
        <div class="chat-messages" id="messages"></div>
        <div class="chat-input">
            <input type="text" id="messageInput" placeholder="Type a message..." />
            <button onclick="sendMessage()">Send</button>
        </div>
    </div>
    <script>
        // Configuration
        const WEBSOCKET_URL = 'ws://localhost:8001/ws/chat/general';
        const AUTH_TOKEN = 'your-auth-token-here'; // Get from Django authentication
        const USER_ID = 1; // Current user ID
        const USERNAME = 'Demo User'; // Current username
        let ws;
        let reconnectInterval = 3000;

        // Connect to WebSocket
        function connect() {
            // Encode the token so special characters survive the query string
            ws = new WebSocket(`${WEBSOCKET_URL}?token=${encodeURIComponent(AUTH_TOKEN)}`);
            ws.onopen = () => {
                console.log('Connected to WebSocket');
                updateStatus('connected', 'Connected');
            };
            ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                handleMessage(data);
            };
            ws.onclose = () => {
                console.log('Disconnected from WebSocket');
                updateStatus('disconnected', 'Disconnected. Reconnecting...');
                setTimeout(connect, reconnectInterval);
            };
            ws.onerror = (error) => {
                console.error('WebSocket error:', error);
            };
        }

        // Handle incoming messages
        function handleMessage(data) {
            const messagesDiv = document.getElementById('messages');
            if (data.type === 'connection') {
                console.log('Connection confirmed:', data.message);
                return;
            }
            if (data.type === 'message') {
                const messageDiv = document.createElement('div');
                messageDiv.className = data.user_id === USER_ID ? 'message sent' : 'message received';
                const timestamp = new Date(data.timestamp).toLocaleTimeString();
                // Escape the username as well as the content: both come from other clients
                messageDiv.innerHTML = `
                    ${data.user_id !== USER_ID ? `<div class="username">${escapeHtml(data.username)}</div>` : ''}
                    <div class="content">${escapeHtml(data.content)}</div>
                    <div class="timestamp">${timestamp}</div>
                `;
                messagesDiv.appendChild(messageDiv);
                messagesDiv.scrollTop = messagesDiv.scrollHeight;
            }
        }

        // Send message
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const content = input.value.trim();
            if (!content || !ws || ws.readyState !== WebSocket.OPEN) {
                return;
            }
            const message = {
                content: content,
                user_id: USER_ID,
                username: USERNAME
            };
            ws.send(JSON.stringify(message));
            input.value = '';
        }

        // Update connection status
        function updateStatus(status, message) {
            const statusDiv = document.getElementById('status');
            statusDiv.className = `status ${status}`;
            statusDiv.textContent = message;
        }

        // Utility function to escape HTML
        function escapeHtml(text) {
            const div = document.createElement('div');
            div.textContent = text;
            return div.innerHTML;
        }

        // Allow Enter key to send message
        document.getElementById('messageInput').addEventListener('keypress', (e) => {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });

        // Initialize connection
        connect();
    </script>
</body>
</html>
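The client above retries at a fixed 3-second interval, so after a server restart every browser reconnects on the same schedule. One common alternative is capped exponential backoff with random jitter. The sketch below shows only the arithmetic, written in Python to match the server code; the same calculation would need to be ported to the `onclose` handler. The function names, the 60-second cap, and the exponent limit are assumptions for illustration.

```python
import random

MAX_EXPONENT = 16  # hypothetical guard so 2 ** attempt stays small


def backoff_ceiling(attempt: int, base: float = 3.0, cap: float = 60.0) -> float:
    """Upper bound in seconds for the given retry attempt (0-based)."""
    exponent = min(max(attempt, 0), MAX_EXPONENT)
    return min(cap, base * (2 ** exponent))


def reconnect_delay(attempt: int, base: float = 3.0, cap: float = 60.0) -> float:
    """Pick a random delay between 0 and the ceiling ("full jitter")."""
    return random.uniform(0.0, backoff_ceiling(attempt, base, cap))
```

The attempt counter would be reset to zero once a connection opens successfully.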
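Running the application
Start the Django server
# Apply migrations
python manage.py makemigrations
python manage.py migrate
# Create a superuser
python manage.py createsuperuser
# Run the Django server
python manage.py runserver 8000
Start the FastAPI WebSocket server
# Run the FastAPI server
python websocket_server.py
The FastAPI server starts on port 8001 and handles all WebSocket connections.
Open the frontend
Open the HTML file in a browser, or serve it from a local development server. Be sure to replace the placeholder authentication token (and the user ID and username) in the JavaScript code.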
Advanced features to consider
1. Typing indicators
Add a real-time typing indicator that shows when a user is composing a message:
@app.websocket("/ws/chat/{room_slug}")
async def websocket_endpoint(websocket: WebSocket, room_slug: str):
    # ... existing code ...
    if message_data.get("type") == "typing":
        await manager.broadcast(room_slug, {
            "type": "typing",
            "username": username,
            "is_typing": message_data.get("is_typing", False)
        })
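A typing indicator can get stuck "on" if a client disconnects before it sends `is_typing: false`. One way to guard against that is to let typing state expire on the server. The following is a minimal sketch under that assumption; the `TypingTracker` class, its method names, and the 5-second timeout are hypothetical and not part of the original code.

```python
import time
from typing import Callable, Dict, List


class TypingTracker:
    """Remembers who is typing in each room and forgets stale entries."""

    def __init__(self, timeout: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock  # injectable so the expiry logic can be tested
        self._last_seen: Dict[str, Dict[str, float]] = {}

    def update(self, room_slug: str, username: str, is_typing: bool) -> None:
        room = self._last_seen.setdefault(room_slug, {})
        if is_typing:
            room[username] = self.clock()
        else:
            room.pop(username, None)

    def active(self, room_slug: str) -> List[str]:
        """Usernames whose last typing event is newer than the timeout."""
        now = self.clock()
        room = self._last_seen.get(room_slug, {})
        return sorted(u for u, seen in room.items() if now - seen < self.timeout)
```

The endpoint would call `update(...)` whenever a typing event arrives and could broadcast `active(room_slug)` instead of relaying each raw event.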
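2. Read receipts
Track whether a message has been read by updating the Django model:
# This action looks up a message by pk, so it belongs on a Message viewset
# (not shown in this article), not on ChatRoomViewSet.
@action(detail=True, methods=['post'])
def mark_read(self, request, pk=None):
    message = self.get_object()
    message.is_read = True
    message.save(update_fields=['is_read'])
    return Response({'status': 'marked as read'})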
3. File sharing
Extend the message model to support file attachments:
class Message(models.Model):
    # ... existing fields ...
    attachment = models.FileField(upload_to='chat_files/', null=True, blank=True)
    attachment_type = models.CharField(max_length=50, null=True, blank=True)
4. Message reactions
Add emoji reactions to messages:
class MessageReaction(models.Model):
    message = models.ForeignKey(Message, on_delete=models.CASCADE, related_name='reactions')
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    emoji = models.CharField(max_length=10)
    created_at = models.DateTimeField(auto_now_add=True)
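Clients usually want reaction counts per emoji rather than raw rows. The sketch below shows that aggregation on plain `(user_id, emoji)` tuples standing in for `MessageReaction` rows. The function name is hypothetical, and because the model above has no uniqueness constraint, the sketch assumes duplicates from the same user should count once.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def summarize_reactions(rows: Iterable[Tuple[int, str]]) -> Dict[str, int]:
    """Count distinct users per emoji from (user_id, emoji) pairs."""
    unique_pairs = set(rows)  # a user counts once per emoji
    return dict(Counter(emoji for _user_id, emoji in unique_pairs))
```

In Django itself the same result would more likely come from a database-level aggregate, which avoids loading every row into Python.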
Performance optimization tips
1. Connection pooling
For production, use a connection pool for database operations:
import asyncpg


class DatabasePool:
    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            dsn='postgresql://user:password@localhost/chatdb',
            min_size=10,
            max_size=20
        )

    async def execute(self, query, *args):
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)
2. Message caching
Use Redis to cache recent messages and reduce database load:
import json
import redis.asyncio as redis

redis_client = redis.from_url("redis://localhost")


async def get_cached_messages(room_slug: str):
    cached = await redis_client.get(f"room:{room_slug}:messages")
    if cached:
        return json.loads(cached)
    return None


async def cache_messages(room_slug: str, messages: list):
    await redis_client.setex(
        f"room:{room_slug}:messages",
        300,  # Cache for 5 minutes
        json.dumps(messages)
    )
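The two helpers above read and write the cache separately; the usual way to combine them is the cache-aside pattern: try the cache, fall back to the database on a miss, then store the result. Here is a minimal sketch that uses an in-memory stand-in for Redis so it runs without a server. `InMemoryTTLCache`, `get_recent_messages`, and the `load_from_db` callable are hypothetical names introduced for illustration.

```python
import asyncio
import json
import time
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


class InMemoryTTLCache:
    """Tiny stand-in exposing the same get/setex shape used with Redis above."""

    def __init__(self) -> None:
        self._store: Dict[str, Tuple[float, str]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None or entry[0] <= time.monotonic():
            self._store.pop(key, None)  # drop missing or expired entries
            return None
        return entry[1]

    async def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._store[key] = (time.monotonic() + ttl_seconds, value)


async def get_recent_messages(
    room_slug: str,
    cache: InMemoryTTLCache,
    load_from_db: Callable[[str], Awaitable[List[dict]]],
) -> List[dict]:
    """Cache-aside read: serve from cache, else load and cache for 5 minutes."""
    key = f"room:{room_slug}:messages"
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    messages = await load_from_db(room_slug)
    await cache.setex(key, 300, json.dumps(messages))
    return messages
```

With a 5-minute TTL the cached list can lag behind new messages, so the key would normally be deleted or refreshed whenever a message is saved.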
3. Rate limiting
Prevent spam and abuse with rate limiting:
from collections import defaultdict
from time import time


class RateLimiter:
    def __init__(self, max_messages=10, time_window=60):
        self.max_messages = max_messages
        self.time_window = time_window
        self.messages = defaultdict(list)

    def is_allowed(self, user_id: int) -> bool:
        now = time()
        user_messages = self.messages[user_id]
        # Remove old messages outside time window
        user_messages[:] = [msg_time for msg_time in user_messages
                            if now - msg_time < self.time_window]
        if len(user_messages) >= self.max_messages:
            return False
        user_messages.append(now)
        return True


rate_limiter = RateLimiter()
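To see how such a limiter behaves, here is a variant of the same sliding-window idea restated with `collections.deque` and an injectable clock, so the window can be exercised without waiting 60 seconds. The class name `SlidingWindowLimiter` and the `clock` parameter are assumptions added for this sketch.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_messages per user within the last time_window seconds."""

    def __init__(self, max_messages: int = 10, time_window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_messages = max_messages
        self.time_window = time_window
        self.clock = clock
        self._events: Dict[int, Deque[float]] = defaultdict(deque)

    def is_allowed(self, user_id: int) -> bool:
        now = self.clock()
        events = self._events[user_id]
        # Drop timestamps that have fallen out of the window
        while events and now - events[0] >= self.time_window:
            events.popleft()
        if len(events) >= self.max_messages:
            return False
        events.append(now)
        return True
```

In the receive loop, `is_allowed(...)` would be checked before saving or broadcasting, ideally keyed on the identity obtained from token verification rather than the `user_id` field sent by the client.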
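Security considerations
1. Authentication and authorization
Always verify the user's token before accepting a WebSocket connection:
from typing import Optional


async def verify_token(token: str) -> Optional[dict]:
    # NOTE: /api/auth/verify/ is not defined in the Django code shown earlier;
    # it is assumed to return the user's details for a valid token.
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{DJANGO_API_URL}/api/auth/verify/",
                headers={"Authorization": f"Token {token}"},
                timeout=5.0
            )
        except httpx.HTTPError:
            return None  # treat an unreachable auth service as a failed check
    if response.status_code == 200:
        return response.json()
    return None


@app.websocket("/ws/chat/{room_slug}")
async def websocket_endpoint(websocket: WebSocket, room_slug: str, token: str):
    user_data = await verify_token(token)
    if not user_data:
        await websocket.close(code=4001, reason="Invalid token")
        return
    # ... continue with connection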
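Verifying the token only helps if the server then uses the verified identity. The earlier endpoint copies `user_id` and `username` from the client's JSON, which any client can forge. The sketch below builds the broadcast payload from the verification result instead. It assumes, hypothetically, that the verify endpoint returns a dict with `id` and `username` keys; `build_broadcast` is an illustrative name.

```python
from datetime import datetime, timezone
from typing import Optional


def build_broadcast(user_data: dict, content: str, room_slug: str,
                    message_id: Optional[int] = None) -> dict:
    """Build the broadcast payload from server-verified identity only."""
    return {
        "type": "message",
        "id": message_id,
        "content": content,
        # Identity comes from token verification, never from the client payload
        "username": user_data["username"],
        "user_id": user_data["id"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "room": room_slug,
    }
```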
2. Input sanitization
Always sanitize user input to prevent XSS attacks:
import bleach


def sanitize_message(content: str) -> str:
    allowed_tags = ['b', 'i', 'u', 'em', 'strong']
    return bleach.clean(content, tags=allowed_tags, strip=True)
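If chat messages are meant to be plain text with no markup at all, escaping is a simpler option than an allow-list. The sketch below uses the standard library's `html.escape`, which is a different technique from the `bleach.clean` call above: it keeps every character visible rather than stripping tags. The function name `escape_message` is hypothetical.

```python
import html


def escape_message(content: str) -> str:
    """Escape &, <, > and both quote characters so the text renders literally."""
    return html.escape(content, quote=True)
```

For example, `escape_message("<b>hi</b>")` yields `&lt;b&gt;hi&lt;/b&gt;`, so the tags are displayed as text instead of being interpreted.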
3. SSL/TLS in production
For production deployments, always use secure WebSocket connections (wss://):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8001,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem"
    )
Deployment strategy
Docker Compose configuration
Create a docker-compose.yml for easy deployment:
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: chatdb
      POSTGRES_USER: chatuser
      POSTGRES_PASSWORD: chatpass
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  django:
    build: ./django
    command: gunicorn config.wsgi:application --bind 0.0.0.0:8000
    volumes:
      - ./django:/app
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
  fastapi:
    build: ./fastapi
    command: uvicorn websocket_server:app --host 0.0.0.0 --port 8001
    volumes:
      - ./fastapi:/app
    ports:
      - "8001:8001"
    depends_on:
      - django
volumes:
  postgres_data:
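Inside the Compose network, containers reach one another by service name, so the hard-coded `http://localhost:8000` in websocket_server.py would point at the FastAPI container itself rather than at Django. A minimal sketch of reading the address from the environment instead follows. The variable name `DJANGO_API_URL` as an environment key and the helper `get_django_api_url` are assumptions; the value `http://django:8000` follows from the `django` service name above.

```python
import os
from typing import Mapping


def get_django_api_url(env: Mapping[str, str] = os.environ) -> str:
    """Return the Django base URL, defaulting to the local development address."""
    # In Compose this could be set to http://django:8000 on the fastapi service
    return env.get("DJANGO_API_URL", "http://localhost:8000").rstrip("/")
```

The same approach would apply to the PostgreSQL and Redis addresses, which also default to `localhost` in the earlier snippets.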
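Conclusion
Building a real-time chat application with FastAPI WebSockets and Django combines the strengths of both frameworks. FastAPI handles the performance-critical real-time connections, while Django manages the application's data and business logic through its ORM and admin interface.
The architecture scales well and can be extended with features such as:
- User presence across multiple rooms
- Direct messaging between users
- Message search and history
- Push notifications for offline users
- Video and voice call integration
The key to success is keeping the WebSocket layer light and fast while delegating heavier operations to Django's REST API. With appropriate caching, rate limiting, and security measures, this setup can be scaled to handle thousands of concurrent connections.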
Author: Yogeshkrishnanseeniraj
This article was submitted by the author, who retains the copyright. When republishing, please credit the source: https://www.nxrte.com/jishu/im/62385.html