Airflow source code analysis: BashOperator

BashOperator's job is to execute a shell command or a shell script. The actual execution happens in BashOperator.execute().
Airflow's bash_operator.py:

from builtins import bytes
import os
import signal
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    :param xcom_push: If xcom_push is True, the last line written to stdout
        will also be pushed to an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :param output_encoding: output encoding of bash command
    :type output_encoding: str
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults    # fill in missing arguments from the DAG's default_args
    def __init__(
            self,
            bash_command,  # string: a single command, a set of commands, or the path to a .sh script
            xcom_push=False,  # if True, the last line written to stdout is pushed to XCom when the command completes
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: \n %s", gettempdir()) # 基类继承了处理log的mixin类
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()   # flush the buffer so the script is fully on disk before bash reads it
                fname = f.name
                script_location = tmp_dir + "/" + fname
                self.log.info(
                    "Temporary script location: %s",
                    script_location
                )
                def pre_exec():
                    # Restore default signal disposition and invoke setsid
                    for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):
                        if hasattr(signal, sig):
                            signal.signal(getattr(signal, sig), signal.SIG_DFL)
                    os.setsid()
                self.log.info("Running command: %s", bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=pre_exec)

                self.sp = sp

                self.log.info("Output:")
                line = ''
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    self.log.info(line)
                sp.wait()
                self.log.info(
                    "Command exited with return code %s",
                    sp.returncode
                )

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        if self.xcom_push_flag:
            return line

    def on_kill(self):
        self.log.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)

TemporaryDirectory: creates a temporary directory. It is decorated with @contextmanager, which turns the generator function into a context manager, so it can be used in a with statement. A function decorated with @contextmanager must be a generator that yields exactly one value.

import errno
import shutil
from contextlib import contextmanager
from tempfile import mkdtemp


@contextmanager  # turns this generator function into a context manager
def TemporaryDirectory(suffix='', prefix=None, dir=None):
    name = mkdtemp(suffix=suffix, prefix=prefix, dir=dir)  # create the temp directory
    try:
        yield name    # the generator yields exactly once
    finally:
        try:
            shutil.rmtree(name)   # remove the temp directory when the with block exits
        except OSError as e:
            # ENOENT - no such file or directory
            if e.errno != errno.ENOENT:
                raise e

NamedTemporaryFile: creates a temporary file; the returned object's class implements __enter__ and __exit__, so it can be used with a with statement.
pre_exec: runs in the child process before bash starts; it restores the default signal dispositions and calls os.setsid() so the command runs in its own session/process group, which is what lets on_kill() terminate the whole group with os.killpg().
subprocess.Popen: actually runs the command or shell script.
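
The flow in execute() can be reproduced outside Airflow. Below is a minimal standalone sketch (Python 3, not Airflow code; the function name run_bash is made up for illustration): write the command to a NamedTemporaryFile, launch it with Popen in its own session, stream stdout line by line, and return the last line the way xcom_push would.

import os
import signal
from subprocess import Popen, STDOUT, PIPE
from tempfile import NamedTemporaryFile, TemporaryDirectory  # Python 3's stdlib TemporaryDirectory plays the role of Airflow's helper


def run_bash(bash_command, output_encoding='utf-8'):
    """Simplified sketch of the BashOperator.execute() flow (illustrative only)."""
    with TemporaryDirectory(prefix='bashdemo') as tmp_dir:
        with NamedTemporaryFile(dir=tmp_dir, prefix='cmd') as f:
            f.write(bytes(bash_command, 'utf_8'))
            f.flush()                        # make sure bash sees the full script on disk

            def pre_exec():
                # restore default signal dispositions and start a new session in the child
                for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                os.setsid()

            sp = Popen(['bash', f.name], stdout=PIPE, stderr=STDOUT,
                       cwd=tmp_dir, preexec_fn=pre_exec)

            last_line = ''
            for raw in iter(sp.stdout.readline, b''):    # stream output line by line
                last_line = raw.decode(output_encoding).strip()
                print(last_line)
            sp.wait()

            if sp.returncode:
                raise RuntimeError("Bash command failed")
            return last_line                 # this is what xcom_push=True would return


if __name__ == '__main__':
    print(run_bash('echo hello; echo world'))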

Installing and configuring Airflow

1. Installation
virtualenv airflow
export AIRFLOW_HOME=~/airflow
source airflow/bin/activate
pip install airflow
This step takes a while: airflow installs many dependencies, including the database migration tool alembic, the ORM sqlalchemy, flask, and others.
2. Initialize the database
Airflow defaults to sqlite; if you want to test airflow seriously, point it at a real database such as mysql or postgresql (see step 5 below).
airflow initdb

3. Start the webserver
airflow webserver -p 8080
Once the service is up you can reach airflow at 127.0.0.1:8080. At this point the site has no login page; to get a login screen you need to enable it in the config file.
4. Configure the login page
The airflow config file, airflow.cfg, lives in the airflow home directory.
Find the [webserver] section and set:
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
Save the config file, then create a user from a Python shell:
cd ~/airflow
python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Restart the service: airflow webserver -p 8080
5. Configure a backend
Edit airflow.cfg:
executor = LocalExecutor
sql_alchemy_conn = mysql://username:password@localhost:3306/dbname
Re-initialize the database:
airflow initdb
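
For reference, both of these keys live in the [core] section of airflow.cfg (host, credentials and database name below are placeholders):

[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://username:password@localhost:3306/dbname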
6. Test the airflow scheduler
Start the scheduler: airflow scheduler. If scheduled tasks are not being picked up, restart the webserver: airflow webserver -p 8080

Airflow DAGs explained

Airflow is a platform to describe, execute and monitor workflows. It ships with a set of example dags; once airflow is running you can see them in the web UI, and you can also define your own dags.

1. What are DAGs?
A DAG is a directed acyclic graph: a collection of tasks together with the order and dependencies in which they execute. For example, a DAG might contain four tasks A, B, C and D: A runs first, B can run only after A succeeds, C can run only after both A and B succeed, and D has no constraints and can run at any time. The DAG itself does not care what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, in the right order, and that any unexpected problems are handled correctly.
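
As a rough illustration (not part of the original text; task ids and commands are placeholders), the A/B/C/D example above could be wired up with BashOperators like this:

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='abcd_example',
          default_args={'owner': 'test', 'start_date': airflow.utils.dates.days_ago(1)})

a = BashOperator(task_id='A', bash_command='echo A', dag=dag)
b = BashOperator(task_id='B', bash_command='echo B', dag=dag)
c = BashOperator(task_id='C', bash_command='echo C', dag=dag)
d = BashOperator(task_id='D', bash_command='echo D', dag=dag)  # no dependencies: runs whenever scheduled

a.set_downstream(b)   # B runs only after A succeeds
a.set_downstream(c)   # C depends on A ...
b.set_downstream(c)   # ... and on B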

2. What are operators?
A DAG defines a workflow; operators define what each task in that workflow actually does. An operator describes a single task. Each operator runs independently and does not need to share information with other operators; they can even run on different machines.
If you really do need to share information between two operators, you can use airflow's XCom feature, as sketched below.
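
A rough sketch of XCom between two BashOperators (hypothetical example, not from the original text): the first task pushes the last line of its stdout with xcom_push=True, and the second pulls it back through a Jinja template.

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='xcom_example',
          default_args={'owner': 'test', 'start_date': airflow.utils.dates.days_ago(1)})

push = BashOperator(
    task_id='push_value',
    bash_command='echo 42',    # the last line of stdout ("42") is pushed to XCom on completion
    xcom_push=True,
    dag=dag)

pull = BashOperator(
    task_id='pull_value',
    # the template pulls the value pushed by push_value at runtime
    bash_command='echo "got {{ ti.xcom_pull(task_ids=\'push_value\') }}"',
    dag=dag)

push.set_downstream(pull)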

Airflow currently provides the following kinds of operators:
BashOperator – executes a bash command
PythonOperator – calls an arbitrary Python function
EmailOperator – sends an email
HTTPOperator – sends an HTTP request
SqlOperator – executes a SQL command
Sensor – waits for a certain time, file, database row, S3 key, etc…

3. Writing a simple DAG
first_dag.py

# -*- coding:utf-8 -*-
import airflow
import datetime
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG

args = {
    'owner': 'test',
    'start_date': airflow.utils.dates.days_ago(1)
}   # default arguments shared by all tasks in the DAG

dag = DAG(
    dag_id='first_dag',
    default_args=args,
    schedule_interval='*/1 * * * *',
    dagrun_timeout=datetime.timedelta(minutes=60)
)  # create a DAG instance that runs every minute
dag.sync_to_db()     # write the DAG to the metadata database

run_last = BashOperator(task_id='run_last', bash_command='echo 1', dag=dag)   # a trivial final task that just echoes 1

for i in range(3):
    op = BashOperator(task_id='task_run_%s' % i, bash_command='ls -l', dag=dag)  # run the ls -l command
    op.set_downstream(run_last)  # set_downstream / set_upstream define relationships between operators

if __name__ == "__main__":
    dag.cli()
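
If first_dag.py is saved under $AIRFLOW_HOME/dags, you can sanity-check it from the Airflow 1.x command line (the execution date below is just an example):

airflow list_dags                              # the new DAG should appear in the list
airflow test first_dag run_last 2018-01-01     # run a single task instance without recording state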