+-
airflow 2.x 分布式部署实战(qbit)
首页 专栏 redis 文章详情
0

airflow 2.x 分布式部署实战(qbit)

qbit 发布于 5 月 11 日
English

前言

试验开始使用的 MySQL 8 作为数据库,截至 2021.5.13,airflow 2.0.2 的这个问题未解决,所以转为使用 PostgreSQL 12 airflow 是 DAG(有向无环图)的任务管理系统,简单的理解就是一个高级版的 crontab。 airflow 解决了 crontab 无法解决的任务依赖问题。 airflow 基本架构
airflow + celery 架构

环境与组件

Ubuntu 20.04 Python-3.8(Anaconda3-2020.11-Linux-x86_64) PostgreSQL 12.6 apache-airflow 2.0.2 celery 4.4.7

集群规划

安装步骤

创建账号(node0/node1/node2)

sudo useradd airflow -m -s /bin/bash sudo passwd airflow

切换账号(node0/node1/node2)

su airflow

配置 Anaconda 环境变量(node0/node1/node2)

# /home/airflow/.bashrc export PATH=/home/airflow/anaconda3/bin:$PATH

升级 pip(node0/node1/node2)

pip install pip --upgrade -i https://mirrors.aliyun.com/pypi/simple/

配置 pip 国内镜像(node0/node1/node2)

pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/

安装 airflow(node0/node1/node2)依赖项:https://airflow.apache.org/do...

# 全家桶(master) pip3 install "apache-airflow[all]~=2.0.2" # OR 选择性安装 pip3 install "apache-airflow[async,postgres,mongo,redis,rabbitmq,celery,dask]~=2.0.2"

为 airflow 添加 PATH 环境变量(node0/node1/node2)

# 在 /home/airflow/.bashrc 文件尾追加以下内容: export PATH=/home/airflow/.local/bin:$PATH

查看 airflow 版本并创建 airflow 的 HOME 目录(node0/node1/node2)

# 默认 ~/airflow 目录 airflow version

设置 Ubuntu 系统时区(node0/node1/node2)

timedatectl set-timezone Asia/Shanghai

修改 airflow 中的时区(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] # 改为 system 或 Asia/Shanghai default_timezone = system 至此,安装完毕

PostgreSQL 配置

创建数据库

CREATE DATABASE airflow_db;

创建用户

CREATE USER airflow_user WITH PASSWORD 'airflow_pass'; GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

