跨多个工作人员共享python对象
我们已经使用 FastAPI 创建了一个服务。当我们的服务启动时,它会创建一些 Python 对象,然后端点使用这些对象来存储或检索数据。
生产中的 FastAPI 从多个工人开始。我们的问题是每个工人创建自己的对象而不是共享一个。
下面的脚本显示了我们正在做的(简化的)示例,尽管在我们的例子中 Meta() 的使用要复杂得多。
from fastapi import FastAPI, status
class Meta:
def __init__(self):
self.count = 0
app = FastAPI()
meta = Meta()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
meta.count += 1
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
return {'count':meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
meta.count = 0
return status.HTTP_200_OK
如上所述,多个 worker 的问题在于每个 worker 都有自己的meta对象。请注意,当使用单个工作器运行 api 时,该问题不可见。
更明确地说,当我们/increment第一次到达端点时,我们只会看到两个工作人员中的一个响应调用(这是正确的,我们不希望两个工作人员都做同样的事情)。但是,因为有两个独立的meta对象,所以只有两个对象之一会递增。
当到达/report端点时,根据哪个工作人员响应请求,将返回 1 或 0。
那么问题是,我们如何让工作人员共享和操作同一个对象?
作为一个附带问题,上述问题/reset也会影响端点。如果调用此端点,则只有一个工作人员将重置其对象。有没有办法强制所有工作人员响应端点上的单个呼叫?
谢谢!
编辑:我忘了提到我们已经尝试(但没有成功)将meta对象存储在app.state。本质上:
app.state.meta = Meta()
...
@app.get("/report")
async def report():
return {'count':app.state.meta.count}
回答
无法直接在不同进程之间共享 Python 对象。multiprocessing模块中包含的设施(如管理器或共享内存)不适合在工作人员之间共享资源,因为它们需要一个主进程来创建资源并且没有持久性属性。
在最优选的工人之间的资源共享方式:
- 数据库- 在需要可靠存储和可扩展性的资源的持久性的情况下。示例:
PostgreSQL、MariaDB、MongoDB和许多其他。 - 缓存(键/值) - 在数据的临时性质的情况下,比数据库更快,但没有这种可扩展性,并且通常不符合 ACID。例如:
Redis,Memcached等等。
下面我将展示两个非常简单的示例,说明如何使用这两种方法FastAPI在工作人员之间共享应用程序中的数据。举个例子,我把aiocache库Redis作为后端,Tortoise ORM库PostgreSQL作为后端。由于FastAPI是异步框架,我选择了asyncio基于库的库。
测试项目的结构如下:
.
??? app_cache.py
??? app_db.py
??? docker-compose.yml
??? __init__.py
Docker-compose 文件:
对于实验,您可以使用以下 docker-compose 文件将5432(Postgres) 和6379(Redis) 端口暴露给localhost.
version: '3'
services:
database:
image: postgres:12-alpine
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: test_pass
POSTGRES_USER: test_user
POSTGRES_DB: test_db
redis:
image: redis:6-alpine
ports:
- "6379:6379"
开始:
docker-compose up -d
缓存(aiocache)
Aiocache 提供了 3 个主要实体:
- backends:允许您指定要用于缓存的后端。目前支持:
SimpleMemoryCache,RedisCache使用aioredis和MemCache使用aiomcache。serializers:序列化和反序列化您的代码和后端之间的数据。这允许您将任何 Python 对象保存到缓存中。目前配套:StringSerializer,PickleSerializer,JsonSerializer,和MsgPackSerializer。但是您也可以构建自定义的。- plugins:实现一个钩子系统,允许在每个命令之前和之后执行额外的行为。
开始:
uvicorn app_cache:app --host localhost --port 8000 --workers 5
# app_cache.py
import os
from aiocache import Cache
from fastapi import FastAPI, status
app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")
class Meta:
def __init__(self):
pass
async def get_count(self) -> int:
return await cache.get("count", default=0)
async def set_count(self, value: int) -> None:
await cache.set("count", value)
async def increment_count(self) -> None:
await cache.increment("count", 1)
meta = Meta()
# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
await meta.increment_count()
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
count = await meta.get_count()
return {'count': count, "current_process_id": os.getpid()}
# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
await meta.set_count(0)
return status.HTTP_200_OK
数据库 ( Tortoise ORM + PostgreSQL)
开始:为了简单起见,我们首先运行一个worker在数据库中创建一个schema:
uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C]
uvicorn app_db:app --host localhost --port 8000 --workers 5
# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise
class MetaModel(Model):
count = fields.IntField(default=0)
app = FastAPI()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
meta, is_created = await MetaModel.get_or_create(id=1)
meta.count += 1 # it's better do it in transaction
await meta.save()
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
meta, is_created = await MetaModel.get_or_create(id=1)
return {'count': meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
meta, is_created = await MetaModel.get_or_create(id=1)
meta.count = 0
await meta.save()
return status.HTTP_200_OK
register_tortoise(
app,
db_url="postgres://test_user:test_pass@localhost:5432/test_db", # Don't expose login/pass in src, use environment variables
modules={"models": ["app_db"]},
generate_schemas=True,
add_exception_handlers=True,
)