在 Laravel 中使用 Kafka 实现服务器之间的实时通信

最近,我在两个应用服务器之间的通信中遇到了一个重大挑战。我的想法是将队列作业从一台服务器(服务器一)分派到另一台处理复杂查询和计算的服务器(服务器二)。在完成所有数据库事务和计算后,服务器二应通知服务器一所有操作已成功完成。

起初,我考虑过 RabbitMQ。但是,我发现它并不适合我的使用案例,因此我选择了 Kafka。

Kafka 服务器

我决定使用 Upstash 的 Kafka,而不是部署服务器、安装和配置 Kafka。这是一个无服务器数据平台,具有低延迟的 Kafka 连接。有趣的是,他们采用的是 “即用即付 “模式,因此我不必一直使用专用服务器。

操作非常简单:

  • 在 Upstash 上创建一个账户
  • 打开 “Kafka “选项卡并创建一个集群(稍后可以创建主题)
  • 将端点、端口、用户名和密码复制到一个安全的地方
  • 点击 “主题”,创建一个主题(你可以把它们看作广播信息的通道)

我假设你已经有两个 Laravel 应用程序要进行测试。

配置两个应用程序

在两个应用程序上配置 .env 文件和 config/kafka.php 文件。

将这些内容添加到 .env 文件中:

KAFKA_URL=https://***************rest-kafka.upstash.io
KAFKA_USERNAME=ZXhjaXR**********************izHMAz_W6wmwJ8
KAFKA_PASSWORD=OWI1ODcz*****mZTY1N2VhMTNm
KAFKA_GROUP=default_group
KAFKA_INSTANCE=default_instance

KAFKA_URL 应不含端口号

创建文件 config/kafka.php,复制并粘贴以下代码:

<?php

return [
    'url' => env('KAFKA_URL', ''),
    'user' => env('KAFKA_USERNAME', ''),
    'password' => env('KAFKA_PASSWORD', ''),

    'group' => env('KAFKA_GROUP', 'default_group'),
    'instance' => env('KAFKA_INSTANCE', 'default_instance'),
];

生成有关某个主题的消息

在通过主题生成消息之前,在 app/services/Kafka/KafkaService.php  文件中创建一个 Laravel 服务容器 

<?php

namespace App\Services\Kafka;

use Illuminate\Support\Facades\Http;

class KafkaService
{
    private $http;
    private $url;
    private $user;
    private $password;

    public function __construct(Http $http)
    {
        $this->http = $http;
        $this->url = config('kafka.url');
        $this->user = config('kafka.user');
        $this->password = config('kafka.password');
    }

    public function produce($topic, $producer, $data, $headers = [])
    {
        $url = $this->getUrl($topic);
        $defaultHeaders = $this->getHeaders();

        $headers = array_merge($defaultHeaders, $headers);

        $response = $this->http::withHeaders($headers)->post($url, [
            'key' => $producer,
            'value' => json_encode($data),
        ]);

        return $response->json();
    }

    private function getUrl($topic)
    {
        return sprintf('%s/produce/%s', $this->url, $topic);
    }

    private function getHeaders()
    {
        $auth = base64_encode($this->user . ':' . $this->password);

        return [
            'Authorization' => 'Basic ' . $auth,
            'Content-Type' => 'application/json',
        ];
    }
}

produce()方法有四个参数

  1. $topic是我们在 Upstash UI 上创建的主题名称
  2. $producer是一个类,用于识别哪个类负责生成消息,我们将在稍后创建消费者时看到它的作用
  3. $data包含需要生成的有效负载消息
  4. 可选$headers在生成消息时发送。

其余方法是不言自明的。

现在,您可以在任何控制器的构造函数中注入此服务容器,并调用 Produce 方法。示例:

<?php

namespace App\Http\Controllers;

use App\Services\Kafka\KafkaService;

class YourController extends Controller
{
    private $kafkaService;

    public function __construct(KafkaService $kafkaService)
    {
        $this->kafkaService = $kafkaService;
    }

    public function yourMethod()
    {
        $topic = 'your-topic';
        $producer = self::class;
        $data = ['key' => 'value']; // your data

        $response = $this->kafkaService->produce($topic, $producer, $data);

        // handle the response
    }
}

响应成功后,点击主题名称,然后点击消息选项卡,就能在 Upstash UI 上看到生成的消息。

