如何使用 actix-web 在 websocket 处理程序中启动守护进程?

给定一个带有 Actix的WebSocket 服务器的基本设置,我如何在我的消息处理程序中启动一个守护进程?

我已经扩展了上面链接的示例启动代码以daemon(false, true)使用fork crate进行调用。

use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use fork::{daemon, Fork};

/// Define HTTP actor
struct MyWs;

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => {
                println!("text message received");

                if let Ok(Fork::Child) = daemon(false, true) {
                    println!("from daemon: this print but then the websocket crashes!");
                };

                ctx.text(text)
            },
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    let resp = ws::start(MyWs {}, &req, stream);
    println!("{:?}", resp);
    resp
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/ws/", web::get().to(index)))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}

上面的代码启动了服务器,但是当我向它发送消息时,我收到了一个Panic in Arbiter thread.

text message received
from daemon: this print but then the websocket crashes!                                               
thread 'actix-rt:worker:0' panicked at 'failed to park', /Users/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/runtime/basic_scheduler.rs:158:56
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.

回答

您的应用程序的问题在于 actix-web 运行时(即 Tokio)是多线程的。这是一个问题,因为fork()调用(由 内部使用daemon())仅复制调用fork().

即使你的父进程有 N 个线程,你的子进程也只有 1 个。如果你的父进程有任何被这些线程锁定的互斥锁,它们的状态将在子进程中复制,但由于这些线程不存在那里,它们将永远保持锁定状态。

如果你有一个Rc/Arc它永远不会取消分配它的内存,因为它永远不会被删除,因此它的内部计数永远不会达到零。这同样适用于任何指针和共享状态。

或者更简单地说 -你分叉的孩子最终会处于未定义状态。

这在Calling fork() in a Multithreaded Environment 中得到了最好的解释:

由于这些类型的问题,通常是线程修改持久状态的问题,POSIX 定义了 fork() 在线程存在的情况下的行为,以仅传播分叉线程。这解决了对持久状态进行不当更改的问题。但是,它会导致其他问题,如下一段所述。

在 POSIX 模型中,仅传播分叉线程。所有其他线程都被淘汰,恕不另行通知;不发送取消,也不运行处理程序。然而,地址空间的所有其他部分都被克隆,包括所有互斥状态。如果其他线程锁定了互斥锁,则该互斥锁将在子进程中被锁定,但锁所有者将不存在来解锁它。因此,被锁保护的资源将永久不可用。

在这里,您可以找到更可靠的来源,并提供更多详细信息

回答你的另一个问题:

我假设您想在 accept()模型实现经典的 unix “fork()”模型。在这种情况下,你就不走运了,因为诸如 actix-web 和 async/await 之类的服务器一般都没有考虑到这一点。即使你有一个单线程的 async/await 服务器,那么:

嗯……在我熟悉的任何基于 async/await 的服务器上,都无法阻止任何这种情况。您需要一个自定义服务器:

  1. 检查它的接受器任务是否是一个孩子,如果它检测到它是孩子,它应该关闭侦听套接字并丢弃接受器。
  2. 它不应该执行从父级分叉的任何其他任务,但没有办法实现这一点。

换句话说 - async/await 和“fork() on accept()”是用于并发处理任务的两种不同
且不兼容的模型。

一个可能的解决方案是拥有一个非异步接受器守护进程,它只接受连接和分叉本身。然后在孩子中产生一个网络服务器,然后将接受的套接字提供给它。但是尽管可能,但目前没有任何服务器支持。


以上是如何使用 actix-web 在 websocket 处理程序中启动守护进程?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>