如何设计类 Facebook 规模的聊天应用程序

在本文中,将讨论如何设计一款面向 Facebook 数十亿用户的聊天应用程序。以下是功能需求:

  • 用户可以与任何用户进行一对一聊天
  • 与 100 名用户进行群聊

基本的聊天工作流程是:用户 1 向聊天服务器发送消息。聊天服务器将消息转发给用户 2,并更新数据库中的聊天消息。

如何设计类 Facebook 规模的聊天应用程序

现在我们来看看用户 1、用户 2 和聊天服务器之间的通信协议。

  • HTTP : 用户 1 向聊天服务器发送 HTTP Post。聊天服务器将消息存储在数据库中。用户 2 不断轮询聊天服务器,查找来自用户 1 的新消息。这种方法无法扩展,效率也不高,因为即使采用长轮询的方法用户也会创建大量 HTTP 连接到服务器。
  • WebSockets:WebSockets 提供双向、低延迟通信,允许更快的实时功能。

我们选择 WebSockets 作为聊天应用程序的通信协议。上述架构只适用于所有用户都连接到同一服务器的单服务器场景。在这种情况下,聊天服务器知道可以向哪个用户发送消息。

如果用户 2 连接到不同的服务器怎么办?应用程序如何将信息转发给用户 2。

Pub/Sub 模式

我们为应用程序引入了 Pub/Sub 模式。当用户 1 向用户 2 发送一些文本时,它会将文本内容保存到数据库,同时向消息队列中的主题(频道)发布消息。本例中的主题是聊天室的 room_id。

消息的格式为 { room_id, user_id, message, timestamp}。

如何设计类 Facebook 规模的聊天应用程序

用户 2 将订阅主题 room_id,当用户 1 在主题 room_id 中发布消息时,用户 2 将自动接收该消息。用户 2 的 WebSocket 连接处理程序将读取消息,并将消息发送到用户 2 的客户端应用程序(网络或手机)。用户 2 可以订阅多个主题,因为它可能有多个房间。

使用 Redis 进行 Pub/Sub

有许多应用程序都能提供 Pub/Sub 服务,如 Redis、Apache Kafka、Amazon SNS、RabitMQ、MQTT、Google Cloud Pub/Sub……我们选择 Redis 是因为它在处理Pub/Sub操作方面以高性能和低延迟著称,这要归功于它的内存数据处理功能。Redis 具有可扩展性、可靠性和强大的社区支持,因此不仅在Pub/Sub方面,而且在缓存和会话管理等其他功能方面,都是一个多功能的选择。

数据库

聊天是重写和重读应用。NoSQL 是数据存储的更好选择。我们选择开源 Cassandra 作为 NoSQL 解决方案。Cassandra 以其卓越的写入性能而著称,因此非常适合需要高写入吞吐量的应用。

数据模型: 两个主表 rooms_messages 和 rooms_metadata。这两个表都有使用 room_id 的分区键,因此我们可以在同一个分区中查询这两个表。rooms_messages 表的聚类键是 created_at,因此信息将按时间排序。此数据模型可用于 1-1 或群聊

create table rooms_messages(  
  room_id uuid,
  user_id uuid,
  message_id uuid,
  message varchar,
  created_at timestamp,  
  primary key (room_id, created_at) 
)

create table rooms_metadata (
  room_id uuid,
  created_by varchar,
  name varchar,
  created_at timestamp,
  user_list set<uuid>,
  primary key (room_id)
)

扩展到 1B DAU

假设每位用户每天发送 10 条聊天信息,则写入 QPS:

1B*10 / 86400(秒) ~ 10¹⁰ / 10⁵ = 100,000

使用 80/20 规则计算的峰值写 QPS:400,000

假设读取 QPS 是写入 QPS 的 20 倍。读取 QPS = 2,000,000

峰值读取 QPS = 8,000,000

根据 Netflix 的 Cassandra 基准测试,我们需要大约 288 台服务器来实现每秒 110 万次写入。因此,对于 400,000 的峰值 QPS,我们只需要大约 100 台 Cassandra 集群服务器。

假设每个用户生成10个聊天室,Redis需要保存1B*10 = 10B个频道,并且内部哈希表和链表中需要20字节的指针来跟踪每个订阅者。需要 1B*20 / 10⁹ = 200G。这可以容纳两个 Redis 服务器。

对于 CPU 使用率:写入 QPS = 400,000 并假设每个写入请求需要扇出到 5 个用户或 400K*5 = 每秒 2M 推送。具有 Gb 网络的现代服务器每秒可以处理大约 100K 推送。我们需要 2M / 100K = 2*10⁶ / 10⁵ = 20 台服务器。所以我们总共需要 20 个 Redis 服务器。

失败情形:

当聊天服务器收到来自客户端的消息时,它会插入数据库,同时将新消息发布到 PubSub 频道。如果写入数据库失败,消息就会在数据库中丢失,但会在客户端显示。要解决这个不一致问题。我们需要确保在将消息推送到 Redis 频道之前成功写入 Cassandra。在客户端代码中只需使用 try 和 catch 语句。如果没有异常,我们就可以将消息推送到通道。

负载均衡器:

有效扩展 WebSocket 连接,尤其是使用负载均衡器时,需要仔细规划和实施。WebSocket 是一种有状态协议,与无状态 HTTP 请求相比,通过负载平衡来维护其连接状态可能具有挑战性。

维护持久连接: 会话关联(粘性会话), 要处理 WebSockets 的有状态特性,可在负载均衡器中使用会话关联。这可确保一旦与特定服务器建立了 WebSocket 连接,来自该客户端的所有后续流量都会流向同一服务器。这可以根据 cookie 或客户端的 IP 地址来实现。

