如何同时处理多个websocket消息?

如果有人可以帮助我使用 Python 和 async/await,任何帮助将不胜感激!

我需要监听 websocket 的消息,所以我设置了以下代码:

import websockets
import asyncio

my_socket = "ws://......."

# I set a "while True" here to reconnect websocket if it stop for any reason
while True:
    try:
        async with websockets.connect(my_socket) as ws:
            # I set a "while True" here to keep listening to messages forever
            while True:
                await on_message(await ws.recv())
    # If websocket gets closed for any reason, we catch exception and wait before new loop
    except Exception as e:
        print(e)
    # Wait 10 secs before new loop to avoid flooding server if it is unavailable for any reason
    await asyncio.sleep(10)

async def on_message(message):
    # Do what needs to be done with received message
    # This function is running for a few minutes, with a lot of sleep() time in it..
    # .. so it does no hold process for itself

我想做的是:

  • 收听消息
  • 消息一到达,用on_message()函数应用各种动作,持续几分钟
  • 继续收听消息,而先前的消息仍在处理中 on_message()

实际发生的情况:

  • 收听消息
  • 接收消息并启动on_message()功能
  • 然后程序on_message()在接收任何新消息之前等待函数结束,这需要几分钟,并使第二条消息延迟等等

我确实理解它为什么会这样做,正如await on_message()明确所说:等待 on_message() 结束,这样它就不会回去收听新消息。我不知道的是,我如何处理消息而不必等待此函数结束。

我的on_message()函数有很多空闲时间await asyncio.sleep(1),所以我知道我可以同时运行多个任务。

那么,如何在运行第一个任务的同时继续收听新消息?

回答

简而言之,您需要更改await on_message(await ws.recv())asyncio.create_task(on_message(await ws.recv())).

正如您正确指出的那样,await对您不起作用,因为它意味着等待任务完成。尽管代码是异步的,但从它由事件循环驱动并且您可以并行启动许多此类任务的意义上说,每个单独的循环都是顺序的。

另一种方法await是使用asyncio.create_task(). 这将创建一个任务,该任务将分段执行协程(每个部分在两个等待挂起的等待之间),并穿插着其他活动协程的等效部分。create_task()将返回任务的句柄,您可以(并且可能在某些时候应该)等待任务完成并获取其结果或异常。由于在您的情况下您不关心结果,因此您甚至不需要存储任务。


以上是如何同时处理多个websocket消息?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>