WebSub 连接消息代理 (WebSub入门系列四)

在前一篇文章中,我们介绍了如何将第三方 API 与 hub 实现集成。在本文中,我们将重点介绍通过整合消息代理来增强 hub 实现的可扩展性、可靠性和稳定性。具体来说,我们将在本示例中使用 Apache Kafka 消息代理。

消息代理非常适合实施 WebSub hubs,因为它们发挥着类似的作用。但是,在深入研究消息代理集成之前,我们应该明确当前实现的局限性。

下面的流程图描述了当前的通知调度流程。

WebSub 连接消息代理 (WebSub入门系列四)
没有 Kafka 的通知分发流程

我们可以看到有一个通知发送器组件,它将天气警报发送到新闻接收器(新闻频道)。但这种方法存在一些问题。

  • 每次运行通知发送器时,它都必须查询中心的内部状态,以检索特定位置的有效新闻接收器。
  • 如果在向新闻接收器分发内容时出现错误,特定的天气警报就会丢失。
  • 无法查询/查看已分发的天气警报。

将 Apache Kafka 纳入我们的通知分发流程将引入发布/订阅模式,这将带来诸多好处。

  • 通知生成和通知分发将分开。
  • Kafka 将为天气警报提供持久层。
  • 如果在将内容分发到新闻接收器时出现错误,我们可以实现适当的重试机制,因为通知将保留在 Kafka 中。

Kafka 的集成将导致修改后的通知调度流程与以下格式一致。

WebSub 连接消息代理 (WebSub入门系列四)
Kafka 的通知发送流程

根据流程图,hub 内有两个组件,即通知发送器(Notification Sender)和通知接收器(Notification Receiver)。通知发送器组件定期将天气报告发布到 Kafka 主题中。通知接收器组件监听 Kafka 主题,并在收到事件后将消息转发给新闻接收器(新闻频道)。每个通知接收器组件都与相应的新闻接收器直接关联,每个通知发送器都与生成天气报告的特定位置直接关联。

下图说明了 hub 内的组件如何映射到 Kafka 主题和用户组。

WebSub 连接消息代理 (WebSub入门系列四)
Hub和Kafka组件之间的映射

由于每个通知发送方都与一个位置相关联,因此将为每个位置分配一个相应的 Kafka 主题。此外,由于每个通知接收器都与新闻接收器相连,因此将为每个通知接收器分配一个唯一的 Kafka 消费者组。

Kafka 消息发布逻辑可以轻松实现。

kafka:ProducerConfiguration messagePersistConfig = {
    clientId: "message-persist-client",
    acks: "1",
    retryCount: 3
};
final kafka:Producer messagePersistProducer = check new ("localhost:9092", messagePersistConfig);

public isolated function publishWeatherNotification(string location, WeatherReport weatherReport) returns error? {
    json payload = weatherReport.toJson();
    byte[] serializedContent = payload.toJsonString().toBytes();
    check messagePersistProducer->send({
        topic: location,
        value:  serializedContent
    });
    check messagePersistProducer->'flush();
}

由于我们的通知发送器需要定期发布天气警报,因此我们将使用计划任务来实现它。

isolated class NotificationSender {
    *task:Job;
    private final string location;

    isolated function init(string location) {
        self.location = location;
    }

    public isolated function execute() {
        WeatherReport|error weatherReport = getWeatherReport(self.location);
        if weatherReport is error {
            log:printWarn(string `Error occurred while retrieving weather-report: ${weatherReport.message()}`, stackTrace = weatherReport.stackTrace());
            return;
        }
        error? persistResult = publishWeatherNotification(self.location, weatherReport);
        if persistResult is error {
            log:printWarn(string `Error occurred while persisting the weather-report: ${persistResult.message()}`, stackTrace = persistResult.stackTrace());
        }
    }
}

isolated function startNotificationSender(string location) returns task:JobId|error {
    NotificationSender notificationSender = new (location);
    // schedule the job to be run every 600 seconds (10 minutes)
    return task:scheduleJobRecurByFrequency(notificationSender, 600.0);
}

要设置新闻接收器,就必须具备从 Kafka 服务器获取消息的能力。因此,我们需要一个 Kafka 消费者。

function createMessageConsumer(websubhub:VerifiedSubscription message) returns kafka:Consumer|error {
    string groupName = string `${message.hubTopic}_${message.hubCallback}`;
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: groupName,
        topics: [message.hubTopic],
        // turn-off the auto offset commit
        autoCommit: false
    };
    return check new ("localhost:9092", consumerConfiguration);  
}

通知接收器将使用 Ballerina 异步函数实现。

type UpdateMessageConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    weatherApi:WeatherReport value;
|};

function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
    kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);
    websubhub:HubClient hubClient = check new (newsReceiver, {
        retryConfig: {
            interval: config:CLIENT_RETRY_INTERVAL,
            count: config:CLIENT_RETRY_COUNT,
            backOffFactor: 2.0,
            maxWaitInterval: 20
        },
        timeout: config:CLIENT_TIMEOUT
    });
    // start the async function for `News Receiver`
    _ = start pollForNewUpdates(hubClient, kafkaConsumer, newsReceiver);
}

isolated function pollForNewUpdates(websubhub:HubClient hubClient, kafka:Consumer kafkaConsumer, websubhub:VerifiedSubscription newsReceiver) returns error? {
    string location = newsReceiver.hubTopic;
    string receiverId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;
    do {
        while true { 
            UpdateMessageConsumerRecord[] records = check kafkaConsumer->poll(10.0);
            if !isValidNewsReceiver(location, receiverId) {
                fail error(string `Subscriber with Id ${receiverId} or topic ${location} is invalid`);
            }
            var result = notifySubscribers(records, hubClient, kafkaConsumer);
            if result is error {
                log:printError("Error occurred while sending notification to subscriber ", err = result.message());
                check result;
            } else {
            	// commit the Kafka offset only if the message delivery is successfull
                check kafkaConsumer->'commit();
            }
        }
    } on fail var e {
        log:printError(string `Error occurred while sending notification to news-receiver: ${e.message()}`, stackTrace = e.stackTrace());
        removeNewsReceiver(receiverId);
        kafka:Error? result = kafkaConsumer->close(15.0);
        if result is kafka:Error {
            log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message());
        }
    }
}

分析上面的代码,我们可以看到,我们禁用了 Kafka 消费者的自动偏移提交,取而代之的是手动偏移提交机制。我们特意做出这一改动,是因为有了手动偏移提交,我们就能告知 Kafka 某条消息是否已被成功处理,还能为失败的消息启用重试机制。

有了这些改动,hub 实现的状态管理也发生了变化。

// `notificationsSenders` will be used to identify already started notification senders 
// location name will be the key for `notificationSenders` 
isolated map<task:JobId> notificationSenders = {};

isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};

hub 服务的 onSubscriptionIntentVerified 功能现在将更新如下。

remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
    if !validNotificationSenderExists(subscription.hubTopic) {
        task:JobId notificationService = check startNotificationSender(subscription.hubTopic);
        lock {
            notificationSenders[subscription.hubTopic] = notificationService;
        }
    }
    string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
    lock {
        newsReceiversCache[newsReceiverId] = subscription;
    }
    check startNotificationReceiver(subscription);
}

在本文中,我们讨论了如何将消息代理集成到我们的 hub 实现中。本文将结束我一直在写的 WebSub 系列文章。祝你编码愉快….!

示例代码: https: //github.com/ayeshLK/weather-reporter/tree/state-3

作者:Ayesh Almeida

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/30946.html

(0)

相关推荐

发表回复

登录后才能评论