airflow作为apache基金会的一款开源的优秀调度系统,目前被国内外很多大中型企业使用;其丰富的算子(operator)类型和极易扩展的支持,被很多企业进行相关的自定义改造和二次开发以满足自身的业务需求。
但是我们不难发现几个问题,随着用户脚本(dag文件)和工程目录数量越来越多,我们可能面临整任务出现了延时调度的现象。
举个例子说明下,假设你设置了一个任务是每天8:00跑,但是你发现到了调度的时候延时到了8:02或者某个DAG上游节点调度完毕后,下游节点需要等很久才能得到调度。
今天我们就针对这个问题进行相关的分析并提出几点相关的优化建议。
针对以上存在的问题,我认为可以从以下几个方面对airflow的调度延时问题进行分析和优化,后面我们将对每个部分进行相关的介绍。
在定位问题前,我认为了解airflow源码dag(有向无环图,工作流的描述关系)文件工作原理能够使我们更好的定位分析问题,在airflow源码中有两个函数和dag文件的加载扫描有直接关系。
一个函数叫做:list_py_file_paths;一个叫做:process_file。从字面可以看出,list_py_file_path主要是用来收集整个dag目录下的所有py文件,process_file主要是用来处理目录下的文件。
① 针对list_py_file_path的逻辑,我们只需要关注这几行(部分源码省略):
②我们重点分析下process_file这个函数的逻辑分析。
整个代码的逻辑并不复杂,由于代码比较多,这里我只简单的介绍下它的工作原理,以及相关的处理方案。process_file函数主要用来处理AIRFLOW_HOME(环境变量)目录下的文件。
目前对于airflow而言支持处理两种类型的文件加载(python文件和zip文件),其他类型会做过滤。在整个处理过程中有两个地方其实是比较耗时的。如下:
① 在编写python dag脚本中,应特别注意规范。应避免大量的计算操作,如果存在不合理的耗时逻辑,那么在加载dag文件的时候,就会执行函数内部执行逻辑运算。
比如我在dag文件里面写了一个函数,然后直接调用函数,函数内部的逻辑是sleep操作,那么在dag文件大量加载的时候,一轮的调度扫描下来,将会花费大量的时间。甚至导致超时异常,导致scheduler异常。这点要尤其注意,所以dag编写规范特别重要。
② 如果我们的线上dag编写全是py类型文件,在不需要zip文件的场景的操作下,可以将zip操作逻辑注释掉。
zip处理其实在整个dag的处理过程中占用了较多的时间,即使你的项目中没有zip文件,那么对于process_file函数中也会有这样的逻辑判断,这样的话其实对于扫描dag文件而言会存在大量的运算,导致整个调度延时。
小结:根据以上的粗略分析,我们后面可以重点针对dag解析原理和dag编写规范两个方向进行调度延时相关的优化。
在搭建部署airflow的时候,很多企业会采用以下两种方式部署:
物理机部署的方式有其自身的优点但是也存在很多缺点。采用物理机方式在dag扫描和加载速度上要比容器化部署的方式快很多。
本质上的区别是:物理机直接从本地磁盘读取dag的内容,而容器化部署的方式则需要从网络磁盘读取内容,网络磁盘则需要走网络的IO。
所以从文件的加载效率上讲,物理机更胜一筹。但是物理机面临的问题也很多,比如资源的动态扩展,进程监控和重启机制等都不如容器化方便和高效。
与之对比,就容器化部署方式来说,目前业界最常用的就是基于k8s方式的部署。这种方式横向扩展airflow特别方便。
只要宿主机的资源比较大,那么对于airflow的web角色、scheduler角色,还是worker角色,扩展机器都是页面点击的操作,特别方便;甚至后面可以探究成自动的扩容和缩容机制。
并且在监控和报警层面k8s本身有自身的优势,包括进程的监控也有其自身独特的重启策略和机制。
对于资源的规划和分配,通过k8s也可以更好的去操作处理。但是k8s方式需要一个全局共享盘。一般来说我们常用的就是一些网络存储组件,无论是阿里的oss,nas还是亚马逊的s3等等,它们都有一个共性就是需要走网络IO,同时存在这数据一致性的风险。
网络IO的延时在T+1调度或者小时级调度粒度上也完全没有太多的问题,但随着业务量的增加比如10w+调度任务,可能就会存在延时问题。
小结:针对airflow的调度方式,我们可以根据自身的业务需要和任务量来选择部署方式,如果线上的任务数量很多,单个集群出现了性能瓶颈可以考虑多集群部署方式,来保证横向扩展解决IO的处理过慢的问题。
① 对于dag延时调度在airflow的参数上也需要注意合理话的配置,一般这些影响参数可以包括如下:
全局(集群)任务最大并行度,这个值需要根据整体集群的性能配置和单结点worker进程数量进行配置,如果配置的过大,本身集群的性能跟不上也是徒劳的。假设我们的airflow的集群worker节点所启动的默认worker进程是10个,我们有10台机器,理论上讲最大并行度100左右,假设这个值配置200,最大也只能达到100的并发。
为什么说这个值会影响调度延时呢?
因为对于airflow这样的调度系统,0-24小时都可能有大量的任务在调度,假设某一时刻任务峰值比较大,就会造成其他的任务等待,就会造成延时,这个时候就需要考虑这个参数的配置或者整体集群的扩容了。
这个参数比较容易理解,就是对于一个dag,最多能同时跑多少个task,如果某一时刻同一个dag下的任务启动的比较多,最多并行跑的数量也只能是dag_concurrency的值,所以就会造成其它任务的等待和延时。当然我们不能把
这个参数配置的过大,要考虑系统中dag之间的影响,如果单个dag的并行度过高,可能会造成其它dag的任务调度延时或者长时间得不到执行。
任务优先级这个很明显问题,由于很多任务的优先级设置的比较低,造成优先级高的任务先执行以至于后面的低优先级的任务得不到执行。那么我们先分析下airflow背后的优先级的逻辑。
举个例子,假设我们有一个如下图图结构的dag图,并且每个任务的节点的优先级都是默认值1
根据airflow的优先级计算策略我们可以把它归为3类:
1.任务本身优先级值的配置作为最终任务优先级的值。任务优先级配置可以在operator参数指定。官方源码参数解释如下:
2. 任务自身的优先级配置值加上所有下游优先级配置值之和。
3. 任务自身优先级的配置值加上所有上游优先级配置值之和。
小结:任务优先级配置也会对任务的执行造成延时,我们在日常的任务执行计划中,应当根据自身任务的紧急程度设置不同的优先级策略。
对于airflow而言优先级的默认配置是WeightRule.DOWNSTREAM,这种方式其实需要根据同一时刻有哪些task需要执行,并根据task所在dag的结构图计算出优先级值,进行相关的逻辑排名,然后进行执行。
因此在以后的生产环境中要根据真实的需要进行相关的优先级值的调整。