在 Laravel 中使用 Kafka 实现服务器之间的实时通信
在 Laravel 中使用 Kafka 实现服务器之间的实时通信

消费消息

现在如何在另一个 Laravel 应用程序中消费该消息?由于 PHP 不允许我们持续订阅进程,我们需要一个变通方法来持续消费一个主题。

在之前创建的 KafkaService.php 中添加以下方法:

public function consume($group, $instance, $topic, $headers = [])
{
    $url = sprintf('%s/consume/%s/%s', $this->url, $group, $instance);
    $defaultHeaders = $this->getHeaders();

    $headers = array_merge($defaultHeaders, $headers);

    $response = $this->http::withHeaders($headers)->post($url, [
        'topic' => $topic,
        'timeout' => 10000,
    ]);
    return $response->json();
}

下面是代码的详细说明:

  • 它使用提供的 $group$instance 参数以及存储在 $this->url 中的基本 URL 构建一个 URL。URL 格式为 {$this->url}/consume/{$group}/{$instance}。
  • 它通过调用 getHeaders 方法获取 HTTP 请求的默认头信息。
  • 它会将默认头信息与作为参数提供的头信息合并。这样就能确保传递给 consume 方法的任何其他头信息都会包含在 HTTP 请求中。
  • 它使用 http 类向构建的 URL 发出 HTTP POST 请求。请求包括 $topic 和 10000 的timeout 值 (代表 10 秒超时)。
  • 它会以 JSON 格式返回 HTTP 请求的响应。

通过以下方式创建一个 Laravel 命令:

php artisan make:command KafkaConsume

现在打开创建的文件 app/Console/Commands/KafkaConsume.php,复制并粘贴以下代码:

<?php

namespace App\Console\Commands;

use App\Services\Kafka\KafkaService;
use App\Services\Kafka\ProcessKafkaEvent;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Http;

class KafkaConsume extends Command
{
    protected $signature = 'kafka:consume {--topic=default}';
    protected $description = 'Consume Messages from Kafka by Upstash';

    private $kafkaService;

    public function __construct(KafkaService $kafkaService)
    {
        parent::__construct();

        $this->kafkaService = $kafkaService;
    }

    public function handle()
    {
        $topic = $this->option('topic');
        $group = config('kafka.group');
        $instance = config('kafka.instance');
        $headers = [
            'Kafka-Auto-Offset-Reset' => 'latest',
            'Kafka-Auto-Commit-Interval' => 5000,
        ];

        $messages = $this->kafkaService->consume($group, $instance, $topic, $headers);

        foreach ($messages as $message) {
            $producer = $message['key'];
            $payload = collect(json_decode($message['value']));
            ProcessKafkaEvent::handle($topic, $producer, $payload);
        }
    }
}

在方法开始时,它会使用 $this->option(‘topic’) 从命令选项中获取主题。如果运行命令时没有提供主题,它将使用命令签名中指定的默认值。

接下来,它会使用 config 函数从应用程序的配置中获取 Kafka 组和实例。组和实例用于识别 Kafka 集群中的消费者。

然后,它会定义一个包含两个 Kafka 特定头的 $headers 数组: Kafka-Auto-Offset-Reset 和 Kafka-Auto-Commit-Interval。Kafka-Auto-Offset-Reset 头被设置为 latest,这意味着如果没有提供初始偏移量,消费者将从主题上最新的消息开始消费消息。Kafka-Auto-Commit-Interval 标头设置为 5000,这意味着消费者将每隔 5000 毫秒自动提交其消费的最后一条消息的偏移量。

然后,KafkaService 的消耗方法(consume method)就会被调用,其中包括组、实例、主题和标题。该方法负责向 Kafka 服务器发出 HTTP 请求,以消耗消息。消耗的消息以数组形式返回,并存储在 $messages 变量中。

然后,该方法会循环遍历 $messages 数组中的每条消息。对于每条消息,它都会检索键(代表消息的生产者)和值(代表消息的有效载荷)。使用 collect 函数将有效载荷从 JSON 格式解码并转换为 Laravel 集合。

最后,ProcessKafkaEvent 类的 handle 方法会被调用,其中包括 topic、producer 和 payload。该方法负责处理消耗的 Kafka 事件。

创建另一个文件app/Services/Kafka/ProcessKafkaEvent.php

<?php