后端示例代码

我们使用节点集群为每个服务器设置多个节点线程。这种设置可帮助您的 WebSocket 服务器在多个 CPU 内核上进行扩展,从而提高高并发场景的性能。

const cluster = require('cluster');
const http = require('http');
const { WebSocketServer } = require('ws');
const os = require('os');

const numCPUs = os.cpus().length;

if (cluster.isMaster) {
    console.log(`Master ${process.pid} is running`);

    // Fork workers.
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died`);
        // Optionally fork a new worker on exit
        cluster.fork();
    });
} else {
    // Workers share the HTTP server
    const server = http.createServer();
    const wss = new WebSocketServer({ server });

    const chatRooms = new Map();

    // Function to broadcast a message to all subscribers of a room
    function broadcastMessage(room, message) {
        const subscribers = chatRooms.get(room);
        if (subscribers) {
            subscribers.forEach(client => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify(message));
                }
            });
        }
    }

    wss.on('connection', function connection(ws) {
        let currentRoom = null;

        ws.on('message', function incoming(message) {
            try {
                const data = JSON.parse(message);

                switch (data.type) {
                    case 'join':
                        // Join a specific chat room
                        currentRoom = data.room;
                        if (!chatRooms.has(currentRoom)) {
                            chatRooms.set(currentRoom, new Set());
                        }
                        chatRooms.get(currentRoom).add(ws);
                        broadcastMessage(currentRoom, { type: 'info', message: 'A new user has joined the room.' });
                        break;
                    case 'message':
                        // Broadcast a message to the room
                        if (currentRoom) {
                            broadcastMessage(currentRoom, { type: 'message', message: data.message });
                        }
                        break;
                    case 'leave':
                        // Leave the current room
                        if (currentRoom && chatRooms.has(currentRoom)) {
                            chatRooms.get(currentRoom).delete(ws);
                            broadcastMessage(currentRoom, { type: 'info', message: 'A user has left the room.' });
                            currentRoom = null;
                        }
                        break;
                }
            } catch (e) {
                console.error('Error handling message', e);
            }
        });

        ws.on('close', function() {
            if (currentRoom && chatRooms.has(currentRoom)) {
                chatRooms.get(currentRoom).delete(ws);
                broadcastMessage(currentRoom, { type: 'info', message: 'A user has disconnected.' });
            }
        });
    });

    // Start the server on port 3000
    server.listen(3000, function() {
        console.log(`Worker ${process.pid} started, listening on http://localhost:3000`);
    });
}

在本示例中,每个工作者维护自己的聊天室和连接。在实际应用中,如果状态需要在工作者之间共享或同步(例如,为了在所有工作者之间形成一致的聊天室视图),可以考虑使用 Redis 等外部存储或共享内存方法。

操作系统的网络层会自动在 Worker 之间分配传入连接。这有助于利用所有 CPU 内核,提高服务器处理更多连接的能力。

React 前端

// ChatClient.tsx
import React, { useState, useEffect, useRef } from 'react';

// Define types for messages sent and received
interface ChatMessage {
  type: string;
  message?: string;
  room?: string;
}

const ChatClient: React.FC = () => {
  const [room, setRoom] = useState<string>('');
  const [message, setMessage] = useState<string>('');
  const [messages, setMessages] = useState<string[]>([]);
  const ws = useRef<WebSocket | null>(null);

  useEffect(() => {
    // Initialize the WebSocket connection
    ws.current = new WebSocket('ws://localhost:3000');
    ws.current.onmessage = (event) => {
      const data: ChatMessage = JSON.parse(event.data);
      if (data.type === 'message' || data.type === 'info') {
        setMessages((prevMessages) => [...prevMessages, data.message || '']);
      }
    };

    // Clean up the WebSocket on component unmount
    return () => {
      if (ws.current) {
        ws.current.close();
      }
    };
  }, []);

  const joinRoom = () => {
    if (ws.current && room) {
      ws.current.send(JSON.stringify({ type: 'join', room }));
    }
  };

  const sendMessage = () => {
    if (ws.current && message) {
      ws.current.send(JSON.stringify({ type: 'message', message }));
      setMessage('');
    }
  };

  const leaveRoom = () => {
    if (ws.current) {
      ws.current.send(JSON.stringify({ type: 'leave' }));
    }
  };

  return (
    <div>
      <h1>Chat Room Client</h1>
      <div>
        <input
          type="text"
          value={room}
          onChange={(e) => setRoom(e.target.value)}
          placeholder="Enter Room Name"
        />
        <button onClick={joinRoom}>Join Room</button>
        <button onClick={leaveRoom}>Leave Room</button>
      </div>
      <div>
        <input
          type="text"
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          placeholder="Enter Message"
        />
        <button onClick={sendMessage}>Send Message</button>
      </div>
      <div style={{ margin: '20px 0', border: '1px solid #ccc', padding: '10px', width: '300px', height: '200px', overflowY: 'scroll' }}>
        <h4>Messages</h4>
        {messages.map((msg, index) => (
          <div key={index}>{msg}</div>
        ))}
      </div>
    </div>
  );
};

export default ChatClient;

概括

在本文中,我们讨论 Facebook 规模的聊天应用程序的设计。我们选择WebSockets作为主要通信协议。我们使用 pub/sub 来扩展到多个服务器,并使用 NoSQL 数据库来存储聊天消息。我们还讨论了将粘性负载均衡器用于 websockets 有状态服务。最后,我们展示了聊天应用程序后端和前端的一些示例代码。

作者:Tuan Do,Amazon、Amwell、Politico 的软件工程师,乔治梅森大学 ECE 博士。

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/48431.html

(0)

相关推荐

发表回复

登录后才能评论