异步 Django:将 WebSocket 扩展至 100 万并发连接

当我首次尝试为我们的 SaaS 平台构建实时通知系统时,以为处理几千个 WebSocket 连接会轻而易举。我错了,大错特错。当并发连接数达到 5000 左右时,服务器开始喘不过气来,我知道问题来了。

但六个月后,我们的基础设施如今已能从容应对超过 100 万个并发 WebSocket 连接。本文将讲述我们如何实现这一突破,踩了哪些坑,以及在扩展异步 Django 应用过程中汲取的经验教训。

异步 Django:将 WebSocket 扩展至 100 万并发连接

问题:传统 Django 为何力不从心

Django 诞生于一个不同的时代,那时请求-响应周期主导着 Web 开发。虽然 Django 已经发展了很长时间,但其传统的 WSGI 架构并非为像 WebSocket 那样的长时间连接而设计的。

核心问题在于“每个连接一个线程”的模型。每个 WebSocket 连接都会占用一个线程,而使用默认配置,在达到实际规模之前,线程就会耗尽。即使使用线程池,每个服务器也只能处理 500 到 1000 个连接,之后就会出现问题。

引入 Django Channels 和 ASGI

Django Channels 通过引入 ASGI(异步服务器网关接口)将 Django 转变为具备异步能力的框架。与 WSGI 不同,ASGI 无需创建数千个线程即可处理数千个并发连接。

我们最终确定的架构如下:

# routing.py
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from .consumers import NotificationConsumer
application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            path("ws/notifications/", NotificationConsumer.as_asgi()),
        ])
    ),
})

扩展策略的五大支柱

1. 全程异步

我们早期犯下的最大错误就是将同步代码与异步代码混用。每次从异步代码中调用同步函数时,都会阻塞事件循环。

# ❌ 不良 — 阻塞事件循环
class NotificationConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        user = User.objects.get(id=self.user_id)  
        
# ✅ 真正异步
class NotificationConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        user = await sync_to_async(User.objects.get)(id=self.user_id)

更好的做法是使用异步数据库驱动程序,例如 databases 或 Django 的原生异步 ORM 支持(适用于 Django 4.1 及以上版本):

2. Redis 作为通道层

Django Channels 需要一个通道层来实现不同服务器实例间的通信。我们尝试了多种方案,最终发现采用 channels_redis 后端的 Redis 在性能和可靠性方面达到了最佳平衡点。

# settings.py
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis-cluster.local", 6379)],
            "capacity": 1500,
            "expiry": 10,
        },
    },
}

实施的关键优化措施:

  • 采用 Redis 集群实现水平扩展
  • 根据消息模式调整capacityexpiry策略
  • 实施连接池机制避免连接开销

3. 智能连接分配

在合理配置下,单台服务器实际可处理 50,000 至 100,000 个 WebSocket连接。我们使用 NGINX 作为负载均衡器,基于用户 ID 实现粘性会话:

upstream websocket_backend {
    ip_hash;
    server ws-server-1:8000;
    server ws-server-2:8000;
    server ws-server-3:8000;
    server ws-server-4:8000;
}
server {
    location /ws/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 86400;
    }
}

4. 操作系统调优

这正是大多数人遇到的瓶颈所在。Linux 的默认限制会扼杀你的扩展能力:

# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.ipv4.ip_local_port_range = 1024 65535
fs.file-max = 2097152
# /etc/security/limits.conf
* soft nofile 1048576
* hard nofile 1048576

别忘了为应用程序用户增加文件描述符限制:

ulimit -n 1048576

5. 连接池与资源管理

我们实现了积极的连接池和资源管理:

# consumers.py
class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user_id = self.scope["user"].id
        self.group_name = f"user_{self.user_id}"
        
        # 加入特定用户组
        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )
        await self.accept()
        
        # 高效发送初始状态
        await self.send_initial_state()
    
    async def disconnect(self, close_code):
        # 大规模清理至关重要
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )
    
    async def send_initial_state(self):
        # 批量数据库查询
        notifications = await sync_to_async(list)(
            Notification.objects.filter(
                user_id=self.user_id, 
                read=False
            ).only('id', 'message', 'created_at')[:50]
        )
        await self.send(text_data=json.dumps({
            'notifications': [n.to_dict() for n in notifications]
        }))

监控与可观测性

无法衡量的事物无法扩展。我们对所有环节都进行了监控:

import prometheus_client
from prometheus_client import Counter, Histogram
websocket_connections = Counter(
    'websocket_connections_total', 
    'Total WebSocket connections'
)
websocket_message_duration = Histogram(
    'websocket_message_duration_seconds',
    'Time spent processing WebSocket messages'
)
class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        websocket_connections.inc()
        await super().connect()
    
    async def receive(self, text_data):
        with websocket_message_duration.time():
            await self.process_message(text_data)

优化成果

实施这些优化措施后,我们的基础设施指标显著提升:

  • 每台服务器连接数:50,000+(原为500)
  • CPU使用率:平均40%(原为95%)
  • 内存占用:每台服务器8GB(原为12GB)
  • 消息延迟:<50毫秒 p99(低于之前的2000毫秒)
  • 每次连接成本:$0.0001/月(原为$0.01/月)

经验教训

1. 从第一天起就采用异步设计。将异步功能回溯到同步代码库中会非常痛苦。

2. 持续进行性能分析。我们发现 80% 的 CPU 时间都耗费在数据库查询上,而这些查询本可通过缓存或批处理来优化。

3. Redis 是你的得力助手。配置得当的 Redis 集群每秒可处理数百万次发布/订阅操作。

4. 操作系统限制不容忽视。若操作系统实施了限流,再多的应用层优化也无济于事。

5. 尽早进行大规模测试。仅用100个连接进行的负载测试,无法反映100,000个连接时的系统行为。

总结

将 WebSockets 扩展至 100 万并发连接,不仅关乎技术栈的选择。更需要透彻理解整个系统——从应用层到内核层,系统性地优化每个环节。

Django Channels 最终证明完全能满足我们的需求,但前提是我们学会了正确使用它。异步范式需要转变思维方式,但由此获得的性能提升绝对物有所值。

若要在 Django 中构建实时功能,请从异步编程起步,拥抱 ASGI 框架,并勇于深入探索系统级优化。

作者:Yogeshkrishnanseeniraj,后端开发工程师

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/63370.html

(0)

相关推荐

发表回复

登录后才能评论