基于 FastAPI WebSockets 与 Django 集成的实时聊天应用

实时通信已成为现代 Web 应用程序的基石。无论是构建客户支持聊天、协作工具还是社交平台,WebSocket 都能提供即时更新所需的双向通信。在本文中,我们将探讨如何使用 FastAPI 的 WebSocket 功能构建一个强大的实时聊天应用程序,同时利用 Django 进行数据持久化和业务逻辑处理。

基于 FastAPI WebSockets 与 Django 集成的实时聊天应用

为什么选择 FastAPI + Django?

你可能会想,为什么我们要合并两个框架,而不是只使用一个。答案在于它们的优势互补:

  • FastAPI擅长以最小的开销、异步操作和高性能处理 WebSocket 连接
  • Django提供了成熟的 ORM、身份验证系统和管理界面来管理应用程序的数据

这种混合方法可让您为每项工作使用最佳工具:FastAPI 用于实时连接,Django 用于其他工作。

架构概述

本聊天应用程序有三个主要组件:

  1. Django 后端:处理用户身份验证、消息存储和 REST API 端点
  2. FastAPI WebSocket 服务器:管理实时连接和消息广播
  3. 前端客户端:同时连接两个服务,实现无缝用户体验

设置 Django

首先,创建一个包含聊天功能所需模型的 Django 项目。

安装依赖项

pip install django djangorestframework django-cors-headers psycopg2-binary

创建聊天模型

# chat/models.py
from django.db import models
from django.contrib.auth.models import User
class ChatRoom(models.Model):
    name = models.CharField(max_length=255)
    slug = models.SlugField(unique=True)
    created_at = models.DateTimeField(auto_now_add=True)
    participants = models.ManyToManyField(User, related_name='chat_rooms')
    
    def __str__(self):
        return self.name
class Message(models.Model):
    room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name='messages')
    sender = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    timestamp = models.DateTimeField(auto_now_add=True)
    is_read = models.BooleanField(default=False)
    
    class Meta:
        ordering = ['timestamp']
    
    def __str__(self):
        return f"{self.sender.username}: {self.content[:50]}"

创建 Django REST API 端点

# chat/serializers.py
from rest_framework import serializers
from .models import ChatRoom, Message
from django.contrib.auth.models import User
class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['id', 'username', 'email']
class MessageSerializer(serializers.ModelSerializer):
    sender = UserSerializer(read_only=True)
    
    class Meta:
        model = Message
        fields = ['id', 'sender', 'content', 'timestamp', 'is_read']
class ChatRoomSerializer(serializers.ModelSerializer):
    participants = UserSerializer(many=True, read_only=True)
    last_message = serializers.SerializerMethodField()
    
    class Meta:
        model = ChatRoom
        fields = ['id', 'name', 'slug', 'created_at', 'participants', 'last_message']
    
    def get_last_message(self, obj):
        last_msg = obj.messages.last()
        if last_msg:
            return MessageSerializer(last_msg).data
        return None
# chat/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from .models import ChatRoom, Message
from .serializers import ChatRoomSerializer, MessageSerializer
class ChatRoomViewSet(viewsets.ModelViewSet):
    queryset = ChatRoom.objects.all()
    serializer_class = ChatRoomSerializer
    permission_classes = [IsAuthenticated]
    lookup_field = 'slug'
    
    def get_queryset(self):
        return self.request.user.chat_rooms.all()
    
    @action(detail=True, methods=['get'])
    def messages(self, request, slug=None):
        room = self.get_object()
        messages = room.messages.all()
        serializer = MessageSerializer(messages, many=True)
        return Response(serializer.data)
    
    @action(detail=True, methods=['post'])
    def join(self, request, slug=None):
        room = self.get_object()
        room.participants.add(request.user)
        return Response({'status': 'joined'})

配置 URL 和设置

