最近被线上任务折磨得不行了,总是隔三差五出各种问题,导致日志丢了或者脚本没跑成功, 出了问题就需要手动去修复,比如手动把少的日志补齐,重跑失败的脚本。有些脚本之间有依赖关系, 手动跑起来就比较复杂,需要隔一会看一眼脚本有没有跑完,再接着跑下一个,严重影响效率。 所以我想如果有个程序能帮我定义好我的任务依赖关系,由它来自动解决运行时的依赖, 如果能有可视化界面看到执行状态,管理任务就更好了。 最近找到一个满足现在这些需求的开源项目—— airflow。


安装

airflow 的安装十分简单,用 pip 轻松搞定

export AIRFLOW_HOME=~/airflow
pip install airflow[slack]
airflow initdb

pip 安装的 slackclient 为可选,当你需要通知到 slack 时才会用到,但我十分建议也一起安装, 能够及时收到任务执行状况报告。

Quick Start

不得不说,airflow 的文档非常完善,从快速入门到整个框架的概念解释都很到位。 看完官方的 tutorial就可以开始干活了。 如前所说,我需要先设置一个 DAG 对象的一些属性,比如重试策略,起止时间,执行环境等, 就像这样:

1
2
3
4
5
6
7
8
9
10
11
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2015, 6, 1),
    'email': ['morefreeze@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    # 'end_date': datetime(2016, 1, 1),
}

参数看描述基本都可以理解,第三行的depends_on_past 表示就是是否依赖上一个自己的执行状态。 如果设置了 email 相关的配置,需要在 airflow.cfg 中配置下发件邮箱。 因为这个任务会一直执行下去,所以我把结束时间注释掉了。

以上只是配置了 DAG 的参数,下面建立了一个 dag 对象:

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval='* * * * *')

这里我修改了下官方的例子,schedule_interval 表示执行的周期, 我改成了 crontab的形式,这样更直观也方便修改, airflow 也提供一些字面意思的值表示执行周期,比如@hourly等,这会让脚本在X时0点执行, 但如果真在线上执行,我们一般会将不同脚本错锋执行,不会全设成X时0分执行,所以我建议用 crontab 形式的写法更好。

下面就开始定义任务了,实际上,在定义这个任务的过程,就像是在写一个 shell 脚本,只是这个脚本的每个操作可以有依赖。 不同的操作对应了不同的 Operator,比如 shell 就需要用 BashOperator 来执行。就像这样:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)


text = '{{ ds }} [%s] has been done' % (dag.dag_id)
t2 = SlackAPIPostOperator(
    task_id='post_slack',
    token='xoxp-your-key-here',
    channel='#random',
    username='airflow',
    text=text,
    dag=dag
)

t1 >> t2  # t2.set_upstream(t1)

我又修改了下例子,这个 DAG 包含两个任务 t1 和 t2,t1 是个 shell 命令,调用 date显示当前时间, t2 是个发往 slack 的操作,需要设置一个 slack token,可以从这里获得, 接着设置发往的 channel 和用户名,保持原样就好,发 slack 消息就需要刚才安装的时候装了 slackclient。

然后再看一眼发的消息text,airflow 执行的命令或这种消息是支持 jinja2 模板语言, {{ ds }}是一种宏,表示当前的日期,形如2016-12-16,支持的宏在这里

最后一行就是设置依赖关系,显而易见,这是 t1 先执行,t2 在 t1 完成后执行, 也可以用注释里的写法,但我觉得>>这样更直观,反之还有<<。 如果有多条依赖,只需要分行写就行了,就像这样:

t1 >> t
t3 >> t << t2
t >> w >> x

以上的依赖关系图就像这样:

t1 ---+
t2 ---+--> t ---> w ---> x
t3 ---+

以上,恭喜你已经成功创建了第一个 DAG 图,下面就可以开始执行了!

命令

airflow 的所有执行操作都需要在命令行下完成,这里不得不吐槽下,界面只能看任务的依赖, 包括任务执行状态,但如果任务失败了,还是要在命令行下执行,有些不人性化(当然你可以提个PR, :P)。

