airflow之DAGs详解

airflow是一个描述,执行,监控工作流的平台。airflow自带了一些dags,当你启动airflow之后,就可以在网页端看到这些dags,我们也可以自己定以dag。

1.什么是DAGs
DAG是一个有向无环图,它是一个task的集合,并且定义了这些task之间的执行顺序和依赖关系。比如,一个DAG包含A,B,C,D四个任务,A先执行,只有A运行成功后B才能执行,C只有在A,B都成功的基础上才能执行,D不受约束,随时都可以执行。DAG并不关心它的组成任务所做的事情,它的任务是确保他们所做的一切都在适当的时间,或以正确的顺序进行,或者正确处理任何意外的问题。

2.什么是operators
DAG定义了一个工作流,operators定义了工作流中的每一task具体做什么事情。一个operator定义工作流中一个task,每个operator是独立执行的,不需要和其他的operator共享信息。它们可以分别在不同的机器上执行。
如果你真的需要在两个operator之间共享信息,可以使用airflow提供的Xcom功能。

airflow目前有一下几种operator:
BashOperator – executes a bash command
PythonOperator – calls an arbitrary Python function
EmailOperator – sends an email
HTTPOperator – sends an HTTP request
SqlOperator – executes a SQL command
Sensor – waits for a certain time, file, database row, S3 key, etc…

3.写一个简单的DAG
first_dag.py

# -*- coding:utf-8 -*-
import airflow
import datetime
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG

args = {
    'owner': 'test',
    'start_date': airflow.utils.dates.days_ago(1)
}   # 默认参数

dag = DAG(
    dag_id='first_dag',
    default_args=args,
    schedule_interval='*/1 * * * *',
    dagrun_timeout=datetime.timedelta(minutes=60)
)  # 创建一个DAG实例
dag.sync_to_db()     # 将dag写到数据库中

run_last = BashOperator(task_id='run_last', bash_command='echo 1', dag=dag)   # 定义一个operator,这个operator不做任何操作

for i in range(3):
    op = BashOperator(task_id='task_run_%s'%i, bash_command='ls -l', dag=dag)  # 执行 ls -l 命令
    op.set_downstream(run_last) # set_downstream set_upstream 定义operator的关系

if __name__ == "__main__":
    dag.cli()

发表评论

电子邮件地址不会被公开。 必填项已用*标注