# chat/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import ChatRoomViewSet
router = DefaultRouter()
router.register(r'rooms', ChatRoomViewSet)
urlpatterns = [
    path('api/', include(router.urls)),
]

更新您的 Django 设置以启用 CORS 并配置数据库:

#settings.py 
INSTALLED_APPS = [ 
    #... 
    'rest_framework' , 
    'corsheaders' , 
    'chat' , 
]
MIDDLEWARE = [
    'corsheaders.middleware.CorsMiddleware',
    # ...其他中间件
]
CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://localhost:8001",  # FastAPI server
]
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.TokenAuthentication',
    ],
}

构建 FastAPI WebSocket 服务器

现在开始创建处理 WebSocket 连接的 FastAPI 应用程序。

安装 FastAPI 依赖项

pip install fastapi uvicorn websockets python-multipart httpx

创建 WebSocket 管理器

# websocket_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, List
import json
import httpx
from datetime import datetime
app = FastAPI()
# 启用 CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# 连接管理器处理多个 WebSocket 连接
class ConnectionManager:
    def __init__(self):
        # 按房间 slug 存储活动连接
        self.active_connections: Dict[str, List[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, room_slug: str):
        await websocket.accept()
        if room_slug not in self.active_connections:
            self.active_connections[room_slug] = []
        self.active_connections[room_slug].append(websocket)
    
    def disconnect(self, websocket: WebSocket, room_slug: str):
        if room_slug in self.active_connections:
            self.active_connections[room_slug].remove(websocket)
            if not self.active_connections[room_slug]:
                del self.active_connections[room_slug]
    
    async def broadcast(self, room_slug: str, message: dict):
        if room_slug in self.active_connections:
            disconnected = []
            for connection in self.active_connections[room_slug]:
                try:
                    await connection.send_json(message)
                except:
                    disconnected.append(connection)
            
            # 清理断开连接的客户端
            for conn in disconnected:
                self.disconnect(conn, room_slug)
    
    async def send_personal_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)
manager = ConnectionManager()
# Django API 配置
DJANGO_API_URL = "http://localhost:8000"
async def save_message_to_django(room_slug: str, user_id: int, content: str, token: str):
    """Save message to Django database via REST API"""
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{DJANGO_API_URL}/api/messages/",
                json={
                    "room_slug": room_slug,
                    "content": content,
                },
                headers={"Authorization": f"Token {token}"},
                timeout=5.0
            )
            return response.json() if response.status_code == 201 else None
        except Exception as e:
            print(f"Error saving message: {e}")
            return None
@app.websocket("/ws/chat/{room_slug}")
async def websocket_endpoint(
    websocket: WebSocket,
    room_slug: str,
    token: str = None
):
    """
    WebSocket endpoint for real-time chat
    Query parameter 'token' should contain the user's auth token
    """
    if not token:
        await websocket.close(code=4001, reason="Authentication required")
        return
    
    await manager.connect(websocket, room_slug)
    
    # 发送连接确认
    await manager.send_personal_message({
        "type": "connection",
        "message": "Connected to chat room",
        "room": room_slug
    }, websocket)
    
    try:
        while True:
            # 从客户端接收消息
            data = await websocket.receive_text()
            message_data = json.loads(data)
            
            # 提取消息详细信息
            content = message_data.get("content", "")
            user_id = message_data.get("user_id")
            username = message_data.get("username", "Anonymous")
            
            if not content.strip():
                continue
            
            # 保存到 Django 数据库
            saved_message = await save_message_to_django(
                room_slug, user_id, content, token
            )
            
            # 准备广播消息
            broadcast_data = {
                "type": "message",
                "id": saved_message.get("id") if saved_message else None,
                "content": content,
                "username": username,
                "user_id": user_id,
                "timestamp": datetime.now().isoformat(),
                "room": room_slug
            }
            
            # 向房间内所有客户端广播
            await manager.broadcast(room_slug, broadcast_data)
            
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_slug)
        # 通知其他用户
        await manager.broadcast(room_slug, {
            "type": "user_left",
            "message": f"User left the chat",
            "room": room_slug
        })
    except Exception as e:
        print(f"Error: {e}")
        manager.disconnect(websocket, room_slug)