修改 PostgreSQL 连接(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow

初始化数据库表(node0)

airflow db init 查看数据库是否初始化成功

WEB UI 登录

创建管理员用户(node0)

# 角色表: ab_role # 用户表: ab_user # 创建 Admin 角色用户 airflow users create \ --lastname user \ --firstname admin \ --username admin \ --email [email protected] \ --role Admin \ --password admin123 # 创建 Viewer 角色用户 airflow users create \ --lastname user \ --firstname view \ --username view \ --email [email protected] \ --role Viewer \ --password view123

启动 webserver(node0)

airflow webserver -p 8080 在浏览器使用创建的账号登录

配置 CeleryExecutor

celery 官方文档:https://docs.celeryproject.or... 在 RabbitMQ 上创建 airflow 账号,并分配 virtual host

修改配置文件(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] executor = CeleryExecutor [celery] broker_url = amqp://airflow:[email protected]:5672/vhost_airflow result_backend = db+postgresql://airflow:[email protected]/airflow

测试案例

创建测试脚本(/home/airflow/airflow/dags/send_msg.py)(node0/node1/node2) ,发送本机 IP 到企业微信。

# encoding: utf-8 # author: qbit # date: 2021-05-13 # summary: 发送/分配任务到任务结点 import os import time import json import psutil import requests from datetime import timedelta from airflow.utils.dates import days_ago from airflow.models import DAG from airflow.operators.python_operator import PythonOperator def GetLocalIPByPrefix(prefix): r""" 多网卡情况下,根据前缀获取IP 测试可用:Windows、Linux,Python 3.6.x,psutil 5.4.x ipv4/ipv6 地址均适用 注意如果有多个相同前缀的 ip,只随机返回一个 """ localIP = '' dic = psutil.net_if_addrs() for adapter in dic: snicList = dic[adapter] for snic in snicList: if not snic.family.name.startswith('AF_INET'): continue ip = snic.address if ip.startswith(prefix): localIP = ip return localIP def send_msg(msg='default msg', **context): r""" 发送 message 到企业微信 """ print(context) run_id = context['run_id'] nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) message = '%s\n%s\n%s_%d\n%s' % ( run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg) print(message) ''' 发送代码(涉及账号,本段代码隐藏) ''' default_args = { 'owner': 'qbit', # depends_on_past 是否依赖于过去。 # 如果为True,那么必须要上次的 DAG 执行成功了,这次的 DAG 才能执行。 'depends_on_past': False } with DAG(dag_id='send_msg', default_args=default_args, start_date=days_ago(1), schedule_interval=timedelta(seconds=60), # catchup 是否回补(backfill)开始时间到现在的任务 catchup=False, tags=['qbit'] ) as dag: first = PythonOperator( task_id='send_msg_1', python_callable=send_msg, op_kwargs={'msg': '111'}, provide_context=True, dag=dag, ) second = PythonOperator( task_id='send_msg_2', python_callable=send_msg, op_kwargs={'msg': '222'}, provide_context=True, dag=dag, ) third = PythonOperator( task_id='send_msg_3', python_callable=send_msg, op_kwargs={'msg': '333'}, provide_context=True, dag=dag, ) [third, first] >> second

查看 dag 信息(node0)

# 打印出所有正在活跃状态的 DAGs $ airflow dags list # 打印出 'send_msg' DAG 中所有的任务 $ airflow tasks list send_msg [2021-05-13 16:00:47,123] {dagbag.py:451} INFO - Filling up the DagBag from /home/airflow/airflow/dags send_msg_1 send_msg_2 send_msg_3 # 打印出 'send_msg' DAG 的任务层次结构 $ airflow tasks list send_msg --tree

测试单个 task(node0)

airflow tasks test send_msg send_msg_1 20210513

测试单个 dag(node0)

airflow dags test send_msg 20210513

集群测试

# node0 airflow webserver -p 8080 airflow scheduler airflow celery flower # 默认端口 5555 # node1/node2 airflow celery worker

参考文献

相关阅读:Airflow2.0.0 + Celery 集群搭建 对比 airflow 中的 CeleryExecutor 和 DaskExecutor:A Gentle Introduction To Understand Airflow Executor
本文出自 qbit snap
airflow postgresql celery rabbitmq redis
阅读 126 更新于 5 月 14 日
收藏
分享
本作品系原创, 采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
qbit snap
开箱即用,拿走不谢。
关注专栏
avatar
qbit
192 声望
260 粉丝
关注作者
0 条评论
得票数 最新
提交评论
你知道吗?

注册登录
avatar
qbit
192 声望
260 粉丝
关注作者
宣传栏
目录

前言

试验开始使用的 MySQL 8 作为数据库,截至 2021.5.13,airflow 2.0.2 的这个问题未解决,所以转为使用 PostgreSQL 12 airflow 是 DAG(有向无环图)的任务管理系统,简单的理解就是一个高级版的 crontab。 airflow 解决了 crontab 无法解决的任务依赖问题。 airflow 基本架构
airflow + celery 架构

环境与组件

Ubuntu 20.04 Python-3.8(Anaconda3-2020.11-Linux-x86_64) PostgreSQL 12.6 apache-airflow 2.0.2 celery 4.4.7

集群规划

安装步骤

创建账号(node0/node1/node2)

sudo useradd airflow -m -s /bin/bash sudo passwd airflow

切换账号(node0/node1/node2)

su airflow

配置 Anaconda 环境变量(node0/node1/node2)

# /home/airflow/.bashrc export PATH=/home/airflow/anaconda3/bin:$PATH

升级 pip(node0/node1/node2)

pip install pip --upgrade -i https://mirrors.aliyun.com/pypi/simple/

配置 pip 国内镜像(node0/node1/node2)

pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/

安装 airflow(node0/node1/node2)依赖项:https://airflow.apache.org/do...

# 全家桶(master) pip3 install "apache-airflow[all]~=2.0.2" # OR 选择性安装 pip3 install "apache-airflow[async,postgres,mongo,redis,rabbitmq,celery,dask]~=2.0.2"

为 airflow 添加 PATH 环境变量(node0/node1/node2)

# 在 /home/airflow/.bashrc 文件尾追加以下内容: export PATH=/home/airflow/.local/bin:$PATH

查看 airflow 版本并创建 airflow 的 HOME 目录(node0/node1/node2)

# 默认 ~/airflow 目录 airflow version

设置 Ubuntu 系统时区(node0/node1/node2)

timedatectl set-timezone Asia/Shanghai

修改 airflow 中的时区(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] # 改为 system 或 Asia/Shanghai default_timezone = system 至此,安装完毕

PostgreSQL 配置

创建数据库

CREATE DATABASE airflow_db;

创建用户

CREATE USER airflow_user WITH PASSWORD 'airflow_pass'; GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

修改 PostgreSQL 连接(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow

初始化数据库表(node0)

airflow db init 查看数据库是否初始化成功

WEB UI 登录

创建管理员用户(node0)

# 角色表: ab_role # 用户表: ab_user # 创建 Admin 角色用户 airflow users create \ --lastname user \ --firstname admin \ --username admin \ --email [email protected] \ --role Admin \ --password admin123 # 创建 Viewer 角色用户 airflow users create \ --lastname user \ --firstname view \ --username view \ --email [email protected] \ --role Viewer \ --password view123

启动 webserver(node0)

airflow webserver -p 8080 在浏览器使用创建的账号登录

配置 CeleryExecutor

celery 官方文档:https://docs.celeryproject.or... 在 RabbitMQ 上创建 airflow 账号,并分配 virtual host

修改配置文件(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

[core] executor = CeleryExecutor [celery] broker_url = amqp://airflow:[email protected]:5672/vhost_airflow result_backend = db+postgresql://airflow:[email protected]/airflow

测试案例

创建测试脚本(/home/airflow/airflow/dags/send_msg.py)(node0/node1/node2) ,发送本机 IP 到企业微信。

# encoding: utf-8 # author: qbit # date: 2021-05-13 # summary: 发送/分配任务到任务结点 import os import time import json import psutil import requests from datetime import timedelta from airflow.utils.dates import days_ago from airflow.models import DAG from airflow.operators.python_operator import PythonOperator def GetLocalIPByPrefix(prefix): r""" 多网卡情况下,根据前缀获取IP 测试可用:Windows、Linux,Python 3.6.x,psutil 5.4.x ipv4/ipv6 地址均适用 注意如果有多个相同前缀的 ip,只随机返回一个 """ localIP = '' dic = psutil.net_if_addrs() for adapter in dic: snicList = dic[adapter] for snic in snicList: if not snic.family.name.startswith('AF_INET'): continue ip = snic.address if ip.startswith(prefix): localIP = ip return localIP def send_msg(msg='default msg', **context): r""" 发送 message 到企业微信 """ print(context) run_id = context['run_id'] nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) message = '%s\n%s\n%s_%d\n%s' % ( run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg) print(message) ''' 发送代码(涉及账号,本段代码隐藏) ''' default_args = { 'owner': 'qbit', # depends_on_past 是否依赖于过去。 # 如果为True,那么必须要上次的 DAG 执行成功了,这次的 DAG 才能执行。 'depends_on_past': False } with DAG(dag_id='send_msg', default_args=default_args, start_date=days_ago(1), schedule_interval=timedelta(seconds=60), # catchup 是否回补(backfill)开始时间到现在的任务 catchup=False, tags=['qbit'] ) as dag: first = PythonOperator( task_id='send_msg_1', python_callable=send_msg, op_kwargs={'msg': '111'}, provide_context=True, dag=dag, ) second = PythonOperator( task_id='send_msg_2', python_callable=send_msg, op_kwargs={'msg': '222'}, provide_context=True, dag=dag, ) third = PythonOperator( task_id='send_msg_3', python_callable=send_msg, op_kwargs={'msg': '333'}, provide_context=True, dag=dag, ) [third, first] >> second

查看 dag 信息(node0)

# 打印出所有正在活跃状态的 DAGs $ airflow dags list # 打印出 'send_msg' DAG 中所有的任务 $ airflow tasks list send_msg [2021-05-13 16:00:47,123] {dagbag.py:451} INFO - Filling up the DagBag from /home/airflow/airflow/dags send_msg_1 send_msg_2 send_msg_3 # 打印出 'send_msg' DAG 的任务层次结构 $ airflow tasks list send_msg --tree

测试单个 task(node0)

airflow tasks test send_msg send_msg_1 20210513

测试单个 dag(node0)

airflow dags test send_msg 20210513

集群测试

# node0 airflow webserver -p 8080 airflow scheduler airflow celery flower # 默认端口 5555 # node1/node2 airflow celery worker

参考文献

相关阅读:Airflow2.0.0 + Celery 集群搭建 对比 airflow 中的 CeleryExecutor 和 DaskExecutor:A Gentle Introduction To Understand Airflow Executor
本文出自 qbit snap