Python将Python分发到Airflow任务在〜5行代码中

2021-05-13 07:24:29

您现在可以在天文学家注册表中找到光线提供程序,为Apache Airflow集成创建的Apache气流集成的发现和分发集线器汇总和策划生态系统的最佳位。

机器学习(ML)已成为所有行业的公司数据生态系统的关键部分。随着气流社区的增长,我们希望在董事会中赋予数据科学和工程团队,将数据流水线扩展到高价值结果。考虑到这一点,我们只是自然地转向建造最佳气流+ ML故事。

现代ML框架中最佳质量措施之一是它允许数据科学家和工程师的灵活性和敏捷性。如果使用良好的框架或工具,可以在数小时而不是日期测量从设置到工作生产模型的训练所需的时间,而且迭代改进和添加是常态。

在其自身,气流是一种有价值的工具,可实现可靠且可重复使用的ML型号。使用DAG来帮助构建模型立即带来易于参数化,SLAS&amp等益处。警报,数据水印&谱系能力和强大的可扩展性。

但是,当我们看看实现这些目标时,我们认为有几件事是等式的关键部分:

最小转换。数据科学家应该能够直接从Jupyter笔记本(和其他类似环境)直接汇编代码,并以最小的变化运行它。

大型数据集处理。 ML模型自然涉及大型数据集,并且在不同任务之间移动的大型数据集应该是微不足道的。

每次任务可扩展性。数据科学家应该能够为手头的任务请求资源,并使系统快速有效地分配这些资源,即Pytorch的GPU,用于DataFrame的RAM,或用于XGBoost的CPU内核。

ML生态系统集成。数据科学家应该能够轻松地集成录制,查询和复制实验的工具,以及注册和amp;部署生成的模型。

幸运的是,存在一部可能会与Airflow结合的少数开源框架,以创建一个第一类ML故事。我们特别兴奋的是雷。

Ray是一个Python-First群集计算框架,允许Python代码,即使具有复杂的库或包,也可以在无限大小的群集中分发和运行。与像Spark这样的系统不同,需要复杂的群集设置和Java依赖项,Ray可以从用户的笔记本电脑从用户的笔记本电脑运行相同的代码,以极为最小的配置的高度供电的AWS虚拟机。

一旦运行,用户就可以根据每函数的基础分配Ray资源(例如,给出此功能2 CPU和1 GPU)。如果正确使用,Ray的分布式计算结合了气流的强大调度和编排,为快速,可靠的ML开发和部署提供了一个完美的平台。

雷高度表现,并且在引擎盖下写入C ++以快速&使用GRPC自动移动Python对象,因为调用以前呼叫的新功能。所有这些都是抽象的 - 作为用户,您只需编写Python代码。

气流和射线不需要特殊的包或设置一起工作,但我们发现了大多数用户集成了这些工具的效率低下。因此,我们希望以测试,可扩展的方式标准化气流+射线使用的最佳实践。

与光线和任何脚轮一起,我们在天文学家们很高兴向您介绍Apache气流的光线提供商。

在该提供商中,我们通过扩展任务流API将所有光线特定的设置/初始化代码封装到装饰器中而不是运营商。此装饰员允许Airflow用户在Python函数中保留所有光线代码,并通过Python函数移动数据来定义任务依赖性。

因此,Airflow + Ray用户可以看到它们正在启动的代码,并具有完全灵活性来修改和模板其DAG,同时仍然利用Ray的分布式计算能力。

为了展示这种集成的力量,让我们开始我们的数据科学流水线,其中所有数据科学管道启动:jupyter笔记本。

在此示例中,我们创建了一个基本的jupyter笔记本模型,从而提取了HIGGS数据集,拆分培训和测试数据,并使用XGBoost在光线上创建/验证模型,该模型将XGBoost培训缩放到使用Ray的群集中。

这里的所有内容都可以在用户的​​笔记本电脑上或通过远程射线集群在本地运行,最小的工作。这堆栈非常好地处理模型建筑的实验方面,现在我们现在可以将该实验转化为生产准备的气流DAG。

跳转到此存储库。在那里遵循方向,你应该跑步和运行。

请注意,随着项目移动到Beta,此代码更具改进。

让我们通过如何在第一节中调整我们的笔记本电脑代码到我们刚才刚提到的申请。

要使用Ray Decorator将我们的ML笔记本转换为气流DAG,我们完成了以下步骤:

@ ray .remote def train_model(数据):train_df,validation_df = data evallist = [(validation_df,' eval')] evals_result = {} config = {" tree_method" :" stay" ," eval_metric" :[" logloss" ,"错误" ],}写(")bst = xgboost_ray .train(params = config,dtrain = train_df,evals_result = evals_result,ray_params = xgb。rayparams(max_actor_restarts = 1,num_actors = 8,cpus_per_actor = 2 ),num_boost_round = 100,evals = evallist)返回bst

@ ray_task(** task_args)def train_model(data):train_df,validation_df = data evallist = [(validation_df,' eval')] evals_result = {} config = {" tree_method" :" stay" ," eval_metric" :[" logloss" ,"错误" ],bst = xgb .train(params = config,dtrain = train_df,evals_result = evals_result,ray_params = xgb。rayprams(max_actor_restarts = 1,num_actors = 8,cpus_per_actor = 2),num_boost_round = 100,evals = evallist)返回bst

@dag(default_args = default_args,schedule_interval = none,start_date = days_ago(2),标签= ['完成 - modin-examplex'])def task_flow_xgboost_modin():build_raw_df = load_dataframe()data = create_data(build_raw_df )TrousoRap_Model = Train_Model(Data)Task_Flow_xgBoost_Modin = task_flow_xgboost_modin()

将DAG上传到气流,您的数据管道现在是一个正在运行的气流DAG!

通过可能20分钟的工作,数据科学家可以将本地Python脚本转换为具有射线分布式计算和气流编排的功率的大规模可再现的管道。然后,这些数据工程师可以利用Airflow变量,连接,调度间隔以及许多制作气流的许多功能,这与需要生产级调度和编排的数据团队进行泛滥。

注意:在提供商包的Alpha版本中,您的气流环境需要运行Ray Xcom后端,以确保装饰器完全正常。

在上面的示例中,Airflow用户可能会注意到我们在任务之间传递整个Dataframe,而无需将这些数据块显式发送到外部存储。通过传统的XCOM,这将是不可能的,因为Airflow将在元数据DB的单个小区中的任务中发送的每条数据存储在任务之间。

要解决这个问题,我们可以利用Ray' S最酷的功能:内存中的对象存储。为确保ML的有效数据处理,Ray利用一个对象存储系统,该系统能够快速数据传输和零拷贝读取。使用Ray Decorator,气流将利用对象存储作为缓存系统,允许大型数据对象留在跨多个任务的工人的RAM内。在任务之间没有更多的书写和阅读数据!

虽然该alpha释放实现了用于在Ray任务之间传递数据的射线等离子存储,但将来的版本将简化来自射线的移动数据和到不同的数据存储,并且可能甚至扩展光线自定义XCOM后端,以便使用A的任务之间移动数据。多个不同的气流运营商。

上面发表的气流的光线提供商目前处于alpha中,这意味着将有一些粗糙的边缘。我们欢迎任何和所有错误报告,建议或PRS!您可以在此处找到代码。

气流具有模板和动态的参数化功能,并且在与Ray调谐结合时,可以使用任何机器学习框架协调并动态调整ML搜索空间 - 包括Pytorch,XGBoost,MXNet和Keras - 同时轻松集成录制工具,查询并复制实验,以及寄存器&部署生成的模型。

@ ray_task(** task_args)def tune_model(数据):search_space = {#您可以将常量与搜索空间对象混合。 "目标" :"二进制:逻辑" ," eval_metric" :[" logloss" ,"错误" ]," max_depth" :调整.randint(1,9)," min_child_weight" :曲调.Choice([1,2,3])," subsample" :曲调.Uniform(0.5,1.0)," eta" :曲子.LogUniform(1E - 4,1E - 1),}印刷("能够激进早期停止不良试验")#这将使侵略性的早期停止不良试验。 scheduler = ashascheduler(max_t = 10,grace_period = 1,sextaing_factor = 2#10培训迭代)打印("调整")分析= tune .run(调谐.with_parameters(train_model,data = data),度量标准= " eval-logloss" mine =" min",local_dir = local_dir,#您可以添加" gpu&#34 ;: 0.1分配gpus resources_per_trial = xgb_ray_params .get_tune_resources() ,config = search_space,num_samples = 10,scheduler = scheduler,)打印("完成调整")返回分析@ ray_task(** task_args)def load_best_model_checkpoint(分析):打印("检查分析&# 34;)best_bst = xgb .booster()打印(f"分析最佳导致eval-error上的结果是:{分析.best_result [' eval-error']}")打印(& #34;用最佳参数&#34加载模型和#34;)best_bst .load_model(OS .path .join(分析.best_checkpoint," model.xgb"))精度= 1。 - 分析.best_result [" eval-error" ]打印(F"最佳型号参数:{分析.best_config}")打印(f"最佳型号总精度:{精度:.4f}")#我们现在可以做进一步的预测使用#best_bst.predict(...)返回best_bstanalysis = tune_model(data)best_checkpoint = load_best_model_checkpoint(分析)

一个主要的福利气流可以提供RAY用户是具有容错数据存储的重新运行任务的能力。 Ray在每个工人流程上使用本地等离子存储,以将数据保留在内存中以进行快速处理。当迅速处理数据时,此系统很大,但如果ray集群存在问题,可能会丢失。

通过提供检查点,气流射线用户可以指向DAG中的步骤,其中数据持有在外部商店(例如S3)。此容错性将意味着如果任务重新运行并且数据不再在本地可用,则该任务将能够从持久存储器中提取数据。

@dag(default_args = default_args,schedule_interval = none,start_date = datetime(2021,1,1,0,0,0),标签= ['完成 - modin-example'])def checkpoint_data_example(): @ ray_task(** task_args,checkpoint = true,checkpoint_source =" s3_connection_id")def figure_long_task(数据):。 。 。返回模型@ ray_task(** task_args)def deploy_model(型号):部署(型号)data = load_data()model = siment _long_task(data)deploy_model(型号)

在未来的这个装饰者的迭代中,我们将创建一个功能,以便将数据从本地气流任务从本地气流任务转移到光线任务和后面。该系统将使用任何自定义XCOM后端(包括Ray Custom Xcom后端),以启用全本机Python体验的Airflow用户。

对于那些寻求进一步减少运行开销的人,AnyScale为托管射线集群提供托管解决方案,以及以编程方式控制ML基础架构的API / SDK。

从那里,一切都会在OSS中正如您所需的计算资源在更加平行或更高吞吐量方案的情况下工作。

气流+射线是一种用于编写机器学习或数据ETL管道的强大组合。 这个Alpha集成只是一个开始,我们可以等待在添加新功能和改进时分享更多。 我们正在积极开发该系统,因此会非常感谢您的任何功能请求或问题 - 如果您' d喜欢参与其中,请随时打开存储库上的问题!