@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "active_rooms": len(manager.active_connections),
        "total_connections": sum(len(conns) for conns in manager.active_connections.values())
    }
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

前端集成

现在创建一个简单的 HTML/JavaScript 客户端来连接到我们的聊天应用程序。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Chat</title>
    <style>
        * { margin: 0; padding: 0; box-sizing: border-box; }
        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
        }
        .chat-container {
            width: 90%;
            max-width: 800px;
            height: 600px;
            background: white;
            border-radius: 12px;
            box-shadow: 0 10px 40px rgba(0,0,0,0.2);
            display: flex;
            flex-direction: column;
        }
        .chat-header {
            background: #667eea;
            color: white;
            padding: 20px;
            border-radius: 12px 12px 0 0;
        }
        .chat-messages {
            flex: 1;
            overflow-y: auto;
            padding: 20px;
            background: #f7f7f7;
        }
        .message {
            margin-bottom: 15px;
            padding: 12px;
            border-radius: 8px;
            max-width: 70%;
        }
        .message.sent {
            background: #667eea;
            color: white;
            margin-left: auto;
            text-align: right;
        }
        .message.received {
            background: white;
            border: 1px solid #e0e0e0;
        }
        .message .username {
            font-weight: bold;
            font-size: 0.9em;
            margin-bottom: 5px;
        }
        .message .timestamp {
            font-size: 0.75em;
            opacity: 0.7;
            margin-top: 5px;
        }
        .chat-input {
            display: flex;
            padding: 20px;
            background: white;
            border-radius: 0 0 12px 12px;
            border-top: 1px solid #e0e0e0;
        }
        .chat-input input {
            flex: 1;
            padding: 12px;
            border: 1px solid #ddd;
            border-radius: 6px;
            font-size: 14px;
        }
        .chat-input button {
            margin-left: 10px;
            padding: 12px 24px;
            background: #667eea;
            color: white;
            border: none;
            border-radius: 6px;
            cursor: pointer;
            font-weight: bold;
        }
        .chat-input button:hover {
            background: #5568d3;
        }
        .status {
            padding: 8px;
            text-align: center;
            font-size: 0.85em;
            background: #f0f0f0;
        }
        .status.connected { background: #d4edda; color: #155724; }
        .status.disconnected { background: #f8d7da; color: #721c24; }
    </style>
</head>
<body>
    <div class="chat-container">
        <div class="chat-header">
            <h2>Real-Time Chat Room</h2>
            <small id="roomName"></small>
        </div>
        <div class="status" id="status">Connecting...</div>
        <div class="chat-messages" id="messages"></div>
        <div class="chat-input">
            <input type="text" id="messageInput" placeholder="Type a message..." />
            <button onclick="sendMessage()">Send</button>
        </div>
    </div>
    <script>
        // Configuration
        const WEBSOCKET_URL = 'ws://localhost:8001/ws/chat/general';
        const AUTH_TOKEN = 'your-auth-token-here'; // Get from Django authentication
        const USER_ID = 1; // Current user ID
        const USERNAME = 'Demo User'; // Current username
        
        let ws;
        let reconnectInterval = 3000;
        
        // Connect to WebSocket
        function connect() {
            ws = new WebSocket(`${WEBSOCKET_URL}?token=${AUTH_TOKEN}`);
            
            ws.onopen = () => {
                console.log('Connected to WebSocket');
                updateStatus('connected', 'Connected');
            };
            
            ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                handleMessage(data);
            };
            
            ws.onclose = () => {
                console.log('Disconnected from WebSocket');
                updateStatus('disconnected', 'Disconnected. Reconnecting...');
                setTimeout(connect, reconnectInterval);
            };
            
            ws.onerror = (error) => {
                console.error('WebSocket error:', error);
            };
        }
        
        // Handle incoming messages
        function handleMessage(data) {
            const messagesDiv = document.getElementById('messages');
            
            if (data.type === 'connection') {
                console.log('Connection confirmed:', data.message);
                return;
            }
            
            if (data.type === 'message') {
                const messageDiv = document.createElement('div');
                messageDiv.className = data.user_id === USER_ID ? 'message sent' : 'message received';
                
                const timestamp = new Date(data.timestamp).toLocaleTimeString();
                
                messageDiv.innerHTML = `
                    ${data.user_id !== USER_ID ? `<div class="username">${data.username}</div>` : ''}
                    <div class="content">${escapeHtml(data.content)}</div>
                    <div class="timestamp">${timestamp}</div>
                `;
                
                messagesDiv.appendChild(messageDiv);
                messagesDiv.scrollTop = messagesDiv.scrollHeight;
            }
        }
        
        // Send message
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const content = input.value.trim();
            
            if (!content || !ws || ws.readyState !== WebSocket.OPEN) {
                return;
            }
            
            const message = {
                content: content,
                user_id: USER_ID,
                username: USERNAME
            };
            
            ws.send(JSON.stringify(message));
            input.value = '';
        }
        
        // Update connection status
        function updateStatus(status, message) {
            const statusDiv = document.getElementById('status');
            statusDiv.className = `status ${status}`;
            statusDiv.textContent = message;
        }
        
        // Utility function to escape HTML
        function escapeHtml(text) {
            const div = document.createElement('div');
            div.textContent = text;
            return div.innerHTML;
        }
        
        // Allow Enter key to send message
        document.getElementById('messageInput').addEventListener('keypress', (e) => {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
        
        // Initialize connection
        connect();
    </script>
</body>
</html>

运行应用程序

启动 Django 服务器

# 应用迁移
python manage.py makemigrations 
python manage.py migrate
# 创建超级用户
python manage.py createsuperuser
# 运行 Django 服务器
python manage.py runserver 8000

启动 FastAPI WebSocket 服务器

# 运行 FastAPI 服务器
python websocket_server.py

FastAPI 服务器将在端口 8001 上启动,处理所有 WebSocket 连接。

打开前端界面

在浏览器中打开HTML文件,或通过本地开发服务器运行。请确保更新JavaScript代码中的认证令牌。

可考虑的高级功能

1. 输入指示器

添加实时输入指示器,显示用户正在编写消息的状态:

@app.websocket( "/ws/chat/{room_slug}" ) 
async  def  websocket_endpoint ( websocket: WebSocket, room_slug: str ): 
    # ... 现有代码 ... 
    
    if message_data.get( "type" ) == "typing" : 
        await manager.broadcast(room_slug, { 
            "type" : "typing" , 
            "username" : username, 
            "is_typing" : message_data.get( "is_typing" , False ) 
        })

2. 已读回执

通过更新 Django 模型来跟踪消息的读取时间:

@action(detail=True, methods=['post'])
def mark_read(self, request, pk=None):
    message = self.get_object()
    message.is_read = True
    message.save()
    return Response({'status': 'marked as read'})

3. 文件共享

扩展消息模型以支持文件附件:

class Message(models.Model):
    # ... 现有字段 ...
    attachment = models.FileField(upload_to='chat_files/', null=True, blank=True)
    attachment_type = models.CharField(max_length=50, null=True, blank=True)

4. 消息表情

为消息添加表情反应:

class MessageReaction(models.Model):
    message = models.ForeignKey(Message, on_delete=models.CASCADE, related_name='reactions')
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    emoji = models.CharField(max_length=10)
    created_at = models.DateTimeField(auto_now_add=True)

性能优化技巧

1. 连接池

对于生产环境,实现数据库操作的连接池:

import asyncpg
class DatabasePool:
    def __init__(self):
        self.pool = None
    
    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            dsn='postgresql://user:password@localhost/chatdb',
            min_size=10,
            max_size=20
        )
    
    async def execute(self, query, *args):
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)

