以最大并发运行多个操作-不执行最后2个任务
c#
我创建了一个类,它允许我同时运行多个操作以及设置最大并发限制的选项。即,如果我有 100 个操作要做,并且我设置maxCurrency为 10,那么在任何给定时间,最多应该同时运行 10 个操作。最终,所有的操作都应该被执行。
这是代码:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
Debug.WriteLine($"Calculated results: {results.Count}");
return results.ToList().AsReadOnly();
}
这是我如何使用它的示例:
var operations = Enumerable.Range(1, 10)
.Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
{
await Task.Delay(100, ct);
return n;
});
var data = await _sut.Run(operations, 2, CancellationToken.None);
每次执行此操作时,data集合只有 8 个结果。我希望有 10 个结果。
这是调试日志:
Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8
如你看到的:
- 完成10个任务
- “添加新”被记录 10 次
- “添加的 x”被记录了 8 次
我不明白为什么最后两个操作没有完成。所有任务都IsComplete设置为true,据我所知,这应该意味着所有任务都被执行到最后。
回答
这里的问题是Task.Factory.StartNew返回一个任务,当等待时返回内部任务。
它不会给你一个等待这个内部任务的任务,因此你的问题。
解决此问题的最简单方法是调用Unwrap您创建的任务,这将解开内部任务并允许您等待。
这应该有效:
var task = ....
....
}, ct).Unwrap();
通过这个小小的改变,你会得到这个输出:
...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10
请注意,我对您的问题的评论仍然有效:
- 您仍然在幻想 WhenAll 将等待所有任务,而实际上除了最后 N 个任务之外的所有任务都已经完成,因为循环本身在前一个任务完成之前不会继续。因此,您应该将同步对象获取移动到您的内部任务中,以便您可以在开始等待它们之前将它们全部排队。
我也相信(虽然我不是 100%知道)使用 SemaphoreSlim 不是一个好方法,因为我相信任何与线程相关的同步对象在与任务相关的工作中使用可能不安全。线程池中的线程被重用,而实时任务正在等待子任务完成,这意味着这样的线程可能已经拥有尚未完成的前一个任务的同步对象,因此允许多于您想要运行的 2 个在“同时”。SemaphoreSlim 可以使用,其他同步原语可能不是。