使用 Nest.js 和 Socket IO 的实时竞价引擎

“一次,两次,以这个价格成交”,是的,你没听错,我们说的就是竞拍。我们都在电影或现场拍卖中听说过,人们在拍卖中相互竞争,以赢得最终的头衔。一些平台正在为在线拍卖提供实时竞价。在本文中,我们将讨论实时竞价背后的机制、工作原理以及使用 Nest.js 和 Socket IO 进行开发。

什么是 Socket.IO?

Socket.io 是一种流行的 JavaScript 库,它允许我们在网络浏览器和服务器之间创建实时双向通信。它是一个高性能、高可靠性的库,设计用于处理大量数据。它遵循 WebSocket 协议并提供更好的功能,使我们能够构建高效的实时应用程序。

发布-订阅设计模式

发布-订阅(Publish-Subscribe)又称发布/订阅(pub/sub),是客户端与服务器之间实时通信的常用设计模式。它允许服务器通过通道/媒介实时向客户端发送消息。这些信息的发送者(发布者)并不明确标识目标接收者。相反,信息是通过一个信道发送的,信道上可能有任意数量的接收者(订阅者)在等待这些信息。

发布者: 负责为目标受众生成事件/信息。

订阅者: 收听事件或接收发布者信息的受众。

通道: 进行数据交换的媒介。

使用 Nest.js 和 Socket IO 的实时竞价引擎
使用事件总线的 Pub/Sub 设计模式

实时竞价是如何运作的?

我们已经看到了多个在线竞价平台,但竞价过程是如何在幕后进行的?让我们来看看发生在后台的事件:

  • 假设两个用户 A 和 B 同时访问一个在线拍卖平台。
  • 一旦他们表现出参与竞拍的意图,网络浏览器(客户端)就会订阅这两个用户,以监听 “出价 “事件。
  • 另一个用户 C 同时出价,出价请求到达服务器。
  • 服务器处理请求,然后向所有订阅者发布 “出价 “事件。
  • 用户 A 和 B 都订阅了 “出价 “事件,因此他们会收到服务器发送的事件,出价金额也会实时更新。
  • 这样,所有访问平台的用户都能看到实时竞价,甚至无需刷新网页浏览器即可参与竞价。
使用 Nest.js 和 Socket IO 的实时竞价引擎
使用发布/订阅模型的实时出价

并发管理

实时竞价的关键之一是管理并发事务。让我们举一个简单的例子来理解这种情况:三个用户 A、B 和 C 在一次在线拍卖中相互竞争。

用户 A 首先出价 5 万美元,处于领先地位,现在用户 B 和 C 想要领先,同时出价 6 万美元。理想情况下,先到达服务器的出价必须被接受,而另一个出价必须被拒绝。

使用链接列表可以轻松解决并发事务的问题。对于每次拍卖,出价都将以链接列表的形式进行,因此每个出价都与最后一次出价相连,并带有特殊的唯一性约束,即链接列表中的所有父节点都必须是唯一的。

让我们重温一下这个例子,看看链接列表是如何解决并发交易问题的。用户 A 最初出价开始拍卖时,该出价成为链接列表的首节点,例如 id – 5。现在,用户 B 和 C 的父出价都是 5,当他们同时出价时,由于两个事务的父出价相同,最先到达服务器的出价将被处理,而另一个出价将因父出价唯一性约束而被拒绝。

使用 Nest.js 和 Socket IO 的实时竞价引擎
并发投标的链表表示

执行

1. 创建一个新的NestJs项目并安装所需的依赖项:

$ npm i -g @nestjs/cli
$ nest new bidding-engine
$ npm i socket.io
$ npm i ioredis
$ npm i –save sequelize sequelize-typescript mysql2
$ npm I –save-dev @types/sequelize

2. 创建Redis适配器来初始化pub/sub客户端

export class RedisIoAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter>;
  constructor(
    app: INestApplication,
    private readonly configService: ConfigService,
  ) {
    super(app);
  }

  async connectToRedis(): Promise<void> {
    const redisConfig = this.configService.get('redis');
    let publish: Redis | Cluster = new Redis({
      host: host,
      port: port,
    });
    const subscribe = publish.duplicate();
    publish.on('error', (error) => {
      console.log('redis connection failed: ', error);
    });
    subscribe.on('error', (error) => {
      console.log('redis connection failed: ', error);
    });
    this.adapterConstructor = createAdapter(publish, subscribe);
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server: Server = super.createIOServer(port, options) as Server;
    server.adapter(this.adapterConstructor);
    const webSocketConfig = this.configService.get<any>('webSocket');
    const timeout: number =
      webSocketConfig?.websocketHearthbeatTimeout || 30000;
    setInterval(() => {
      const clients: Map<string, Socket> = server.sockets.sockets;
      Object.keys(clients).forEach((socketId) => {
        const socket: Socket = clients[socketId] as Socket;
        if (socket.connected) {
          socket.send('ping');
        }
      });
    }, timeout);

    server.on('connection', (socket) => {
      socket.on('message', (message: string) => {
        if (message === 'pong') {
          const pingTimeout = socket['pingTimeout'] as { refresh: () => void };
          pingTimeout.refresh();
        }
      });
    });
    return server;
  }
}

3. 创建适配器后,我们将适配器集成到应用程序中以初始化 Redis 和 pub/sub 客户端

