使用 Ballerina 构建 WebSub hub(WebSub入门系列二)

在之前的文章中我对WebSub协议做了简单的介绍。在本文中,我们将使用Ballerina Websubhub标准库实现一个 WebSub hub。

我们要实现的用例是一个天气通知 hub。新闻台可以订阅该 hub,以接收特定地点的每小时天气更新。在本例中,我们将主要关注 hub 与订阅者之间的关系。

实施过程中将涉及以下方面:

  1. 使用 Ballerian 语言实现基本 WebSub hub。
  2. 使用 Ballerina 语言实现 WebSub订阅者
  3. 集成第 3 方 API 以检索某个位置的天气报告。
  4. 连接 Apache Kafka 消息代理作为 WebSub hub实现的持久层。

在本文中,我们将介绍前两点,其余部分将在接下来的博客文章中介绍。

WebSub Hub实施

Ballerina Websubhub 标准库提供了一个精简 API 层来实现符合 WebSub 的 hubs如果我们仔细阅读Websubhub标准库的API规范,我们可以很容易地理解需要实现的API。

在 Ballerina 语言中,WebSub hub 以监听器和服务的形式设计

  • websubhub:Listenerwebsubhub:Service:可以附加的侦听器端点。
  • websubhub:Service:API 服务,接收 WebSub 事件。

websubhub:Listener充当 HTTP 侦听器的包装器,使订阅者发布者可以向其发送请求的 HTTP 端点。每次向端点发出请求时,都会触发附加到 的websubhub:Listener相应 API 。websubhub:Servicewebsubhub:Listener

以下是基本 hub 实现的示例:

import ballerina/websubhub;

service /hub on new websubhub:Listener(9000) {

    remote function onRegisterTopic(websubhub:TopicRegistration msg)
        returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError {
        // todo: implement logic here
    }

    remote function onDeregisterTopic(websubhub:TopicDeregistration msg) 
        returns websubhub:TopicDeregistrationSuccess|websubhub:TopicDeregistrationError {
        // todo: implement logic here
    }

    // todo: implement other required remote methods
}

上面的代码将在端口上启动一个 HTTP 端点9000并将其附加websubhub:Service/hub服务路径。发布者订阅者可以使用URL 向中心发送请求http://localhost:9000/hub

在我们的 hub 实现示例中,订阅者订阅特定位置的天气警报。订阅者可在订阅请求中发送地点名称作为 hub.topic 参数。

hub 应保持两种状态。

  1. 应发出天气警报的地点。
  2. 订阅天气警报的新闻接收者。

可用位置可以作为 string[]进行管理,新闻接收器将作为 map<websubhub:VerifiedSubscription>进行管理。新闻接收者将被分配一个 ID,该 ID 将使用订阅请求中的hub.topichub.callback参数导出。

有了上述限制,我们的 hub 实现将如下所示:

import ballerina/http;
import ballerina/websubhub;

isolated string[] locations = [];
isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};