namespace App\Services\Kafka;

use App\Services\Report\ReferralReport;

class ProcessKafkaEvent
{
    public static function handle($topic, $producer, $payload)
    {
        $target = match ($producer . ':' . $topic) {
            'App\\Http\\Controllers\\YourController:test' => SamplHandler::class,
            default => UnhandledKafkaEvent::class
        };

        $instance = new $target;
        $instance->handle($payload, $producer, $topic);
    }
}
  1. handle 方法是一个静态方法,它采用三个参数:$topic$producer 和 $payload。它们分别代表 Kafka 主题、消息生产者和消息负载。
  2. match 表达式用于确定应处理 Kafka 事件的目标类。它通过使用冒号 () 连接 $producer 和 $topic 并将结果与​​可能值列表进行匹配来实现此目的。在这种情况下,如果生产者和主题匹配 'App\\Http\\Controllers\\YourController:test',它将使用 SampleController 类。如果没有匹配,则默认为 UnhandledKafkaEvent类。
  3. 然后创建目标类的一个新实例,并调用该实例中的 handle 方法,同时传递 $payload$producer 和 $topic

请记住,我们在生成消息时添加了一个生产者类 key,这里我们过滤 producer 类以获得正确的类处理消息。ProcessKafkaEvent 类充当路由器。您可以在 match 中添加任意数量的类。

创建一个新文件 app/Services/Kafka/UnhandleKafkaEvent.php,如果没有为特定生产者定义处理程序类,该文件将充当后备处理程序:

<?php

namespace App\Services\Kafka;

use Illuminate\Support\Facades\Log;

class UnhandledKafkaEvent
{
    public function handle($payload, $producer, $topic)
    {
        // log the unhandled messages here, customize it according to your need
        Log::error('Unhandled Kafka queued job: ' . $producer, [
            'level' => ActivityLevel::emergency->value,
            'job' => $producer,
            'queue' => $topic,
            'connection' => 'kafka',
            'job_arguments_payload' => $payload->toArray(),
        ]);
    }
}

在 SampleHandler 类中,应该有一个 handle() 方法,可以执行以下操作:

public function handle($payload, $producer, $topic)
{
    // process the payload here
}

现在运行我们在这里创建的 artisan 命令会发生什么?如果没有信息要处理,它会在 10 秒后关闭,或者在处理完信息后立即关闭。但这并不是我们想要的。我们希望命令始终运行或消耗信息。为此,我们需要一个 Worker。

设置 Worker

在 Ubuntu 上安装 supervisor。安装后创建并编辑文件 – sudo nano /etc/supervisor/conf.d/consumer-worker.conf。复制并粘贴以下内容:

[program:consumer-worker]
directory=/var/www/html ; laravel root directory
process_name=%(program_name)s_%(process_num)02d
command=php artisan kafka:consume --topic=test ; whatever your topic name is
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor-consumer-worker.log
stopwaitsecs=1800

现在保存并关闭文件。

运行:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start consumer-worker:*

现在,它将从第一个 Laravel 应用程序中持续运行并消耗信息。

所有这些都是一次性设置,你现在可以创建任意数量的生产者,并消费它们。

如果…

1. 从第一个应用程序生成信息后,如果想在第二个应用程序上生成一些 Excel 报告,该怎么办?就是这样

// first application
// ReportsController.php
$response = $this->kafkaService->produce('reports', self::class, [
    'from' => now()->subMonth(),
    'to' => now()
]);


// second application
// ProcessKafkaEvent.php
$target = match ($producer . ':' . $topic) {
    'App\\Http\\Controllers\\YourController:test' => SamplHandler::class,
    'App\\Http\\Controllers\\ReportsController:reports' => HandleReport::class,
    default => UnhandledKafkaEvent::class
};

// HandleReport.php
public function handle($payload, $producer, $topic)
{
    $from = $payload['from'];
    $to = $payload['to'];
    // generate the actual report
}

2. 如果想进一步通知第一台服务器报告已生成怎么办?可以遵循相同的生产者和消费者步骤,但现在相反,配置第二个应用程序以生成消息,并配置第一个应用程序以使用消息。

3. 如果想通知前端应用程序报告已生成怎么办?好吧,在这种情况下,我们可以使用网络套接字。

作者:Subham Chakraborty

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/39674.html

(0)

相关推荐

发表回复

登录后才能评论