使用 Python、Redis 和 FastAPI 通过 PUB/SUB 扩展 WebSockets

WebSocket 是一种通信协议,可在客户端(通常是网络浏览器)和服务器之间通过单个长效连接进行全双工通信,这意味着在客户端连接服务器的整个过程中,连接一直处于打开状态。与遵循请求-响应模式的传统 HTTP 请求不同,WebSockets 允许实时双向通信,从而实现高效的交互式网络应用。

就 WebSockets 而言,垂直扩展有助于在单个服务器上处理更多的 WebSocket 连接,并随着内存和处理能力的增加而提高整体性能。但是,纵向扩展也有实际限制,在某些情况下,成本和性能收益可能无法证明进一步纵向扩展是合理的。通过横向扩展,我们必须将 WebSocket 连接分配到多个服务器,以提高性能、处理更多并发连接并确保高可用性。负载平衡器通常用于将传入的 WebSocket 连接平均分配到可用的服务器实例上。然而,由于连接的状态特性以及在多个服务器之间同步和管理状态的需要,WebSocket 应用程序的横向扩展可能具有挑战性。这时,我们可以利用 PUB/SUB 系统的强大功能来管理多个服务器实例之间的状态。

使用 Python、Redis 和 FastAPI 通过 PUB/SUB 扩展 WebSockets
使用 websocket 的 pub/sub 结构设计

让我们来看看像 Google docs 这样的应用程序,我们有两个用户连接到服务器 server1。在这里,只要有新的更改日志可用,我们就可以通过为文档创建一个具有唯一标识符的通道,将数据发布给所有连接的用户,所有用户都将连接到该通道并接收更新。现在,由于该通道存在于单个实例中,我们可以很容易地将更改日志发布到连接到该服务器的 websockets 上,但如果其他用户编辑同一文档并连接到服务器 2,情况会怎样呢?他们根本无法收到用户 1 和用户 2 的更新,因为该通道只存在于服务器 1 中,与其他服务器没有任何联系。

在这里,我们可以应用的基本概念是为文档创建一个唯一的通道,该通道将在 PUB/SUB 系统(在我们的例子中是 Redis)中创建。连接到任何服务器的用户对文档所做的所有更改都将只发布到 Redis 频道。 所有正在处理该文档的用户都将是 Redis 频道的订阅者。因此,每当主题/通道中出现某些信息时,所有用户都会通过数据库收到有关更改日志的通知。唯一不同的是,PUB/SUB 将作为单点工作,有助于同步和建立不同服务器实例之间的链接。

下面我将使用FastAPI作为演示代码。

1、下面的 RedisPubSubManger 类将有助于创建与 Redis 的连接、订阅和取消订阅频道以及向频道发布消息。

import asyncio
import redis.asyncio as aioredis
import json
from fastapi import WebSocket


class RedisPubSubManager:
    """
        Initializes the RedisPubSubManager.

    Args:
        host (str): Redis server host.
        port (int): Redis server port.
    """

    def __init__(self, host='localhost', port=6379):
        self.redis_host = host
        self.redis_port = port
        self.pubsub = None

    async def _get_redis_connection(self) -> aioredis.Redis:
        """
        Establishes a connection to Redis.

        Returns:
            aioredis.Redis: Redis connection object.
        """
        return aioredis.Redis(host=self.redis_host,
                              port=self.redis_port,
                              auto_close_connection_pool=False)

    async def connect(self) -> None:
        """
        Connects to the Redis server and initializes the pubsub client.
        """
        self.redis_connection = await self._get_redis_connection()
        self.pubsub = self.redis_connection.pubsub()

    async def _publish(self, room_id: str, message: str) -> None:
        """
        Publishes a message to a specific Redis channel.

        Args:
            room_id (str): Channel or room ID.
            message (str): Message to be published.
        """
        await self.redis_connection.publish(room_id, message)

    async def subscribe(self, room_id: str) -> aioredis.Redis:
        """
        Subscribes to a Redis channel.

        Args:
            room_id (str): Channel or room ID to subscribe to.

        Returns:
            aioredis.ChannelSubscribe: PubSub object for the subscribed channel.
        """
        await self.pubsub.subscribe(room_id)
        return self.pubsub

    async def unsubscribe(self, room_id: str) -> None:
        """
        Unsubscribes from a Redis channel.

        Args:
            room_id (str): Channel or room ID to unsubscribe from.
        """
        await self.pubsub.unsubscribe(room_id)

2、WebSocketManger类,它将处理创建房间/频道、订阅/订阅房间等的逻辑。

