关于c#:Observable.Repeat 势不可挡,是bug还是特性?
The Observable.Repeat is unstoppable, is it a bug or a feature?
当源 observable 的通知是同步的时,我注意到
|
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o => { await Task.CompletedTask; //await Task.Yield(); Thread.Sleep(100); |
然后我将运算符
|
incremental.Repeat()
.Do(new CustomObserver("Checkpoint A")) .TakeWhile(item => item <= 5) .Do(new CustomObserver("Checkpoint B")) .LastAsync() .Do(new CustomObserver("Checkpoint C")) .Wait(); Console.WriteLine($"Done"); class CustomObserver : IObserver<int> |
这是这个程序的输出:
|
Checkpoint A: 1
Checkpoint B: 1 Checkpoint A: 2 Checkpoint B: 2 Checkpoint A: 3 Checkpoint B: 3 Checkpoint A: 4 Checkpoint B: 4 Checkpoint A: 5 Checkpoint B: 5 Checkpoint A: 6 Checkpoint B: Completed Checkpoint C: 5 Checkpoint C: Completed Checkpoint A: 7 Checkpoint A: 8 Checkpoint A: 9 Checkpoint A: 10 Checkpoint A: 11 Checkpoint A: 12 Checkpoint A: 13 Checkpoint A: 14 Checkpoint A: 15 Checkpoint A: 16 Checkpoint A: 17 ... |
它永远不会结束!虽然
只有当源 observable 同步通知其订阅者时才会发生这种情况。例如,取消注释行
|
Checkpoint A: 1
Checkpoint B: 1 Checkpoint A: 2 Checkpoint B: 2 Checkpoint A: 3 Checkpoint B: 3 Checkpoint A: 4 Checkpoint B: 4 Checkpoint A: 5 Checkpoint B: 5 Checkpoint A: 6 Checkpoint B: Completed Checkpoint C: 5 Checkpoint C: Completed Done |
有没有什么方法可以使
.NET Core 3.0、C# 8、System.Reactive 4.3.2、控制台应用程序
相关讨论
- 我一直说
Observable.Create 很糟糕...... -
好吧,它更多地与
Scheduler.Immediate 有关。将调度程序更改为incremental.ObserveOn(Scheduler.Default).Repeat() 并查看会发生什么。 - 我怀疑你锁定了取消订阅所需的线程。
-
@Enigmativity 它与
Observable.Create 无关。此实现也存在问题:var incremental = Observable.Defer(() => Observable.Return(++incrementalValue)); -
你是对的。然而
Observable.Create 使得创建具有此问题的可观察对象变得太容易了。 -
@Enigmativity with
ObserveOn(Scheduler.Default) 通知是从各种线程接收的,本质上是模仿我在生产者内部使用await Task.Yield() 的测试。但是我检查了另一个选项:ObserveOn(Scheduler.CurrentThread) 似乎可以彻底解决问题!使用此选项,程序按预期工作,并且一切都发生在一个线程中。ObserveOn(Scheduler.CurrentThread) 有什么我应该注意的缺点吗? -
在
Repeat 之前用SubscribeOn(Scheduler.CurrentThread) 也解决了这个问题,天知道为什么! -
从内存
Scheduler.CurrentThread 使用蹦床在当前线程(即当前上下文)上进行调度,因此它不会立即运行并取消阻塞线程。否则,使用Scheduler.Immediate 您将面临死锁的风险。 - @Enigmativity 是我的猜测,但是控制台应用程序没有同步上下文,因此令人费解。
-
您应该查看
System.Reactive.Concurrency 命名空间的源代码。那会让你的眼睛流泪。他们在那里做了很多技巧来使 Rx 工作。那里有一个同步上下文。
您可能期望
|
public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
RepeatInfinite(source).Concat(); private static IEnumerable< T > RepeatInfinite< T >(T value) |
将责任转移到
提供了不同的执行上下文,否则它仍然会继续旋转
|
public static IObservable< T > ConcatEx< T >(this IEnumerable<IObservable< T >> enumerable) =>
Observable.Create< T >(observer => { var check = new BooleanDisposable(); IDisposable loopRec(IScheduler inner, IEnumerator<IObservable< T >> enumerator) if (enumerator.MoveNext()) //this never returns false Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever |
作为一个无限流,
当
如果你有一个带有
永久蹦床。
带注释的调用栈
|
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted(); [External Code] ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...) [External Code] ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec) [External Code] Main.AnonymousMethod__0(o) //o.OnCompleted(); [External Code] ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...) [External Code] ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...) [External Code] Main(args) //incremental.RepeatEx()... |
相关讨论
- 谢谢阿斯蒂。我不能指望更彻底的答案!
- @TheodorZoulias 欢迎您!我利用挖掘源代码的经验来解决另一个问题。
- 抱歉,我之前没有尝试过!在我看来,评论中得出了一些结论。
-
是的,我们找到了使用
Scheduler.CurrentThread 的解决方法,但是导致默认行为的原因仍然是个谜。 ?? -
啊,在
ConcatEx 实现中将 `Scheduler.Immediate` 更改为Scheduler.CurrentThread 解决了这个问题。 -
但是在
Scheduler.CurrentThread 上运行Repeat 可能会导致许多其他问题。我的意思是一般。 -
是的,关于
ToObservable().ToEnumerable().ToObservable() 问题的另一个问题非常有启发性。如果你推得太多,RX 抽象就会开始泄漏! - 同意。大多数时候 Rx 是"不要担心并发",但是当你把任何东西推到极限时,泄漏抽象的法则就会迎头赶上。
- 如果这个谜团解开了,你能把它标记为已回答吗?