airflow 的命令总的来说很符合直觉,常用的有如下几个:

  • test: 用于测试特定的某个task,不需要依赖满足
  • run: 用于执行特定的某个task,需要依赖满足
  • backfill: 执行某个DAG,会自动解析依赖关系,按依赖顺序执行
  • unpause: 将一个DAG启动为例行任务,默认是关的,所以编写完DAG文件后一定要执行这和要命令,相反命令为pause
  • scheduler: 这是整个 airflow 的调度程序,一般是在后台启动
  • clear: 清除一些任务的状态,这样会让scheduler来执行重跑

从上面的命令顺序也可以看出,通常我的执行顺序是这样:编写完DAG文件, 直接用backfill命令测试整个DAG是否有问题,如果单个任务出错,查看log解决错误, 这时可以用test来单独执行,如果有依赖关系就用run执行,都搞定了后就用unpause打开周期执行, 当然 scheduler 是在后台默认打开的。之后运行过程中发现需要重跑则用clear命令。

一些概念

前面急于介绍 airflow 的例子,步子大有点扯着蛋,这里回过头来补充一些基础概念。

DAG (Directed Acyclic Graph)

它表示的是一些任务的集合,描述了任务之间的依赖关系,以及整个DAG的一些属性, 比如起止时间,执行周期,重试策略等等。通常一个.py文件就是一个DAG。 你也可以理解为这就是一个完整的shell脚本,只是它可以保证脚本中的命令有序执行。

task 任务

它就是DAG文件中的一个个Operator,它描述了具体的一个操作。

Operator 执行器

airflow定义了很多的 Operator,通常一个操作就是一个特定的 Operator, 比如调用 shell 命令要用 BashOperator,调用 python 函数要用 PythonOperator, 发邮件要用 EmailOperator,连SSH要用 SSHOperator。社区还在不断地贡献新的 Operator。

ds 日期

前面的脚本里用到了{{ ds }}变量,每个DAG在执行时都会传入一个具体的时间(datetime对象), 这个ds就会在 render 命令时被替换成对应的时间。这里要特别强调一下, 对于周期任务,airflow传入的时间是上一个周期的时间(划重点),比如你的任务是每天执行, 那么今天传入的是昨天的日期,如果是周任务,那传入的是上一周今天的值。

Macros

上一条说了ds变量,你肯定会说我的脚本里如果需要不同的时间格式或者不同的时间段怎么办, 这时候就到Macro出场了,airflow本身提供了几种时间格式,比如ds_nodash,顾名思义就是不带短横-的时间格式, 而且还会有一些相关的函数可以直接调用,比如ds_add可以对时间进行加减。

airflow 配置

前面为了尽快展示airflow的强大,我跳过了许多东西,比如它的配置。 在 airflow 初始化时,它会自动在AIRFLOW_HOME目录下生成ariflow.cfg文件,现在打开它让我们看看里面的构造。

executor

这是airflow最关键的一个配置,它指示了airflow以何种方式来执行任务。它有三个选项:

  • SequentialExecutor:表示单进程顺序执行,通常只用于测试
  • LocalExecutor:表示多进程本地执行,它用python的多进程库从而达到多进程跑任务的效果。
  • CeleryExecutor:表示使用celery作为执行器,只要配置了celery,就可以分布式地多机跑任务,一般用于生产环境。

sql_alchemy_conn

这个配置让你指定 airflow 的元信息用何种方式存储,默认用 sqlite,如果要部署到生产环境,推荐使用 mysql。

smtp

如果你需要邮件通知或用到 EmailOperator 的话,需要配置发信的 smtp 服务器。

celery

前面所说的当使用 CeleryExecutor 时要配置 celery 的环境。

小结

忽然发现一口气写了好多,但这些解决日常的需求基本是够了,我决定先按下笔头, 留下一些进阶姿势和线上应用实际会遇到的问题再写一篇airflow进阶