class WebSocketManager:

    def __init__(self):
        """
        Initializes the WebSocketManager.

        Attributes:
            rooms (dict): A dictionary to store WebSocket connections in different rooms.
            pubsub_client (RedisPubSubManager): An instance of the RedisPubSubManager class for pub-sub functionality.
        """
        self.rooms: dict = {}
        self.pubsub_client = RedisPubSubManager()

    async def add_user_to_room(self, room_id: str, websocket: WebSocket) -> None:
        """
        Adds a user's WebSocket connection to a room.

        Args:
            room_id (str): Room ID or channel name.
            websocket (WebSocket): WebSocket connection object.
        """
        await websocket.accept()

        if room_id in self.rooms:
            self.rooms[room_id].append(websocket)
        else:
            self.rooms[room_id] = [websocket]

            await self.pubsub_client.connect()
            pubsub_subscriber = await self.pubsub_client.subscribe(room_id)
            asyncio.create_task(self._pubsub_data_reader(pubsub_subscriber))

    async def broadcast_to_room(self, room_id: str, message: str) -> None:
        """
        Broadcasts a message to all connected WebSockets in a room.

        Args:
            room_id (str): Room ID or channel name.
            message (str): Message to be broadcasted.
        """
        await self.pubsub_client._publish(room_id, message)

    async def remove_user_from_room(self, room_id: str, websocket: WebSocket) -> None:
        """
        Removes a user's WebSocket connection from a room.

        Args:
            room_id (str): Room ID or channel name.
            websocket (WebSocket): WebSocket connection object.
        """
        self.rooms[room_id].remove(websocket)

        if len(self.rooms[room_id]) == 0:
            del self.rooms[room_id]
            await self.pubsub_client.unsubscribe(room_id)

    async def _pubsub_data_reader(self, pubsub_subscriber):
        """
        Reads and broadcasts messages received from Redis PubSub.

        Args:
            pubsub_subscriber (aioredis.ChannelSubscribe): PubSub object for the subscribed channel.
        """
        while True:
            message = await pubsub_subscriber.get_message(ignore_subscribe_messages=True)
            if message is not None:
                room_id = message['channel'].decode('utf-8')
                all_sockets = self.rooms[room_id]
                for socket in all_sockets:
                    data = message['data'].decode('utf-8')
                    await socket.send_text(data)

3、连接到 WebSocket 服务器并建立连接的 API 路由。

import logging
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from websocket.socketManager import WebSocketManager
import json
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", default=8000, type=int)
args = parser.parse_args()


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("FastAPI app")

app = FastAPI()

# Adding the CORS middleware to the app
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

socket_manager = WebSocketManager()


@app.websocket("/api/v1/ws/{room_id}/{user_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, user_id: int):
    await socket_manager.add_user_to_room(room_id, websocket)
    message = {
        "user_id": user_id,
        "room_id": room_id,
        "message": f"User {user_id} connected to room - {room_id}"
    }
    await socket_manager.broadcast_to_room(room_id, json.dumps(message))
    try:
        while True:
            data = await websocket.receive_text()
            message = {
                "user_id": user_id,
                "room_id": room_id,
                "message": data
            }
            await socket_manager.broadcast_to_room(room_id, json.dumps(message))

    except WebSocketDisconnect:
        await socket_manager.remove_user_from_room(room_id, websocket)

        message = {
            "user_id": user_id,
            "room_id": room_id,
            "message": f"User {user_id} disconnected from room - {room_id}"
        }
        await socket_manager.broadcast_to_room(room_id, json.dumps(message))


if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=args.port, reload=True)

当我们第一次收到带有房间 ID 的请求时,在接受 WebSocket 连接后,我们会检查是否已有可用的房间 ID。如果房间 ID 已经可用,我们就会将新的 WebSocket 连接添加到已经连接到房间 ID 的 WebSocket 连接列表中;如果房间 ID 不可用,我们就会先建立与 redis 的连接,然后以房间 ID 为名订阅 PUB/SUB 主题。订阅后,我们将附加一个阅读器 _pubsub_data_reader,它将不断从主题中提取数据,并在其中提取相关的 WebSocket 连接列表,然后将消息发送给客户端。这就是代码的整体工作概念。

remove_user_from_room 用于当 websocket 连接断开时,从与 room_id 关联的连接列表中删除该连接。如果某个房间 ID 没有可用的连接,我们就会取消订阅相应的主题。

结论

虽然这段代码小巧而简单,但使用 pub/sub 实现全面的 websocket 却相对复杂,需要考虑各种情况。我们需要考虑不同的故障安全机制,以避免任何中断,并在出现故障时进行连接重试等。

完整代码地址:https://github.com/NandaGopal56/websockets-pubsub

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论