如何以比这更好的方式创建异步TCP服务器?
c#
我有一个服务器应用程序来接受来自一组大约 10000 个客户端(实际上是具有 GPRS 接口的现场设备)的 TCP 连接。
接受连接的部分是这样的:
public async System.Threading.Tasks.Task AcceptConnections()
{
_listenerProxy.StartListen();
var taskList = new List<System.Threading.Tasks.Task>();
var application = _configManager.Configuration.GetSection("application");
var maxTasks = int.Parse(application["MaxTasksPerListener"]);
while (true)
{
if (taskList.Count > maxTasks)
{
_logger.Info($"Waiting for handling task to be completed {_listenerProxy.Port}.");
await System.Threading.Tasks.Task.WhenAny(taskList.ToArray()).ConfigureAwait(false);
taskList.RemoveAll(t => t.IsCompleted); // at least one task affected, but maybe more
}
_logger.Info($"Waiting for client to be accepted on port {_listenerProxy.Port}.");
(var readStream, var writeStream, var sourceIP, var sourcePort) = await _listenerProxy.AcceptClient().ConfigureAwait(false);
_logger.Info($"Client accepted on port {_listenerProxy.Port}: IP:{sourceIP}, Port:{sourcePort}");
var clientContext = new ClientContext<TSendMessage, TReceiveMessage>
{
SourceAddress = sourceIP,
SourcePort = sourcePort,
DataAdapter = DataAdapterFactory<TSendMessage, TReceiveMessage>.Create(readStream, writeStream)
};
taskList.Add(HandleClient(clientContext));
}
}
HandleClient 方法定义为:
public async System.Threading.Tasks.Task HandleClient(ClientContext<TSendMessage, TReceiveMessage> clientContext);
我希望能够并行处理预定义数量的请求(处理程序函数是HandleClient)。客户端将连接,发送一些少量数据,然后关闭连接。由于整个项目是异步的,我很想为那部分尝试一种异步方法。
我很确定不推荐使用此解决方案,但我不知道如何以更好的方式完成它。我在这里找到了一个非常相关的主题:如何在新线程上运行任务并立即返回给调用者?
Stephen Cleary 在那里发表了评论:
好吧,我建议的第一件事是尝试更简单的练习。严肃地说,异步 TCP 服务器是您可以选择的最复杂的应用程序之一。
那么,在这种情况下,“正确”的方法是什么?我知道,我的方法“有效”,但我有一种不好的感觉,尤其HandleClient是或多或少是“即发即忘”。我对结果任务感兴趣的唯一原因是“节流”吞吐量(最初它是一个异步无效)。事实上,我的问题与链接中的 TO 完全相同。但是,我仍然不知道这种方法中最大的问题是什么。
我将不胜感激建设性的提示...谢谢。
回答
一句话:红隼。将所有这些问题转移到这一点上,同时获得许多其他好处,例如高级缓冲区生命周期管理,以及面向异步的设计。这里的关键 API 是UseConnectionHandler<T>,例如:
public static IWebHostBuilder CreateHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args).UseKestrel(options =>
{ // listen on port 1000, using YourHandlerHere for the handler
options.ListenLocalhost(1000, o => o.UseConnectionHandler<YourHandlerHere>());
}).UseStartup<Startup>();
你可以在这里看到一个微不足道但可运行的例子(嘿,如果你称一个主要功能类似于 redis 的服务器“微不足道”),或者我在私人 github 存储库中有其他例子,我可能会发送给你,包括一个示例,其中所有您需要做的是实现帧解析器(库代码处理其他所有内容,并且适用于 TCP 和 UDP)。
至于“油门”——这听起来像是一个异步信号量;SemaphoreSlim有WaitAsync.
- TIL! I had no idea Kestrel could handle raw TCP.
- @StephenCleary yep; and because it exposes everything as "pipelines" (i.e. `PipeReader`, `PipeWriter`), you don't need to deal with any of the back-buffer management etc - just take what you want from the inbound pipe