2. 消息缓存

使用 Redis 缓存最新消息并减少数据库负载:

import redis.asyncio as redis
redis_client = redis.from_url("redis://localhost")
async def get_cached_messages(room_slug: str):
    cached = await redis_client.get(f"room:{room_slug}:messages")
    if cached:
        return json.loads(cached)
    return None
async def cache_messages(room_slug: str, messages: list):
    await redis_client.setex(
        f"room:{room_slug}:messages",
        300,  # Cache for 5 minutes
        json.dumps(messages)
    )

3. 速率限制

通过速率限制防止垃圾邮件和滥用:

from collections import defaultdict
from time import time
class RateLimiter:
    def __init__(self, max_messages=10, time_window=60):
        self.max_messages = max_messages
        self.time_window = time_window
        self.messages = defaultdict(list)
    
    def is_allowed(self, user_id: int) -> bool:
        now = time()
        user_messages = self.messages[user_id]
        
        # Remove old messages outside time window
        user_messages[:] = [msg_time for msg_time in user_messages 
                           if now - msg_time < self.time_window]
        
        if len(user_messages) >= self.max_messages:
            return False
        
        user_messages.append(now)
        return True
rate_limiter = RateLimiter()

安全注意事项

1. 身份验证和授权

在接受 WebSocket 连接之前始终验证用户令牌:

async def verify_token(token: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{DJANGO_API_URL}/api/auth/verify/",
            headers={"Authorization": f"Token {token}"}
        )
        if response.status_code == 200:
            return response.json()
        return None
@app.websocket("/ws/chat/{room_slug}")
async def websocket_endpoint(websocket: WebSocket, room_slug: str, token: str):
    user_data = await verify_token(token)
    if not user_data:
        await websocket.close(code=4001, reason="Invalid token")
        return
    # ... continue with connection

2. 输入清理

始终清理用户输入以防止 XSS 攻击:

import bleach
def sanitize_message(content: str) -> str:
    allowed_tags = ['b', 'i', 'u', 'em', 'strong']
    return bleach.clean(content, tags=allowed_tags, strip=True)

3. 生产中的 SSL/TLS

对于生产部署,请始终使用安全的 WebSocket 连接(wss://):

if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8001,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem"
    )

部署策略

Docker Compose 配置

创建 docker-compose.yml 以便轻松部署:

version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: chatdb
      POSTGRES_USER: chatuser
      POSTGRES_PASSWORD: chatpass
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  django:
    build: ./django
    command: gunicorn config.wsgi:application --bind 0.0.0.0:8000
    volumes:
      - ./django:/app
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
  fastapi:
    build: ./fastapi
    command: uvicorn websocket_server:app --host 0.0.0.0 --port 8001
    volumes:
      - ./fastapi:/app
    ports:
      - "8001:8001"
    depends_on:
      - django
volumes:
  postgres_data:

结论

使用 FastAPI WebSockets 和 Django 构建实时聊天应用程序,可以结合两个框架的优势。FastAPI 负责处理性能至关重要的实时连接,而 Django 则通过强大的 ORM 和管理界面管理应用程序的数据和业务逻辑。

该架构具有良好的扩展性,并可以扩展以下功能:

  • 支持多房间用户在线
  • 用户之间的直接消息传递
  • 消息搜索和历史记录
  • 为离线用户推送通知
  • 视频和语音通话集成

成功的关键在于保持 WebSocket 层轻量且快速,同时将繁重的操作转移至 Django 的 REST API。通过适当的缓存、速率限制和安全措施,此设置可以高效处理数千个并发连接。

作者:Yogeshkrishnanseeniraj

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/62385.html

(0)

相关推荐

发表回复

登录后才能评论