service /hub on new websubhub:Listener(9000) {

    // Topic registration is not supported by this `hub`
    remote function onRegisterTopic(websubhub:TopicRegistration msg)
        returns websubhub:TopicRegistrationError {
        return error websubhub:TopicRegistrationError(
            "Topic registration not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
    }

    // Topic deregistration is not supported by this `hub`
    remote function onDeregisterTopic(websubhub:TopicDeregistration msg) returns websubhub:TopicDeregistrationError {
        return error websubhub:TopicDeregistrationError(
            "Topic deregistration not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
    }

    // Content update is not supported by this `hub`
    remote function onUpdateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError {
        return error websubhub:UpdateMessageError(
            "Content update not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
    }

    remote function onSubscriptionValidation(readonly & websubhub:Subscription subscription) returns websubhub:SubscriptionDeniedError? {
        string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
        boolean newsReceiverAvailable = false;
        lock {
            newsReceiverAvailable = newsReceiversCache.hasKey(newsReceiverId);
        }
        if newsReceiverAvailable {
            return error websubhub:SubscriptionDeniedError(
                    string `News receiver for location ${subscription.hubTopic} and endpoint ${subscription.hubCallback} already available`,
                    statusCode = http:STATUS_NOT_ACCEPTABLE
                );
        }
    }

    remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
        lock {
            if locations.indexOf(subscription.hubTopic) is () {
                locations.push(subscription.hubTopic);
            }
        }
        string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
        lock {
            newsReceiversCache[newsReceiverId] = subscription;
        }
    }

    remote function onUnsubscriptionValidation(readonly & websubhub:Unsubscription unsubscription) returns websubhub:UnsubscriptionDeniedError? {
        string newsReceiverId = string `${unsubscription.hubTopic}-${unsubscription.hubCallback}`;
        boolean newsReceiverNotAvailable = false;
        lock {
            newsReceiverNotAvailable = !newsReceiversCache.hasKey(newsReceiverId);
        }
        if newsReceiverNotAvailable {
            return error websubhub:UnsubscriptionDeniedError(
                    string `News receiver for location ${unsubscription.hubTopic} and endpoint ${unsubscription.hubCallback} not available`,
                    statusCode = http:STATUS_NOT_ACCEPTABLE
                );
        }
    }

    remote function onUnsubscriptionIntentVerified(readonly & websubhub:VerifiedUnsubscription unsubscription) returns error? {
        string newsReceiverId = string `${unsubscription.hubTopic}-${unsubscription.hubCallback}`;
        lock {
            _ = newsReceiversCache.removeIfHasKey(newsReceiverId);
        }
    }
}

请注意,在我们的 hub 实现中,以下远程方法已被禁用,因为它们与发布者相关的功能相关。

  • onRegisterTopic
  • onDeregisterTopic
  • onUpdateMessage

为了提高代码的可读性,我们将在实现中添加以下附加实用方法。

isolated function removeNewsReceiver(string newsReceiverId) {
    lock {
        _ = newsReceiversCache.removeIfHasKey(newsReceiverId);
    }
}

isolated function getNewsReceivers(string location) returns websubhub:VerifiedSubscription[] {
    lock {
        return newsReceiversCache
            .filter(newsReceiver => newsReceiver.hubTopic == location)
            .toArray().cloneReadOnly();
    }
}

hub 实现能够记录位置和已注册的新闻接收器。现在,我们要实现定期向注册新闻接收器发送天气警报的功能。在初始状态下,我们将有一组预定义的警报模板,这些模板将根据位置进行定制。

import ballerina/random;
import ballerina/mime;
import ballerina/lang.runtime;
import ballerina/regex;
import ballerina/time;
import ballerina/log;
import ballerina/websubhub;

// predifined alert templates
final readonly & string[] alerts = [
    "Severe weather alert for [LOCATION] until [TIME]. We will send updates as conditions develop. Please call this number 1919 for assistance or check local media.", 
    "TORNADO WATCH for [LOCATION] until [TIME]. Storm conditions have worsened, be prepared to move to a safe place. If you are outdoors, in a mobile home or in a vehicle, have a plan to seek shelter and protect yourself. Please call this number 1919 for assistance or check local media."
];

isolated function startSendingNotifications(string location) returns error? {
    map<websubhub:HubClient> newsDispatchClients = {};
    while true {
        log:printInfo("Running news-alert dispatcher for ", location = location);
        websubhub:VerifiedSubscription[] currentNewsReceivers = getNewsReceivers(location);
        final readonly & string[] currentNewsReceiverIds = currentNewsReceivers
            .'map(receiver => string `${receiver.hubTopic}-${receiver.hubCallback}`)
            .cloneReadOnly();

        // remove clients related to unsubscribed news-receivers
        string[] unsubscribedReceivers = newsDispatchClients.keys().filter(dispatcherId => currentNewsReceiverIds.indexOf(dispatcherId) is ());
        foreach string unsubscribedReceiver in unsubscribedReceivers {
            _ = newsDispatchClients.removeIfHasKey(unsubscribedReceiver);
        }

        // add clients related to newly subscribed news-receivers
        foreach var newsReceiver in currentNewsReceivers {
            string newsReceiverId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;
            if !newsDispatchClients.hasKey(newsReceiverId) {
                newsDispatchClients[newsReceiverId] = check new (newsReceiver);
            }
        }
        
        if newsDispatchClients.length() == 0 {
            continue;
        }
        string alert = check retrieveAlert(location);
        foreach var [newsReceiverId, clientEp] in newsDispatchClients.entries() {
            websubhub:ContentDistributionSuccess|error response = clientEp->notifyContentDistribution({
                contentType: mime:APPLICATION_JSON,
                content: {
                    "weather-alert": alert
                }
            });
            if response is websubhub:SubscriptionDeletedError {
                removeNewsReceiver(newsReceiverId);
            }
        }
        // wait for 1 minute befaore starting next notificaion dispatch round
        runtime:sleep(60);
    }
}

isolated function retrieveAlert(string location) returns string|error {
    string alert = alerts[check random:createIntInRange(0, alerts.length())];
    alert = regex:replace(alert, "\\[LOCATION\\]", location);
    time:Utc alertExpiryTime = time:utcAddSeconds(time:utcNow(), 3600);
    alert = regex:replace(alert, "\\[TIME\\]", time:utcToString(alertExpiryTime));
    return alert;
}

在这里,我们使用 websubhub:HubClient 向新闻接收器传送内容。websubhub:HubClient 与 WebSub 订阅者(在本例中为新闻接收者)具有一对一的映射关系。

有了这些变化,我们就可以改进 hub 服务中的 onSubscriptionIntentVerified 远程方法。

remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
    // flag to identify whether a particular location mentioned in the subscription request is a new location
    boolean localtionUnavailble = false;
    lock {
        if locations.indexOf(subscription.hubTopic) is () {
            locations.push(subscription.hubTopic);
            localtionUnavailble = true;
        }
    }
    string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
    lock {
        newsReceiversCache[newsReceiverId] = subscription;
    }
    
    // if the location mentioned in the subscription request is a new location, start sending notification
    // `start startSendingNotifications(subscription.hubTopic)` will start a new worker to send notifications.
    // this newly created worker will execute in parallel to other workers 
    if localtionUnavailble {
        _ = start startSendingNotifications(subscription.hubTopic);
    }
}

在实现过程中,我使用了 Ballerina 语言中的几种并发结构。

WebSub 订阅者的实现

在我们的示例中,WebSub 订阅者代表一个新闻接收器。我们可以使用以下代码实现一个简单的订阅器。

import ballerina/websub;
import ballerina/log;

@websub:SubscriberServiceConfig { 
    target: ["http://localhost:9000/hub", "Colombo"]
} 
service /news\-receiver on new websub:Listener(9091) {
    remote function onEventNotification(websub:ContentDistributionMessage event) returns websub:Acknowledgement {
        log:printInfo("Recieved weather-alert ", alert = event);
        return websub:ACKNOWLEDGEMENT;
    }
}

这里需要注意的一个重要问题是,订阅者是另一个暴露 HTTP 端点的服务。

websub:SubscriberServiceConfig 注解中,我们需要提供目标参数(这是一个元组)。在目标参数中,第一个值代表 hub URL,第二个值代表订阅请求中应使用的 hub.topic 参数。

在本文中,我们讨论了如何使用 Ballerina 语言实现基本的 WebSub hub 和订阅者。在下一篇文章中,我们将介绍如何集成第 3 方 API 来检索天气通知。

示例代码: https: //github.com/ayeshLK/weather-reporter/tree/state-1

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/30670.html

(0)

相关推荐

发表回复

登录后才能评论