阿帕奇气流简介

2020-05-29 21:22:11

Airflow是一个由社区创建的平台,用于以编程方式编写、计划和监控工作流。

机器学习是业界的热门话题。如果不是涉及到数据处理,它就不会这么酷。

假设您有一个执行Twitter情感分析的ML模型。现在,您想要在Twitter上为您最喜欢的人每天的tweet运行该模型。这样的工作流应该是这样的。

如您所见,数据从管道的一端流向另一端。可以有分支,但不能有循环。

使用cron创建和维护任务之间的关系是一场噩梦,而在AirflowAirflow中,创建和维护任务之间的关系就像编写Python代码一样简单。

Cron需要外部支持来记录、跟踪和管理任务。用于跟踪和监控工作流执行的Airflow UI。

除非外部配置,否则Cron作业不可重现。AirflowAirflow保留所有执行任务的审核跟踪。

Airflow提供了DAG Python类来创建有向无环图,这是工作流的表示形式。

从Airflow.model导入DAG从airflow.utils.date导入Days_agoargs={';start_date';:day_ago(0),}dag=DAG(dag_id=';example_bash_Operator';,default_args=args,Schedule_Interval=';*';)Schedule_Interval是每个工作流运行的间隔。';*';表示任务需要每分钟运行一次。不要在这个语法上绞尽脑汁。您可以使用https://crontab.guru/.来处理这些问题。

from airflow.operators.bash_Operator导入BashOperatorfrom airflow.operators.python_Operator import PythonOperator def print_function():print(";嘿,我是一项任务";)run_this_last=PythonOperator(task_id=';run_this_last';,dag=dag,python_callable=print_function)run_this=BashOperator(task_id=';)。>;run_this_last Web服务器是负责处理所有UI和REST API的组件。

调度程序每隔n秒遍历DAG,并调度要执行的任务。

调度程序还有一个称为Executor的内部组件。执行者负责启动工人并执行任务直到完成。

SequentialExecutor一次仅运行一个任务。工作进程运行的计算机与调度程序运行的计算机相同。

LocalExecutor与Sequential Executor相同,不同之处在于它可以同时运行多个任务。

因此,甚至在Kubernetes之前,CeleryExecutor就已经成为AirflowAirflow的一部分很长一段时间了。

CeleryExecutors有固定数量的工作进程在计划任务时执行任务。

芹菜管理工人。万一失败了,芹菜会吐出一个新的。

芹菜需要RabbitMQ/Redis to来对任务进行排队,这是对AirflowAirflow已经支持的轮子的重新发明。

KubernetesExecutor在单独的Kubernetes Pod中运行每个任务。与CeleryCelery不同,它可以按需旋转工作区,从而最大限度地利用资源。

对分配给任务的资源进行细粒度控制。可以定义任务级别所需的CPU/内存量。

现在我们已经了解了气流的基础知识,让我们在下一篇文章中学习如何编写我们的工作流程。