使用版本:
python 3.6.8
apache-airflow 1.10.3

安装

python 环境准备

apache-airflow 对于 python 版本限制如下: 
requires Python '>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*'

# wget 用于下载源码包, gcc 和 make 用于编译
yum install wget gcc make

# 下载 python 安装包,使用源码安装
wget https://www.python.org/ftp/python/3.6.8/Python-3.6.8.tar.xz

# 解包,解压缩
xz -d Python-3.6.8.tar.xz 
tar -xvf Python-3.6.8.tar
# 编译
cd Python-3.6.8
./configure --prefix=/usr/local/python3.6.8 --enable-optimizations --with-openssl=/usr/bin/openssl
make && make install
    
# 添加 python3.6 为环境变量 或 建立软连接
export PATH="/usr/local/python3.6.8/bin:$PATH"
sudo ln -s /usr/local/python3.6.8/bin/python3.6 /usr/bin/python3.6

# 创建一个名为 airflow_env 的虚拟环境, 并指定 python 版本
cd ~
python3.6 -m venv airflow_env
# 切换到虚拟环境
source ~/_pyenv/airflow_env/bin/activate

安装 airflow

# 由于打算使用MYSQL,要求MYSQL版本 5.6.4+
pip install 'apache-airflow[mysql]' 

ps:若安装失败
1、报错:Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-l2qrj2co/pendulum
maybe 需要升级一下 
pip install --upgrade pip
pip install --upgrade setuptools

# 需要设置一个 airflow 的家目录,
第一次运行 Airflow 时,它将创建一个 airflow.cfg 在 $AIRFLOW_HOME 目录中(默认情况下在 ~/airflow )。此文件包含 Airflow 的配置。

初始化数据库后端

使用MYSQL的前置条件

1 确保 在[mysqld]下的my.cnf中指定了explicit_defaults_for_timestamp = 1
 否则会报错:Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
 解决方法:在my.cnf中添加或修改explicit_defaults_for_timestamp=true
 
2 参考修改配置文件 ~/airflow/airflow.cfg
[core]
executor = LocalExecutor   # 代表本地任务可并行执行
sql_alchemy_conn = mysql+mysqldb://root:[email protected]:3306/airflow
ps:不能用pymysql,在看log的时候会报错 pendulum object has no attribute 'translate'.

# executor 简介
SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
LocalExecutor:多进程本地执行任务
CeleryExecutor:分布式调度,生产常用
DaskExecutor :动态任务调度,主要用于数据分析
# 建数据库
CREATE DATABASE airflow;

# 初始表结构
airflow initdb

# 重置表结构
airflow resetdb

在这里插入图片描述
数据表结构如下:
在这里插入图片描述

启动

airflow webserver -p 8081

# 启动报错
如果页面出现莫名报错:TypeError: b'80dc9b2e9f690fa57819e2f6538ec2adc788110f' is not JSON serializable
请清空你的浏览器cookies,或者用无痕模式避免

在这里插入图片描述
在这里插入图片描述

打开调度器

airflow scheduler

在这里插入图片描述

创建任务文件夹

在 ~/airflow 文件夹下创建文件夹 tags
也可以自定义文件夹,在 ~/airflow/airflow.cfg 中通过 dags_floder 指定

添加任务

airflow内置了丰富的任务处理器,用于实现不同类型的任务:
BashOperator : 执行bash命令
PythonOperator : 调用python代码
EmailOperator : 发送邮件
HTTPOperator : 发送 HTTP 请求
SqlOperator : 执行 SQL 命令
airflow 直接给出了丰富的各类任务处理器的例子,dag文件怎么写,task怎么写,灰常清楚。
举例:创建 bash 命令的任务,点击进去之后,可以清楚看到详情,dag 文件中清楚描述了执行 bash 命令的任务如何写,以及如何创建 bash 命令之间的依赖关系

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

账户安全性配置

只通过账号密码控制

在 airflow.cfg 文件中 [webserver] 下添加如下配置

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

启用密码身份验证后,需要先创建初始用户,然后其他账户才能登陆
进入 python 命令行,执行以下命名, 或者通过运行python脚本,设置 airflow 的用户名和密码,若提示缺失包,直接通过 pip 安装即可,
用户信息会存入 users 表中

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'new_user_name'
user.email = '[email protected]'
user.password = 'set_the_password'
session = settings.Session()
session.add(user)
session.commit()
session.close()

通过账号密码➕角色权限控制来登陆

在 airflow.cfg 文件中 [webserver] 下添加如下配置

[webserver]
security = Flask AppBuilder
secure_mode = True
rbac=True
ps: 和第一种方式不共存,必须删除 authenticate 和 auth_backend 的配置

添加配置之后,需要重建数据库表:
airflow resetdb

这种情况下,创建用户必须使用命令行 airflow create_user
例如:
airflow create_user --lastname user --firstname admin --username admin --email [email protected] --role Admin --password admin123
airflow create_user --lastname user --firstname view --username view --email [email protected] --role Viewer --password view123

此时, admin 角色的用户 UI 界面会出现 Security 的 Tab, 就可以愉快的通过 UI 界面来添加/修改用户了
在这里插入图片描述

