结合subscribe(on:options:)操作符

我有一个关于subscribe(on:options:)操作符的问题。如果有人能帮我弄清楚,我将不胜感激。

所以我们从文档中得到了什么:

指定在其上执行订阅、取消和请求操作的调度程序。与影响下游消息的 receive(on:options:) 相比,subscribe(on:options:) 改变了上游消息的执行上下文。

另外,我从不同的文章中得到的是,除非我们明确指定Scheduler在 (using receive(on:options:))上接收我们的下游消息,否则消息将在Scheduler用于接收订阅的上发送。

这些信息与我在执行过程中实际获得的信息不一致。

我有下一个代码:

Just("Some text")
    .map { _ in
        print("Map: (Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: (Thread.isMainThread)")
    }
    .store(in: &subscriptions)

我希望下一个输出:

Map: false
Sink: false
Map: false
Sink: false
Map: false
Sink: false

但相反,我得到:

Map: true
Sink: false

当我使用Sequence发布者时也会发生同样的事情。

如果我交换map操作员和subscribe操作员的位置,我会得到我想要的:

Just("Some text")
    .subscribe(on: DispatchQueue.global())
    .map { _ in
        print("Map: (Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: (Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

有趣的事实是,当我使用与自定义发布商的第一个列表相同的运算符顺序时,我会收到我想要的行为:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: (Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: (Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

所以我认为要么是我对所有这些机制的完全误解,要么是一些发布者故意选择发布值的线程(Just, Sequence-> Main, URLSession.DataTaskPublisher-> Some of Background),这对我来说没有意义,因为在这种情况下我们为什么需要这个subscribe(on:options:)为了。

你能帮我理解我错过了什么吗?先感谢您。

回答

要了解的第一件事是,信息双向流动起来管道和向下管道。沿着管道(“上游”)向上流动的消息是:

  • 订阅的实际表现(接收订阅)

  • 订阅者向上游发布者请求新值

  • 取消消息(这些消息从最终订阅者向上渗透)

沿着管道(“下游”)流动的消息是:

  • 价值观

  • 完成,包括失败(错误)或正常完成(报告发布者发出其最后一个值)

好吧,正如文档明确指出的那样,subscribe(on:)是关于前者:流向上游的消息。但你实际上并没有跟踪任何那些信息在你的测试,所以没有你的结果反映任何有关他们的信息!handleEvents在订阅点上方插入适当的运算符以查看管道向上流动的内容(例如实现其receiveRequest:参数):

Just("Some text")
    .handleEvents(receiveRequest: {
        _ in print("Handle1: (Thread.isMainThread)")
    })
    .map // etc.

同时,你应该没有任何关于哪些消息会流线的假设下游(即价值观和完成)。你说:

此外,我从不同的文章中得到的是,除非我们明确指定调度程序以在(使用receive(on:options:))上接收我们的下游消息,否则消息将在用于接收订阅的调度程序上发送。

但这似乎是一个虚假的假设。您的代码没有任何内容以明确的方式确定下游发送线程。正如您所说的那样,您可以使用来控制这一点receive(on:),但如果您不这样做,我会说您必须对此事不承担任何责任。一些发布者肯定会在后台线程上产生一个值,例如数据任务发布者,这是完全合理的(同样的事情发生在数据任务完成处理程序上)。其他人没有。

可以假设的是,除操作符以外的操作符receive(on:)通常不会改变值传递线程。但是,运营商是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的。要控制接收线程,请控制它!打电话receive(on:)或假设什么都不做。

举个例子,如果你把你的开场改为

Just("Some text")
    .receive(on: DispatchQueue.main)

那么你map和你sink都会报告他们正在主线程上接收值。为什么?因为你控制了接收线程。无论您在任何subscribe(on:)命令中说什么,这都有效。它们是完全不同的事情。

也许如果你调用subscribe(on:)但你不调用receive(on:),关于下游发送线程的一些事情是由subscribe(on:)线程决定的,但我肯定不会依赖任何关于它的硬性规则;文档中没有什么说的!相反,不要这样做。如果您实施subscribe(on:)receive(on:)也实施,以便负责发生的事情。

  • I hope my rewrite is useful. I'd like to compliment you on a well-asked question: very nice examples. You did your homework before formulating the question.

以上是结合subscribe(on:options:)操作符的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>