RakuCro服务在“后台”订阅数据的一般指南

我正在尝试组合一个具有反应/whenever 块在“后台”消费数据的 Cro 服务。因此,与许多使用 Cro 的 websocket 使用示例不同,这与可以通过浏览器访问的路由无关。

我的用例是使用通过 MQTT 主题接收的消息并对其进行一些处理。在开发的后期阶段,我可能会从这些数据中创建一个供应,但是现在,当接收到数据时,它将存储在一个变量中并取决于某些条件,通过 http post 发送到另一个服务。

我的想法是provider()Cro::HTTP::Server设置中包含一个,如下所示:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

在 DataProvider.pm6 中:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

这会引发一堆错误:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

老实说,我完全猜测这就是我将如何处理在 Cro 服务的后台订阅数据的需求,但我无法找到任何可能被视为推荐方法的信息。

最初我在主service.pm6文件中有我的 react/whenever 块,但这似乎不对。并且需要被包裹在一个start{}块中,因为正如我刚刚了解到的,react 正在阻塞 🙂 并且 cro 无法真正启动。

但是遵循 Routes 如何实现的模式似乎合乎逻辑,但我遗漏了一些东西。该错误涉及设置新方法,但我不相信这是根本原因。Routes.pm6没有构造函数。

任何人都可以指出我正确的方向吗?

回答

感谢所有提供信息的人,这是一次非常有价值的学习练习。

传递附加子程序的方法,router()application参数旁边Cro::HTTP::Server.new给进一步的麻烦。(不允许使用数组,并且破坏了路由)

相反,我已经搬到后台工作纳入一类是自己的,并给了它一个startstop方法更接近于Cro::HTTP::Server

我的新方法:

服务.pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here 
use Database;

my $dsn         = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh         = Database.new :$dsn;

my $mqtt-host   = 'localhost';
my $subscriber  = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
    http => <1.1>,
    host => ...,
    port => ...,
    application => routes($dbh), # Basically back the way it was originally 
    after => [
        Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
    ]
);

$http.start;
say "Listening at...";
react {
    whenever signal(SIGINT) {
        say "Shutting down...";
        $subscriber.stop;
        $http.stop;
        done;
    }
}

在 KlineDataSubscriber.pm6 中

use MQTT::Client;

class KlineDataSubscriber {
    has Str $.mqtt-host is required;
    has MQTT::Client $.mqtt = Nil;

    submethod TWEAK() {
        $!mqtt = MQTT::Client.new: server => $!mqtt-host;
        await $!mqtt.connect;
    }

    method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
        start {
            react {
                whenever $!mqtt.subscribe($topic) {
                    say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
                }
            }
        }
    }

    method stop() {
        # TODO Figure how to unsubscribe and cleanup nicely
    }
}

这对我来说更像是“Cro 惯用语”,但我很乐意得到纠正。更重要的是,它按预期工作,我觉得有点未来证明。我应该能够创建一个供应,使路由器可以使用实时数据,并将数据推送到任何连接的 Web 客户端。

我还打算拥有一个/status带有各种检查的 http GET 端点,以确保一切正常


回答

根本原因

该错误涉及设置new方法,但我不相信这是根本原因。

这不是关于建立一种新方法。这是关于一个应该定义而不是未定义的值。这通常意味着尝试初始化它失败,这通常意味着调用 .new.

任何人都可以指出我正确的方向吗?

希望这个问题有帮助。

查找有关推荐方法的信息

我完全猜测这就是我将如何处理在 Cro 服务的后台订阅数据的需求,但我无法找到任何可能被视为推荐方法的信息。

列出您从Cro 入门中遵循的快速入门步骤可能对您有所帮助,包括基础知识以及最后的“了解”步骤。

错误信息

A react block:
  in sub provider ...

Died because of the exception:
    ... 
      in method subscribe ...

错误消息以内置react构造开始,报告它捕获了一个异常(并通过抛出自己的异常作为响应来处理它)。提供了与代码中出现的位置相对应的“回溯”,react从最初的“A react 块:”缩进。

错误消息继续react构造总结其自己的异常 ( Died because ...) 并通过在后续行中报告原始异常(进一步缩进)来解释自身。这包括另一个回溯,这次一个对应于原始异常,这很可能发生在具有不同调用堆栈的不同线程上。

(所有 Raku 的结构化多线程构造[1] 都使用这两个部分的错误报告方法来处理它们通过抛出另一个异常来捕获和处理的异常。)


第一个回溯表示该react行:

in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {

第二个回溯是关于原始异常的:

    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
      in method subscribe at ... (MQTT::Client) line 133

该报道称,write所谓的第133行的方法MQTT::Client要求其调用者是一个实例类型“IO ::插座::异步”的。它得到的值该类型的但不是一个实例,而是一个“类型的对象”。(非天然类型的所有值均为任一类型的对象他们的类型的实例。)。

错误消息的结尾是:

  Did you forget a '.new'?

这是一个简洁的提示,基于这样一个现实,即在需要实例时遇到类型对象的原因有 99 次是代码未能初始化变量。(类型对象的用途之一是在 Perl 等语言中充当“未定义”的角色。)

那么,您能明白为什么应该是 'IO::Socket::Async' 的初始化实例的东西是未初始化的实例吗?

脚注

[1] Raku 的并行、并发和异步构造遵循结构化编程范式。请参阅Raku 中的并行性、并发性和异步性,了解 Jonathan Worthington 对这种整体方法的视频演示。结构化构造 likereact可以干净地观察、包含和管理在其执行范围内发生的任何事件,包括错误异常等错误,即使它们发生在其他线程上。


以上是RakuCro服务在“后台”订阅数据的一般指南的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>