与快餐和芹菜的异步任务

2021-05-12 09:43:17

如果长期运行过程是应用程序的一部分' S工作流程,而是阻止响应,您应该在正常请求/响应流之外处理它。

也许您的Web应用程序要求用户提交缩略图(可能需要重新大小)并在注册时确认其电子邮件。如果您的应用程序处理了该图像并直接在请求处理程序中发送了确认电子邮件,那么最终用户将不必要地等待在页面加载或更新之前完成处理。相反,您' ll希望将这些进程传递给任务队列,并让单独的工作进程处理它,因此您可以立即将响应发送回客户端。然后,在处理发生时,最终用户可以在客户端执行其他内容。您的应用程序也可以自由地响应其他用户和客户的请求。

为实现这一目标,我们' LL通过设置和配置Celery和Redis来处理FastAPI应用程序的长期进程的过程。我们' LL也使用Docker和Docker撰写,将所有东西系在一起。最后,我们' ll看如何用单元和集成测试测试芹菜任务。

同样,为了提高用户体验,在后台进程中应该在正常的HTTP请求/响应流之外运行长时间运行的进程。

与您建立一个应用程序时,尝试区分应在请求/响应生命周期(如CRUD操作)中运行的任务从背景中运行。

值得注意的是,你可以利用Fastapi' s backgroundtasks类,它直接来自Starlette,在背景中运行任务。

来自Fastapi导入背景特设需要def send_email(电子邮件,消息):pass @app。获得(" /")async def ping(background_tasks:backgroundtasks):background_tasks。 add_task(send_email," [电子邮件受保护]","嗨!")返回{"消息" :" Pong!" }

CPU密集型任务:Celery应该用于执行繁重的背景计算的任务,因为背景特定在服务于您的应用程序的同一事件循环中运行。

任务队列:如果您需要任务队列来管理任务和工人,则应使用Celery。通常,您' ll想要检索作业的状态,然后根据状态 - 即,发送错误电子邮件,启动不同的后台任务,或重试任务来执行一些操作。芹菜为您管理所有这些。

我们的目标是开发一个Fastapi应用程序,它与芹菜一起配合使用,以处理正常请求/响应周期之外的长期运行过程。

最终用户通过向服务器端的POST请求启动新任务。

在路由处理程序中,将任务添加到队列中,并且任务ID被发送回客户端。

使用Ajax,客户端继续轮询服务器以检查任务本身在后台运行的任务状态。

从FastApi-Celery Repo克隆基本项目,然后从Master Branch中查看V1标记:

自从我们' LL总共需要管理三个过程(Fastapi,Redis,Celery Worker),我们使用Docker通过将它们的工作流程来简化我们的工作流程,以便它们都可以从一个终端窗口运行单个命令。

$ Docker-Compose Exec网Python -M Pytest ==================================测试会话开始=== ================================平台linux - python 3.9.5,pytest-6.2.4,py-1.10 .0,Pluggy-0.13.1Rootdir:/ usr / src / appcollected 1 itemtests / test_tasks.py。 [100%] =================================== 1通过0.06s ====== ==============================.

├──吉蒂尼 - ──授权¶──readme.md├──docker-compose.yml└──项目├────────────────────────────────静态─main.css│── - main.js├──模板│├──_base.html│── .py└──test_tasks.py.

项目/ templates / home.html中的onclick事件处理程序设置为按钮点击:

< div类=" btn-group"角色="组" aria-label ="基本示例" > <按钮类型="按钮" Class =" BTN BTN-Primary" onclick =" handleclick(1)" >短< / a> <按钮类型="按钮" Class =" BTN BTN-Primary" onclick =" handleclick(2)" >培养基< / a> <按钮类型="按钮" Class =" BTN BTN-Primary" onclick =" handleclick(3)" >长< / a> < / div>

在项目/静态/ main.js中找到的onclick调用handleclick,它将ajax post请求发送到服务器,具有适当的任务类型:1,2或3。

函数handleclick(类型){fetch(' /任务' {方法:'帖子'标题:{' content-type':'申请/ json'},body:json。stryify({type:type}),})。然后(响应=>响应。JSON())。然后(res => getstatus(res。数据。task_id)); }

CELERY使用消息代理 - RabbitMQ,REDIS或AWS简单的队列服务(SQS) - 以促进芹菜工人和Web应用程序之间的沟通。消息被添加到代理中,然后由工作人员处理。一旦完成,结果将添加到后端。

Redis将被用作经纪人和后端。将redis和celery worker添加到docker-compose.yml文件中:

版本:' 3.8'服务:Web:构建:./project端口: - 8004:8000命令:Uvicorn Main:App --host 0.0.0.0 - 中加载卷: - ./project:/usr/src/app环境: - celery_broker_url = redis: // redis:6379/0 - celery_result_backend = redis:// redis:6379/0 devens_on: - redis工作者:build:./project命令:celery worker - app = worker.celery --loglevel = Info卷: - 。 /项目:/ usr / src / app环境: - celery_broker_url = redis:// redis:6379/0 - celery_result_backend = redis:// redis:6379/0 depends_on: - web - redis redis:图片:Redis:6-Alpine

从芹菜导入芹菜芹菜=芹菜(__name__)芹菜。 Conf。 broker_url =操作系统。环境。得到(" celery_broker_url"," redis:// localhost:6379")芹菜。 Conf。结果_Backend = OS。环境。得到(" celery_result_backend"," redis:// localhost:6379")@celery。任务(名称=" create_task")def create_task(task_type):时间。睡眠(int(task_type)* 10)返回true

在这里,我们创建了一个新的celery实例,并使用任务装饰器,我们定义了一个名为create_task的新芹菜任务函数。

