上一篇介绍了 airflow 的基本概念和操作,隔了两个月,终于觉得要写些进阶内容了, 同时也在公司内开始摸索着使用了起来,中间也是遇到了许多问题,如这个项目的网址所表达的, 现在仍是一个孵化项目,使用在线上仍需谨慎。我就来做回吃螃蟹的人,分享下最近使用的心得, 以及遇到的一些问题。


任务结构

安装完部署到线上,遇到的第一个问题是任务的结构,虽然 airflow 的 dag 文件也是 python写的, 但和业务代码实际关系不太大,就像一个 shell 脚本,所以放在运维的项目中,建一个叫 dags的目录, 之后如果有其它业务的 dag 要进来再新建子目录就行,同时别忘了把 airflow 配置的 dag 目录设置在这。

环境变量

环境变量总是一个让人头疼却又绕不开的东西,因为业务代码用了 conda,所以至少要设置一下PYTHONPATH, 但这个变量其实应该是由外部部署程序决定的(比如我们用的ansible部署),也许还会分机房有不同配置, 所以这里建议写一个类似const.py.j2的文件,把这些由外部决定的变量放进去, 在部署时由程序生成出来。一个可能的写法像这样:

# const.py.j2
home_dir = ""
PYTHONPATH = ":/usr/bin"

但这样写在调试程序时就比较麻烦,只能每个人自己写一份自己的配置文件(就像自己生成配置文件)。

当然,如果线上部署不会根据机房机器发生变化的话,那直接写一个const.py也是没问题的。

代码规范

dag 文件只要按照 PEP 8 的标准来写就行,而且一般行数也不多,大部分是在描述, 没什么逻辑在其中。但我发现 airflow 对于要执行的脚本会有些要求。

  1. 脚本最好有明确的时间参数,执行后只更新那个时间点(段)的数据, 并且在 dags 中也显式地指定执行的时间。可以用{{ macros.ds_add(ds, 1) }}来对时间进行偏移。
  2. 脚本如果只需要当前时间执行的,那最好有个上游函数(PythonBranchOperator)来判断是否需要继续执行, 如果 execute_time 过了当前时间一段时间了那就直接跳过不执行了。
  3. 每个脚本的功能做到”Do one thing and do it well.”,不要把一堆操作放在一个地方, 原先的脚本可能是要做先执行 foo,完成后再执行 bar 这样类似的操作, 这应该正是 airflow 发挥作用的地方,所以最好拆开来写。当然如果真的只是一些没有依赖关系的操作, 又不需要并行执行,写在一个脚本里我也没意见。
  4. 注意脚本是否能并行执行,比如是否写了同一个文件,如果是,做好线程安全处理。

调试

最大问题还是上面说的环境变量问题,在自己机器上维护一份const.py且不入库, 然后搭一套 airflow 还是很快的,执行只要用默认的顺序执行器就好,写完 dag 直接在界面或者用命令去测都可以。 因为有些测试服只有内网能访问,而且又没法改 nginx 配置的话,可以用 ssh 隧道把端口映射到本机,

ssh -N -f -L local_port:127.0.0.1:remote_port user@host

只需要改下 local_port 和 remote_port 就行了,命令的解释可以看这里

部署

部署方面基本比较流畅,只要拷到 dag 目录就可以了。如果文件拷贝时间比较长, 可能会导致 airflow 读取的文件内容不一致,可以先将文件拷到一个目录,然后用软链链接到新目录就行了。

其他杂事

在写 dag 时都会指定 start_date,那么这个时间要指定从什么时候开始就比较头疼, 因为在上线后,打开了任务,airflow 会把 start_date 以后的都执行一遍, 这可能会导致不必要的运行,一个解决办法是在命令行直接都标记为成功, 但我们规定线上不能随便操作,因为放开了 airflow run/backfill 这些命令, 和直接在线上执行脚本没太大区别了,折中的做法是统一由一个人来执行, 而不是让任何人都能随意地去执行命令,把自由降到最小。

焦油坑

在用的时候遇到的坑也是有的,查问题又查到了这里, 原来别人早已遇到过了,挑几个比较重要的列出来。

  1. 使用 BashOperator 时,因为 jinja2 的关系,在脚本的最后要留一个空格,否则会报一个诡异的 jinja2 错误。
  2. 使用 BahsOperator 如果 command 要使用 python 字符串命名变量的形式(如'{foo}_{bar}'.format(bar=bar, foo=foo)), 并且要用模板变量时,命令就要写成类似这样 'python {file} "{{{{ ds }}}}"'.format(file=file) 正常的字符串替换用一层{},模板变量用四层{}。 另外一个小提示是,ds 这种时间变量两边要带上引号,否则2017-02-13 00:00:00的变量会被 shell 认为是两个参数, 当然这可不是 airflow 的锅。
  3. 如果要修改 dag 的开始时间或运行间隔,一定要顺便改下 dag_id,例如改成 my_dag_v1, my_dag_v2, 否则会出现各种诡异的问题,因为 scheduler 会监视目录下的文件改变,鬼知道新的 dag 会更新成什么样。
  4. 一般对于定时任务,大部分都是今天跑昨天的任务,比如 02-13 当天跑的时间参数是 02-12, 这个 airflow 会自动把时间减掉一个周期,注意这里是上一个周期,如果任务是周级的, 那执行时,传入的参数实际是 02-06 的日期。

airflow 不足

在使用 airflow 过程中也发现了一些缺点,主要集中在 web UI 上,在这先列出一些:

  1. 官方文档上说 dag run 对象表示一个 dag 的执行,在 web UI 上直接创建这个并没有什么用, 无论选成哪种状态,似乎 scheduler 并没有轮询到这个
  2. webserver 启动时无法指定 subdir,这导致它的 dags 显示非常混乱,有时候可以显示不在 airflow_home下的 dag, 有时候又消失了
  3. web UI 在子图中,点查看 Code 会抛异常
  4. web UI 在 dag run 下,无法修改状态,会报 CSRF token missing
  5. 无法在 web UI 下新启之前没跑过的任务(可以重跑之前完成的任务),只能通过打开任务让它自己去跑
  6. Mark success 链接对于子任务不起作用,会报 404 lots of circles 的错误
  7. Task Instances 无法对单列进行排序,总是先按状态排序,再排 dag_id等等

总结

以上是个人对airflow 一些进阶使用的心得,中间也会在 google 上搜索各种解决方案, 但因为毕竟用的人比较少,能找到的文章并不多,大部分都是指向官方文档,反复研读文档也是很必要的, 只有几篇文章是类似我这样的使用心得,而且讲的也是比较特定的场景,但至少还是有些启发。 我觉得一个获取帮助的快速渠道是一些官方的 channel,比如它的 gitter, 如果没人理就隔段时间再问一次,回复可能是半天以上,但相比自己搞不出来已经好挺多的。 当然更快的办法是看源码,可能顺便还能提个 PR。 不过我相信随着项目的推广会有越来越多的人加入进来,有更多的技巧会分享出来, 如果你发现有任何这些方面相关的文章可以通过留言,或者直接用边栏联系方式找到我。