airflow 使用心得,从环境到部署上线
使用版本:
python 3.6.8
apache-airflow 1.10.3
文章目录
安装
python 环境准备
apache-airflow 对于 python 版本限制如下:
requires Python '>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*'
# wget 用于下载源码包, gcc 和 make 用于编译
yum install wget gcc make
# 下载 python 安装包,使用源码安装
wget https://www.python.org/ftp/python/3.6.8/Python-3.6.8.tar.xz
# 解包,解压缩
xz -d Python-3.6.8.tar.xz
tar -xvf Python-3.6.8.tar
# 编译
cd Python-3.6.8
./configure --prefix=/usr/local/python3.6.8 --enable-optimizations --with-openssl=/usr/bin/openssl
make && make install
# 添加 python3.6 为环境变量 或 建立软连接
export PATH="/usr/local/python3.6.8/bin:$PATH"
sudo ln -s /usr/local/python3.6.8/bin/python3.6 /usr/bin/python3.6
# 创建一个名为 airflow_env 的虚拟环境, 并指定 python 版本
cd ~
python3.6 -m venv airflow_env
# 切换到虚拟环境
source ~/_pyenv/airflow_env/bin/activate
安装 airflow
# 由于打算使用MYSQL,要求MYSQL版本 5.6.4+
pip install 'apache-airflow[mysql]'
ps:若安装失败
1、报错:Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-l2qrj2co/pendulum
maybe 需要升级一下
pip install --upgrade pip
pip install --upgrade setuptools
# 需要设置一个 airflow 的家目录,
第一次运行 Airflow 时,它将创建一个 airflow.cfg 在 $AIRFLOW_HOME 目录中(默认情况下在 ~/airflow )。此文件包含 Airflow 的配置。
初始化数据库后端
使用MYSQL的前置条件
1 确保 在[mysqld]下的my.cnf中指定了explicit_defaults_for_timestamp = 1
否则会报错:Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
解决方法:在my.cnf中添加或修改explicit_defaults_for_timestamp=true
2 参考修改配置文件 ~/airflow/airflow.cfg
[core]
executor = LocalExecutor # 代表本地任务可并行执行
sql_alchemy_conn = mysql+mysqldb://root:[email protected]:3306/airflow
ps:不能用pymysql,在看log的时候会报错 pendulum object has no attribute 'translate'.
# executor 简介
SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
LocalExecutor:多进程本地执行任务
CeleryExecutor:分布式调度,生产常用
DaskExecutor :动态任务调度,主要用于数据分析
# 建数据库
CREATE DATABASE airflow;
# 初始表结构
airflow initdb
# 重置表结构
airflow resetdb
数据表结构如下:
启动
airflow webserver -p 8081
# 启动报错
如果页面出现莫名报错:TypeError: b'80dc9b2e9f690fa57819e2f6538ec2adc788110f' is not JSON serializable
请清空你的浏览器cookies,或者用无痕模式避免
打开调度器
airflow scheduler
创建任务文件夹
在 ~/airflow 文件夹下创建文件夹 tags
也可以自定义文件夹,在 ~/airflow/airflow.cfg 中通过 dags_floder 指定
添加任务
airflow内置了丰富的任务处理器,用于实现不同类型的任务:
BashOperator : 执行bash命令
PythonOperator : 调用python代码
EmailOperator : 发送邮件
HTTPOperator : 发送 HTTP 请求
SqlOperator : 执行 SQL 命令
airflow 直接给出了丰富的各类任务处理器的例子,dag文件怎么写,task怎么写,灰常清楚。
举例:创建 bash 命令的任务,点击进去之后,可以清楚看到详情,dag 文件中清楚描述了执行 bash 命令的任务如何写,以及如何创建 bash 命令之间的依赖关系
账户安全性配置
只通过账号密码控制
在 airflow.cfg 文件中 [webserver] 下添加如下配置
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
启用密码身份验证后,需要先创建初始用户,然后其他账户才能登陆
进入 python 命令行,执行以下命名, 或者通过运行python脚本,设置 airflow 的用户名和密码,若提示缺失包,直接通过 pip 安装即可,
用户信息会存入 users 表中
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'new_user_name'
user.email = '[email protected]'
user.password = 'set_the_password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
通过账号密码➕角色权限控制来登陆
在 airflow.cfg 文件中 [webserver] 下添加如下配置
[webserver]
security = Flask AppBuilder
secure_mode = True
rbac=True
ps: 和第一种方式不共存,必须删除 authenticate 和 auth_backend 的配置
添加配置之后,需要重建数据库表:
airflow resetdb
这种情况下,创建用户必须使用命令行 airflow create_user
例如:
airflow create_user --lastname user --firstname admin --username admin --email [email protected] --role Admin --password admin123
airflow create_user --lastname user --firstname view --username view --email [email protected] --role Viewer --password view123
此时, admin 角色的用户 UI 界面会出现 Security 的 Tab, 就可以愉快的通过 UI 界面来添加/修改用户了
nginx 配置
airflow.cfg 配置如下:
[webserver]
base_url = http://127.0.0.1:8080
web_server_host = 127.0.0.1
web_server_port = 8080
nginx应配置如下:
server {
listen 80;
server_name your_host;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
ps: nginx reload 的时候记得也要指定conf文件地址
supervisor 配置
[group:airflow]
programs=airflow_webserver,airflow_scheduler
[program:airflow_webserver]
command= /home/deploy/_pyenv/airflow_env/bin/airflow webserver ;
process_name=%(program_name)s_%(process_num)d ;
stdout_logfile=/home/deploy/_log/supervisord/airflow/airflow_webserver.out ;
stderr_logfile=/home/deploy/_log/supervisord/airflow/airflow_webserver.err ;
loglevel=info ;
numprocs=1 ;
numprocs_start=8080 ;
user=deploy ;
environment=AIRFLOW_HOME="~/airflow",PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s" ;
[program:airflow_scheduler]
command= /home/deploy/_pyenv/airflow_env/bin/airflow scheduler ;
process_name=%(program_name)s_%(process_num)d ;
stdout_logfile=/home/deploy/_log/supervisord/airflow/airflow_scheduler.out ;
stderr_logfile=/home/deploy/_log/supervisord/airflow/airflow_scheduler.err ;
loglevel=info ;
numprocs=1 ;
numprocs_start=8001 ;
user=deploy ;
environment=AIRFLOW_HOME="~/airflow",PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s" ;
其他设置
airflow.cfg 中都有每个配置的详细说明,可以都看一下,以下是一些常用配置:
不加载example dag
修改 airflow.cfg 配置
load_examples = False # 这个配置只有在第一次启动airflow之前设置才有效
如果这个方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。
修改检测新dag间隔
最好还是修改一下,因为默认为0,没有时间间隔, 很耗资源。
[scheduler]
min_file_process_interval = 10
时区
(1) log 之类的时区
时区默认为 utc ,如果有需要更改,可以在 airflow.cfg 文件中 [core] 进行配置,比如:修改为 亚洲/上海,这时候我们可以发现 log 之类的都已经被修改为了上海时间
[core]
default_timezone = Asia/Shanghai
(2)定时任务的时区问题
需要在 dag 的 arg 中 start_date 指定时区,这样定时任务就会按照上海时间来执行
import pendulum
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
local_tz = pendulum.timezone("Asia/Shanghai")
args = {
'owner': 'Airflow',
'start_date': datetime(2019, 1, 1, tzinfo=local_tz),
}
dag = DAG(
dag_id='bash_test',
default_args=args,
schedule_interval='* * * * *',
)
(3) UI 管理页面右上角的时间
目前官方不支持修改,实在想改,可以直接改源码
使用 celery
安装准备:
安装 pip install 'apache-airflow[celery]'
安装 rabbitmq brew install rabbitmq
airflow.cfg 配置准备:
应根据自己的情况添加配置,举例如下:
[core]
executor = CeleryExecutor
[celery]
celery_app_name = airflow.executors.celery_executor
broker_url = amqp://root:[email protected]:5672/airflow
result_backend = db+mysql://root:[email protected]:3306/airflow
添加mq的vhost
rabbitmqctl add_vhost airflow
界面http://127.0.0.1:15672添加用户 默认用户名密码 guest guest
root rabbitmq 给root用户配置权限 set permission set topic permission
启动:
启动 rabbitmq : rabbitmq-server
启动 worker: airflow worker
遇到问题
1)报错:python3.7 No module named _ssl
解决:
第一步:安装最新的openssl
wget http://www.openssl.org/source/openssl-1.1.1.tar.gz
tar -zxvf openssl-1.1.1.tar.gz
cd openssl-1.1.1
./config --prefix=$HOME/openssl shared zlib
make && make install
第二步:重新装python 编译
ps: openssl配置是用config,而不是configure,另外openssl编译安装依赖zlib动态库,所以一定要shared zlib
(2)查看duration时报错:TypeError: unsupported operand type(s) for +=: ‘int’ and ‘NoneType’
原因:task_fail 表中失败任务 duration 字段为 null
解决:UPDATE task_fail SET duration=0 WHERE duration IS NULL;
(3)Airflow 404 = lots of circles
原因:要么是配置文件错了,要么是请求的 airflow 网址错了
(4)运行 airflow scheduler 出现 {sqlalchemy.py:81} WARNING - DB connection invalidated. Reconnecting…
这个错误并不影响 airflow 正常执行任务
(5) supervisor 启动 webserver 时报错
No module named 'airflow'
No module named 'airflow.www'
No module named 'airflow.www.gunicorn_config'
FileNotFoundError: [Errno 2] No such file or directory: 'gunicorn': 'gunicorn'
原因:环境错误
解决:如果是用 supervisor 启动,需要添加环境变量 PATH="/home/deploy/_pyenv/airflow_env/bin:%(ENV_PATH)s"