更新路由处理程序以启动任务并响应任务ID:

函数handleclick(类型){fetch(' /任务' {方法:'帖子'标题:{' content-type':'申请/ json'},body:json。stryify({type:type}),})。然后(响应=>响应。JSON())。然后(res => getstatus(res。数据。task_id)); }

从原始Ajax请求中返回响应时,我们将继续使用每秒任务ID调用getStatus():

函数getStatus(taskID){fetch(`/任务/ $ {taskId}`,{方法:' get'标题:{' content-type':'申请/ json'},})。然后(响应=>响应。JSON())。然后(res => {const html =`< tr> $ {taskId}< / td> $ {res。数据。task_status}< / td>< td> $ {res。task_result}< / td>< / tr>`;文件。getElementbyid('任务')。prepend(html); const newrow = document。getElementbyid('表& #39;)。insertrow(); newrow。innerhtml = html; const taskstatus = res。数据。task_status;如果(taskstatus ==='完成' || taskstatus ==='失败&# 39;)返回false; setTimeout(function(){getstatus(restatus。task_id);},1000);})。 catch(err =>控制台。日志(错误)); }

如果响应成功,则将新行添加到DOM上的表中。

@应用程序 。获得(" /任务/ {task_id}")def get_status(task_id):task_result = asyncresult(task_id)结果= {" task_id" :task_id," task_status" :task_result。状态," task_result" :task_result。结果}返回jsonresponse(结果)

然后,从响应中获取Task_ID并调用更新的端点以查看状态:

在Docker-compose.yml中更新工作服务,以便芹菜日志转储到日志文件:

工人:build:./project命令:celery worker - app = worker.celery --loglevel = info --logfile = logs / celery.log卷: - ./project:/usr/src/app环境: - celery_broker_url = redis:// redis:6379/0 - celery_result_backend = redis:// redis:6379/0 depends_on: - web - redis

将新目录添加到"项目"叫做"日志。然后,将名为celery.log的新文件添加到新创建的目录。

自从我们设置卷以来,您应该看到当地填写日志文件:

[2021-05-08 15:32:24,407:INFO / MAINPROCESS]连接到REDIS:// REDIS:6379/0 [2021-05-08 15:32:24,415:INFO / MAINPROCESS] MINGLE:寻找邻居[2021 -05-08 15:32:25,434:信息/主题]混合:全部单独[2021-05-08 15:32:25,448:Info / mainProcess] [电子邮件受保护]准备好。 [2021-05-08 15:32:29,834:INFO / mainProcess]接收任务:CREATE_TASK [013DF48C-4548-4A2B-9B22-7267DA215361] [2021-05-08 15:32:39,825:INFO / FORKPOOLWORKER-7]任务create_task [013df48c-4548-4a2b-9b22-7267da215361]成功于10.02114040000015s:true

花是一种轻量级,实时,用于芹菜的基于网络的监控工具。您可以监控当前运行的任务,增加或减少工人池,查看图形和许多统计信息,以命名几个。

仪表板:build:./project命令:flower -app = worker.celery --port = 5555 --broker = redis:// redis:6379/0端口: - 5556:5555环境: - celery_broker_url = redis:// redis:6379/0 - celery_result_backend = redis:// redis:6379/0 depends_on: - web - redis - 工作人员

导航到http:// localhost:5556要查看仪表板。你应该看到一名工人准备好了:

==================================测试会话开始============= ======================平台linux - python 3.9.5,pytest-6.2.4,py-1.10.0,pluggy-0.13.1rootdir:/ usr / src / appplugins:celery-4.4.7 ofcolted 2 item / 1取消选择/ 1 selectedTests / test_tasks.py。 [100%] ====================== 1通过,1在60.05s中取消选择(0:01:00)========= ===============.

值得注意的是,在上面的断言中,我们使用.Run方法(而不是.delay)直接在没有芹菜工人的情况下直接运行任务。

@patch(" worker.create_task.run")def test_mock_task(mock_run):ssuert create_task。运行(1)create_task。跑步 。 assert_called_once_once_on(1)assert create_task。运行(2)assert create_task。跑步 。 call_count == 2 assert create_task。运行(3)assert create_task。跑步 。 call_count == 3.

$ docker-compose exec web python -m pytest -k" test_mock_task" ==================================测试会话开始============= ======================平台linux - python 3.9.5,pytest-6.2.4,py-1.10.0,pluggy-0.13.1rootdir:/ usr / src / appplugins:celery-4.4.7 ichected 3项目/ 2取消选择/ 1 selectedTests / test_tasks.py。 [100%] ============================ 1通过,2在0.13S中取消选择========== ===================.

def test_task_status(test_app):response = test_app。帖子(" /任务",data = json。转储({"类型":1}))content =响应。 json()task_id = content [" task_id" assert task_id response = test_app。获取(f"任务/ {task_id}")content =响应。 JSON()assert content == {" task_id" :task_id," task_status" :"待处理" ," task_result" :无}断言回复。 CONTER_CODE == 200而内容[" task_status" ] =="待决" :Response = test_app。获取(f"任务/ {task_id}")content =响应。 JSON()assert content == {" task_id" :task_id," task_status" :"成功" ," task_result" : 真的 }

请记住,此测试使用开发中使用的相同代理和后端。您可能想要实例化一个新的Celery应用程序进行测试。

这是如何配置Celery以在Fastapi应用程序中运行长期运行任务的基本指南。您应该让队列处理任何可能阻止或慢下用户代码的进程。

Celery也可以用于执行可重复的任务并分解复杂的资源密集型任务,以便可以在许多机器上分发计算工作负载以减少(1)完成时间和(2)机器处理上的负载客户要求。