airflow源码分析之BashOperator

BashOperator主要的功能是执行shell命令或者shell脚本。负责具体的执行过程的是BashOperator.execute()函数。
airflow的bash_operator.py文件:

from builtins import bytes
import os
import signal
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    :param xcom_push: If xcom_push is True, the last line written to stdout
        will also be pushed to an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :type output_encoding: output encoding of bash command
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults    # 处理默认的参数
    def __init__(
            self,
            bash_command, # string 可以是单独的命令,或者是命令集,或者是.sh文件
            xcom_push=False,  # 如果两个operator有依赖关系时,值为True
            env=None,
            output_encoding='utf-8',   
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: \n %s", gettempdir()) # 基类继承了处理log的mixin类
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()   # 将缓冲区的数据写入到磁盘中
                fname = f.name
                script_location = tmp_dir + "/" + fname
                self.log.info(
                    "Temporary script location: %s",
                    script_location
                )
                def pre_exec():
                    # Restore default signal disposition and invoke setsid
                    for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):
                        if hasattr(signal, sig):
                            signal.signal(getattr(signal, sig), signal.SIG_DFL)
                    os.setsid()
                self.log.info("Running command: %s", bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=pre_exec)

                self.sp = sp

                self.log.info("Output:")
                line = ''
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    self.log.info(line)
                sp.wait()
                self.log.info(
                    "Command exited with return code %s",
                    sp.returncode
                )

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        if self.xcom_push_flag:
            return line

    def on_kill(self):
        self.log.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)

TemporaryDirectory:创建一个临时的目录,它使用了@contextmanager,生成了一个上下文管理器,因此它能用在with环境里。用contextmanager装饰的函数,要返回一个生成器,并且只能返回一个值

@contextmanager  # 生成一个上下文管理器
def TemporaryDirectory(suffix='', prefix=None, dir=None):
    name = mkdtemp(suffix=suffix, prefix=prefix, dir=dir)  # suffix:后缀  prefix: 前缀
    try:
        yield name    # yield 生成器 仅返回一个值
    finally:
        try:
            shutil.rmtree(name)   # 当with结束之后,删除临时目录
        except OSError as e:
            # ENOENT - no such file or directory
            if e.errno != errno.ENOENT:
                raise e                                              

NamedTemporaryFile: 创建一个临时的文件,它继承了一个类,这个类实现了__enter__, __exit__ 方法,因此能用with
pre_exec: 捕捉信号,并进行信号处理
subprocess.Popen: 具体执行command或者shell脚本

airflow的安装和配置

1.安装
virtualenv airflow
export AIRFLOW_HOME=~/airflow
source airflow/bin/activate
pip install airflow
这个过程时间有点长,airflow安装了很多依赖包,数据库同步工具alembic, orm工具sqlalchemy, flask等
2.初始化数据库
airflow默认的数据库是sqlite,如果你想具体测试airflow的功能的话,你需要指定一个真实的数据库,mysql或者postgresql
airflow initdb

3.启动服务
airflow webserver -p 8080
启动服务之后,你就可以访问127.0.0.1来访问airflow。这时整个网站是没有登录入口的,需要在配置文件里配置才可以看到用户登录界面
4.配置登录界面
airflow配置文件在主目录下,airflow.cfg
找到[webserver]这一项
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
把这两项改完之后,保存配置文件
cd /airflow python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type “help”, “copyright”, “credits” or “license” for more information.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

重启服务 airflow webserver -p 8080
5.设置一个后端
修改airflow.cfg:
executor = LocalExecutor
sql_alchemy_conn = mysql://username:password@localhost:3306/dbname
初始化数据库:
airflow initdb
6.测试airflow的scheduler
启动scheduler服务: airflow scheduler 如果定时任务还没有运行的话,重启一下服务 airflow webserver -p 8080

airflow之DAGs详解

airflow是一个描述,执行,监控工作流的平台。airflow自带了一些dags,当你启动airflow之后,就可以在网页端看到这些dags,我们也可以自己定以dag。

1.什么是DAGs
DAG是一个有向无环图,它是一个task的集合,并且定义了这些task之间的执行顺序和依赖关系。比如,一个DAG包含A,B,C,D四个任务,A先执行,只有A运行成功后B才能执行,C只有在A,B都成功的基础上才能执行,D不受约束,随时都可以执行。DAG并不关心它的组成任务所做的事情,它的任务是确保他们所做的一切都在适当的时间,或以正确的顺序进行,或者正确处理任何意外的问题。

2.什么是operators
DAG定义了一个工作流,operators定义了工作流中的每一task具体做什么事情。一个operator定义工作流中一个task,每个operator是独立执行的,不需要和其他的operator共享信息。它们可以分别在不同的机器上执行。
如果你真的需要在两个operator之间共享信息,可以使用airflow提供的Xcom功能。

