多个长时间运行的并行任务
c#
尝试创建一个服务来消费来自 Kafka 的许多主题。我想为每个消费者创建一个任务,这意味着这些是长时间运行的任务。
在每项任务中,如果我这样做;
while(!token.IsCancellationRequested)
{
var result = await consumer.Consume(token);
}
然后,如果我尝试启动多个任务,第一个任务会启动,但其他任务不会继续。
我以为我正在寻找关于如何正确管理任务的最佳实践(这不是我做过的很多事情)并发布在软件工程上,但有人告诉我有一个错误,我应该在这里发布。
看这部作品;
async Task Main()
{
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await Task.WhenAll(Consumer1(cancellationTokenSource.Token), Consumer2(cancellationTokenSource.Token));
Console.WriteLine("Complete");
}
private async Task Consumer1(CancellationToken token)
{
Console.WriteLine("Working on consumer 1");
while (!token.IsCancellationRequested)
{
// Do a thing
// await consumer.Consume()
}
Console.WriteLine("Completed Consumer 1");
await Task.CompletedTask;
}
private async Task Consumer2(CancellationToken token)
{
Console.WriteLine("Completed consumer 2");
await Task.CompletedTask;
}
结果将是;
00:00:00: Working on consumer 1
00:00:05: Completed Consumer 1
00:00:05: Completed consumer 2
00:00:05: Complete
如果你在 Consumer1 中放置一个 await Task.Delay(1),在 while 之上,你会得到;
00:00:00: Working on consumer 1
00:00:00: Completed consumer 2
00:00:05: Completed Consumer 1
00:00:05: Complete
实际代码确实包含await consumer.Consume(token)但不足以并行运行任务,并且行为告诉我我在某处缺少正确的用法。所以,我的第一个问题是,我在这里是否完全错误地使用了任务,即是否有最佳实践来实现我想要做的事情。其次,我在代码中缺少的错误是什么?
作为@Jakoss 评论的参考,这将按预期运行任务;
var tasks = new List<Task>();
tasks.Add(Task.Run(async () => await Consumer1(cancellationTokenSource.Token)));
tasks.Add(Task.Run(async () => await Consumer2(cancellationTokenSource.Token)));
await Task.WhenAll(tasks.ToArray());
这是正确的方法吗?
回答
只有当我放入 Task.Delay(1) 时,它才会启动第二个任务。我希望两个消费者都在运行,我获得这种行为的唯一方法是使用 Task.Delay,这让我觉得我做的事情很糟糕或误解了这一切是如何运作的。
所有方法(包括async方法)开始同步执行。此外,如果 anawait用于已经完成的可等待对象,它将继续同步执行。在示例代码中,由于await consumer.Consume()注释掉了,整个方法同步运行。
如果您确定它consumer.Consume会异步运行,那么您所要做的就是取消注释它。但是,如果它可能不是异步行为(例如,如果消息已经到达消费),那么您可能希望使用Task.Run在后台线程上启动该方法。