airflow 进阶
上一篇介绍了 airflow 的基本概念和操作,隔了两个月,终于觉得要写些进阶内容了, 同时也在公司内开始摸索着使用了起来,中间也是遇到了许多问题,如这个项目的网址所表达的, 现在仍是一个孵化项目,使用在线上仍需谨慎。我就来做回吃螃蟹的人,分享下最近使用的心得, 以及遇到的一些问题。
任务结构
安装完部署到线上,遇到的第一个问题是任务的结构,虽然 airflow 的 dag 文件也是 python写的, 但和业务代码实际关系不太大,就像一个 shell 脚本,所以放在运维的项目中,建一个叫 dags的目录, 之后如果有其它业务的 dag 要进来再新建子目录就行,同时别忘了把 airflow 配置的 dag 目录设置在这。
环境变量
环境变量总是一个让人头疼却又绕不开的东西,因为业务代码用了 conda,所以至少要设置一下PYTHONPATH
,
但这个变量其实应该是由外部部署程序决定的(比如我们用的ansible部署),也许还会分机房有不同配置,
所以这里建议写一个类似const.py.j2
的文件,把这些由外部决定的变量放进去,
在部署时由程序生成出来。一个可能的写法像这样:
# const.py.j2
home_dir = ""
PYTHONPATH = ":/usr/bin"
但这样写在调试程序时就比较麻烦,只能每个人自己写一份自己的配置文件(就像自己生成配置文件)。
当然,如果线上部署不会根据机房机器发生变化的话,那直接写一个const.py
也是没问题的。
代码规范
dag 文件只要按照 PEP 8 的标准来写就行,而且一般行数也不多,大部分是在描述, 没什么逻辑在其中。但我发现 airflow 对于要执行的脚本会有些要求。
- 脚本最好有明确的时间参数,执行后只更新那个时间点(段)的数据,
并且在 dags 中也显式地指定执行的时间。可以用
{{ macros.ds_add(ds, 1) }}
来对时间进行偏移。 - 脚本如果只需要当前时间执行的,那最好有个上游函数(PythonBranchOperator)来判断是否需要继续执行, 如果 execute_time 过了当前时间一段时间了那就直接跳过不执行了。
- 每个脚本的功能做到”Do one thing and do it well.”,不要把一堆操作放在一个地方, 原先的脚本可能是要做先执行 foo,完成后再执行 bar 这样类似的操作, 这应该正是 airflow 发挥作用的地方,所以最好拆开来写。当然如果真的只是一些没有依赖关系的操作, 又不需要并行执行,写在一个脚本里我也没意见。
- 注意脚本是否能并行执行,比如是否写了同一个文件,如果是,做好线程安全处理。
调试
最大问题还是上面说的环境变量问题,在自己机器上维护一份const.py
且不入库,
然后搭一套 airflow 还是很快的,执行只要用默认的顺序执行器就好,写完 dag 直接在界面或者用命令去测都可以。
因为有些测试服只有内网能访问,而且又没法改 nginx 配置的话,可以用 ssh 隧道把端口映射到本机,
ssh -N -f -L local_port:127.0.0.1:remote_port user@host
只需要改下 local_port 和 remote_port 就行了,命令的解释可以看这里
部署
部署方面基本比较流畅,只要拷到 dag 目录就可以了。如果文件拷贝时间比较长, 可能会导致 airflow 读取的文件内容不一致,可以先将文件拷到一个目录,然后用软链链接到新目录就行了。
其他杂事
在写 dag 时都会指定 start_date,那么这个时间要指定从什么时候开始就比较头疼, 因为在上线后,打开了任务,airflow 会把 start_date 以后的都执行一遍, 这可能会导致不必要的运行,一个解决办法是在命令行直接都标记为成功, 但我们规定线上不能随便操作,因为放开了 airflow run/backfill 这些命令, 和直接在线上执行脚本没太大区别了,折中的做法是统一由一个人来执行, 而不是让任何人都能随意地去执行命令,把自由降到最小。
焦油坑
在用的时候遇到的坑也是有的,查问题又查到了这里, 原来别人早已遇到过了,挑几个比较重要的列出来。
- 使用 BashOperator 时,因为 jinja2 的关系,在脚本的最后要留一个空格,否则会报一个诡异的 jinja2 错误。
- 使用 BahsOperator 如果 command 要使用 python 字符串命名变量的形式(如
'{foo}_{bar}'.format(bar=bar, foo=foo)
), 并且要用模板变量时,命令就要写成类似这样'python {file} "{{{{ ds }}}}"'.format(file=file)
正常的字符串替换用一层{}
,模板变量用四层{}
。 另外一个小提示是,ds 这种时间变量两边要带上引号,否则2017-02-13 00:00:00
的变量会被 shell 认为是两个参数, 当然这可不是 airflow 的锅。 - 如果要修改 dag 的开始时间或运行间隔,一定要顺便改下 dag_id,例如改成 my_dag_v1, my_dag_v2, 否则会出现各种诡异的问题,因为 scheduler 会监视目录下的文件改变,鬼知道新的 dag 会更新成什么样。
- 一般对于定时任务,大部分都是今天跑昨天的任务,比如 02-13 当天跑的时间参数是 02-12, 这个 airflow 会自动把时间减掉一个周期,注意这里是上一个周期,如果任务是周级的, 那执行时,传入的参数实际是 02-06 的日期。
airflow 不足
在使用 airflow 过程中也发现了一些缺点,主要集中在 web UI 上,在这先列出一些:
- 官方文档上说 dag run 对象表示一个 dag 的执行,在 web UI 上直接创建这个并没有什么用, 无论选成哪种状态,似乎 scheduler 并没有轮询到这个
- webserver 启动时无法指定 subdir,这导致它的 dags 显示非常混乱,有时候可以显示不在 airflow_home下的 dag, 有时候又消失了
- web UI 在子图中,点查看 Code 会抛异常
- web UI 在 dag run 下,无法修改状态,会报 CSRF token missing
- 无法在 web UI 下新启之前没跑过的任务(可以重跑之前完成的任务),只能通过打开任务让它自己去跑
- Mark success 链接对于子任务不起作用,会报 404 lots of circles 的错误
- Task Instances 无法对单列进行排序,总是先按状态排序,再排 dag_id等等
总结
以上是个人对airflow 一些进阶使用的心得,中间也会在 google 上搜索各种解决方案, 但因为毕竟用的人比较少,能找到的文章并不多,大部分都是指向官方文档,反复研读文档也是很必要的, 只有几篇文章是类似我这样的使用心得,而且讲的也是比较特定的场景,但至少还是有些启发。 我觉得一个获取帮助的快速渠道是一些官方的 channel,比如它的 gitter, 如果没人理就隔段时间再问一次,回复可能是半天以上,但相比自己搞不出来已经好挺多的。 当然更快的办法是看源码,可能顺便还能提个 PR。 不过我相信随着项目的推广会有越来越多的人加入进来,有更多的技巧会分享出来, 如果你发现有任何这些方面相关的文章可以通过留言,或者直接用边栏联系方式找到我。