airflow目前有一下几种operator:
BashOperator – executes a bash command
PythonOperator – calls an arbitrary Python function
EmailOperator – sends an email
HTTPOperator – sends an HTTP request
SqlOperator – executes a SQL command
Sensor – waits for a certain time, file, database row, S3 key, etc…

3.写一个简单的DAG
first_dag.py

# -*- coding:utf-8 -*-
import airflow
import datetime
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG

args = {
    'owner': 'test',
    'start_date': airflow.utils.dates.days_ago(1)
}   # 默认参数

dag = DAG(
    dag_id='first_dag',
    default_args=args,
    schedule_interval='*/1 * * * *',
    dagrun_timeout=datetime.timedelta(minutes=60)
)  # 创建一个DAG实例
dag.sync_to_db()     # 将dag写到数据库中

run_last = BashOperator(task_id='run_last', bash_command='echo 1', dag=dag)   # 定义一个operator,这个operator不做任何操作

for i in range(3):
    op = BashOperator(task_id='task_run_%s'%i, bash_command='ls -l', dag=dag)  # 执行 ls -l 命令
    op.set_downstream(run_last) # set_downstream set_upstream 定义operator的关系

if __name__ == "__main__":
    dag.cli()

icode 快速入门教程

简介

icode是基于gerrit的二次开发,底层的服务由gerrit提供,icode旨在解决使用gerrit过程中的痛点,优化整个开发流程,打通整体流程,包括:代码托管、持续集成、自动化运维,前端部分使用react + mobx + antd搭建,服务端使用python作为连接gerrit与前端的桥梁。

登录

icode使用gerrit账号进行登录,目前还没有开通注册功能,所以新同事可以找运维人员开通gerrit账号来使用icode。

WechatIMG48

创建代码库

点击创建代码库的按钮会出现创建代码库的弹窗,在输入框中输入需要的信息就可以直接创建对应的代码库。WechatIMG50

分支管理

点击进入项目之后便可看到项目完整的分支信息,包括:分知名、备注、创建时间、分支状态、领先落后数量等。

WechatIMG51

新建分支

点击新建分支,会出现弹窗,在输入框中填写具体的信息就可以创建新的分支,分支名默认自动生成,且不允许修改格式。

WechatIMG52

克隆代码库&切换分支

申请分支之后,我们可以先把代码库克隆到本地。

WechatIMG53

ps:想要执行这段命令需要先把ssh-key添加到gerrit账号中。

地址:http://gerrit.dev.aixuexi.com/#/settings/ssh-keys

克隆到本地之后,我们需要切换到我们新生请的分支。

使用:git checkout –track origin/[你的分支] 指令来切换到对应的远程分支,并建立本地分支与其关联。

提交代码

icode为每次提交都添加的评审,且校验了change-id,所以我们提交评审的时候需要使用hook生成并携带change-id,建议添加gpush别名来简化提交。

bash -s << '_EOF_'
git config --global alias.gpush '!f() { : push ; r=$1; [[ -z $r ]] && r=origin; b=$2; t=$(awk "{ print \$2 }" $(git rev-parse --git-dir)/HEAD); t=${t#refs/heads/}; [[ -z $b ]] && b=$t; cmd="git push $r HEAD:refs/for/$b%topic=$t"; echo $cmd; echo; $cmd; }; f'
_EOF_

代码评审

提交评审之后,在icode上实时的看到评审列表,点击评审主题就可以跳转到评审页面。

WechatIMG54

在线预览

点击文件菜单可以在线预览代码库结构以及文件内容。

WechatIMG55

提交历史

点击提交历史,可以查看每一次的提交记录,并且可以查看提交内容的diff记录。

WechatIMG56

WechatIMG57

收藏代码

WechatIMG49

持续集成

点击持续集成菜单,会弹出新的页面进入持续集成平台,在持续集成平台中,我们提供了编译、送测、发布、合并回主干的功能。

  • 编译:点击编译按钮可以编译当前分支的代码,产出服务器上需要的压缩包。
  • 送测:点击送测按钮,可以选择对应的QA,系统会自动发送邮件来提醒对应的QA人员本次提测的代码库地址和分支号。
  • 发布:测试通过之后,点击发布会把压缩包发布到产品库中等待上线操作。
  • 合并回主干:当上线结束之后,点击合并回主干,分支代码会自动合并回master分支,并且删除当前分支,分支的生命周期走到尽头。

WechatIMG58 WechatIMG59 WechatIMG60 WechatIMG62 WechatIMG63