const redisIoAdapter = new RedisIoAdapter(app, configService);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);

4. 创建一个网关,允许用户订阅事件并在出价后立即发布事件。

@WebSocketGateway({
  cors: true,
  namespace: 'place-bid',
  transports: ['websocket'],
})
export class AuctionGateway {
  @WebSocketServer()
  private readonly server: Server;

  @SubscribeMessage('place-bid-join')
  async joinRoom(client: Socket, roomId: string): Promise<void> {
    await client.join(roomId);
  }

  @SubscribeMessage('place-bid-leave')
  async leaveRoom(client: Socket, roomId: string): Promise<void> {
    await client.leave(roomId);
  }

  public placeBidToRoom(roomId: string, payload: any): void {
    this.server.to(roomId).emit('new-bid-placed', payload);
  }
}

5. 适配器和网关都已就位,现在我们要创建一个 API 端点,它将响应即将到来的出价(获取拍卖 ID 和出价金额),在数据库中插入出价,检查并发性,并发布出价事件。我们将创建不同的类来处理各自的逻辑,如下所示:

Bid Controller 类将作为下达新竞价的端点。

投标服务类将处理业务逻辑和相应的验证。

竞标存储库类将处理竞标的插入,并提供上一次提交的竞标。

投标基本模型代表投标表的数据库视图。

@Controller()
export class BidController {
  constructor(
    private readonly bidService: BidService,
    private readonly appGateway: AuctionGateway,
  ) {}

  @Post('bid')
  @HttpCode(HttpStatus.OK)
  async placeBid(@Body() request: { auctionId: string; bidAmount: number, parentBidId: string | null }) {
    const response = (await this.buyerService.placeBid(
      auctionId,
      bidAmount,
     parentBidId,
    )) as any;

    const { bidDetails, auctionId } = response;
    const wsResponse: any = {
      amount: bidDetails.value || 0,
      auctionId,
      bidId: bidDetails.id || '',
      createdAt: bidDetails.bidTime,
    };
    this.appGateway.placeBidToRoom(request.auctionId, wsResponse);
    return {
      success: true,
    };
  }
}
@Injectable()
export class BidService {
  constructor(
    private readonly sequelize: Sequelize,
    private readonly bidRepository: BidRepository,
  ) {}

  async placeBid(auctionId: string, 
                bidAmount: number, 
                parentBidId: string | null) {
    try {
      const bidCreated = await this.sequelize.transaction(
        async (transaction) =>
          this.bidRepository.create(
            {
              bidAmount,
              parentBid: parentBidId,
              auctionId,
            },
            transaction,
          ),
        );
      const response = {
        auctionId,
        bidDetails: {
          bidTime: new Date(),
          id: bidCreated.id,
          value: bidCreated.bidAmount,
        },
        success: true,
      };
      return response;
    } catch (error) {
      if (
        error instanceof ValidationError &&
        error.name === 'SequelizeUniqueConstraintError'
      ) {
        const exception = {
          error: 'You have been outbid',
          details: {},
        };
        throw new ConflictException(exception);
      }
      if (error instanceof HttpException) {
        throw error;
      }
      throw new InternalServerErrorException(error);
    }
  }
}
@Injectable()
export class BidRepository {
constructor(@InjectModel(BidBase) private bidModel: typeof BidBase) {}

create(bid: Partial<BidBase>, transaction: Transaction) {
const data: Optional<BidBase, never> = {
...(bid as Required<BidBase>),
};
return this.bidModel.create(data, { transaction });
}

public async findLastBid(auctionId: string): Promise<BidBase | null> {
return this.bidModel.findOne({
where: {
auctionId,
},
order: [['created_at', 'DESC']],
});
}
}
type CreationColumns = 'auctionId' | 'bidAmount' | 'parentBid';
export type CreateBidParams = Pick<BidBase, CreationColumns>;

@Table({
tableName: 'tbl_bid',
underscored: true,
})
export class BidBase extends Model<BidBase, CreateBidParams> {
@PrimaryKey
@IsUUID('all')
@Default(Sequelize.literal('NewId()'))
@Column
id!: string;

@Column
auctionId!: string;

@Column({ allowNull: true })
parentBid?: string;

@Column(DataType.DECIMAL(10, 2))
bidAmount!: number;

@CreatedAt
createdAt!: Date;

@UpdatedAt
updatedAt?: Date;

@DeletedAt
deletedAt?: Date;

@Column
createdBy?: string;

@Column
updatedBy?: string;

@Column
deletedBy?: string;
}

6. 在 parentBid 和 auctionId 上创建唯一群集索引,以防止并发交易问题。此外,在 parentBid 上创建一个外键,引用竞标基础模型的 id,以创建链接列表结构。

7. 最后,将竞价控制器集成到主应用程序中,现在您的终端就可以处理竞价并向用户发布事件了。

好极了,我们成功地使用 NestJs 开发了一个实时竞价引擎。您可以集成一个花哨的用户界面来监听竞价事件,提供用户登录,并与您的朋友竞相赢得拍卖。您还可以创建管理仪表板,查看即将发生的竞拍,在竞拍过程中拒绝竞拍,并宣布获胜者 “Goin 一次,Goin 两次,您是赢家 …..”。

作者:Asmit Bajaj
编译自medium.

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/33843.html

(0)

相关推荐

发表回复

登录后才能评论