nginx 配置

airflow.cfg 配置如下:
[webserver]
base_url = http://127.0.0.1:8080
web_server_host = 127.0.0.1
web_server_port = 8080

nginx应配置如下:
server {
  listen 80;
  server_name your_host;

  location / {
      proxy_pass http://127.0.0.1:8080;
      proxy_set_header Host $host;
      proxy_redirect off;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
  }
}

ps: nginx reload 的时候记得也要指定conf文件地址

supervisor 配置

[group:airflow]
programs=airflow_webserver,airflow_scheduler

[program:airflow_webserver]
command= /home/deploy/_pyenv/airflow_env/bin/airflow webserver ;
process_name=%(program_name)s_%(process_num)d ;
stdout_logfile=/home/deploy/_log/supervisord/airflow/airflow_webserver.out ;
stderr_logfile=/home/deploy/_log/supervisord/airflow/airflow_webserver.err ;
loglevel=info ;
numprocs=1 ;
numprocs_start=8080 ;
user=deploy ;
environment=AIRFLOW_HOME="~/airflow",PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s" ;

[program:airflow_scheduler]
command= /home/deploy/_pyenv/airflow_env/bin/airflow scheduler ;
process_name=%(program_name)s_%(process_num)d ;
stdout_logfile=/home/deploy/_log/supervisord/airflow/airflow_scheduler.out ;
stderr_logfile=/home/deploy/_log/supervisord/airflow/airflow_scheduler.err ;
loglevel=info ;
numprocs=1 ;
numprocs_start=8001 ;
user=deploy ;
environment=AIRFLOW_HOME="~/airflow",PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s" ;

其他设置

airflow.cfg 中都有每个配置的详细说明,可以都看一下,以下是一些常用配置:

不加载example dag

修改 airflow.cfg 配置

load_examples = False  # 这个配置只有在第一次启动airflow之前设置才有效

如果这个方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。

修改检测新dag间隔

最好还是修改一下,因为默认为0,没有时间间隔, 很耗资源。

[scheduler]
min_file_process_interval = 10

时区

(1) log 之类的时区
时区默认为 utc ,如果有需要更改,可以在 airflow.cfg 文件中 [core] 进行配置,比如:修改为 亚洲/上海,这时候我们可以发现 log 之类的都已经被修改为了上海时间

[core]
default_timezone = Asia/Shanghai

(2)定时任务的时区问题
需要在 dag 的 arg 中 start_date 指定时区,这样定时任务就会按照上海时间来执行

import pendulum

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

local_tz = pendulum.timezone("Asia/Shanghai")

args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 1, 1, tzinfo=local_tz),
}

dag = DAG(
    dag_id='bash_test',
    default_args=args,
    schedule_interval='* * * * *',
)

(3) UI 管理页面右上角的时间
目前官方不支持修改,实在想改,可以直接改源码

使用 celery

安装准备:
	安装 pip install 'apache-airflow[celery]'
	安装 rabbitmq brew install rabbitmq

airflow.cfg 配置准备:
	 应根据自己的情况添加配置,举例如下:
	[core]
	executor = CeleryExecutor
	
	[celery]
	celery_app_name = airflow.executors.celery_executor
	broker_url = amqp://root:[email protected]:5672/airflow
	result_backend = db+mysql://root:[email protected]:3306/airflow

添加mq的vhost
	rabbitmqctl add_vhost airflow 
界面http://127.0.0.1:15672添加用户 默认用户名密码 guest guest
root rabbitmq 给root用户配置权限 set permission set topic permission

启动:
启动 rabbitmq : rabbitmq-server
启动 worker: airflow worker

遇到问题

1)报错:python3.7 No module named _ssl

 解决:
   第一步:安装最新的openssl
   wget http://www.openssl.org/source/openssl-1.1.1.tar.gz
   tar -zxvf openssl-1.1.1.tar.gz
   cd openssl-1.1.1
   ./config --prefix=$HOME/openssl shared zlib
   make && make install

   第二步:重新装python 编译
   ps: openssl配置是用config,而不是configure,另外openssl编译安装依赖zlib动态库,所以一定要shared zlib

(2)查看duration时报错:TypeError: unsupported operand type(s) for +=: ‘int’ and ‘NoneType’
原因:task_fail 表中失败任务 duration 字段为 null
解决:UPDATE task_fail SET duration=0 WHERE duration IS NULL;

(3)Airflow 404 = lots of circles
原因:要么是配置文件错了,要么是请求的 airflow 网址错了

(4)运行 airflow scheduler 出现 {sqlalchemy.py:81} WARNING - DB connection invalidated. Reconnecting…
这个错误并不影响 airflow 正常执行任务

(5) supervisor 启动 webserver 时报错

   No module named 'airflow'
   No module named 'airflow.www'
   No module named 'airflow.www.gunicorn_config'
   FileNotFoundError: [Errno 2] No such file or directory: 'gunicorn': 'gunicorn'
   原因:环境错误
   解决:如果是用 supervisor 启动,需要添加环境变量  PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s"