介绍
本文介绍创建任务,即:定义 Dag,有向无环图
通过编写 Python 的 .py
文件,来定义 Dag
查看 Dag 保存路径
查看 airflow.cfg
文件,如下:
vim /program/airflow/airflow.cfg
看到下面配置,该路径就是 .py
文件的保存路径
dags_folder = /program/airflow/dags
查看目录
cd /program/airflow
ll
发现该目录下 没有 dags
目录
创建 dags 目录
mkdir dags
编写脚本
进入 dags
目录
cd /program/airflow/dags
脚本例子
创建 demo.py
文件:
vim demo.py
内容如下:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# 参数
def default_options():
default_args = {
'owner': 'test', # 拥有者名称,可自定义
'depends_on_past':True, # 开启任务依赖,A任务执行后,再执行B任务
'email':['65242847@qq.com'],
'start_date': dates.days_ago(1), # 第一次开始执行的时间,为 UTC 时间
'email_on_failure': False, # 出错是否发邮件报警
'retries': 1, # 失败重试次数
'retry_delay': timedelta(seconds=20), # 失败重试间隔
'email_on_retry': False # 重试是否发邮件报警
}
return default_args
# 定义DAG
def t1(dag):
cmd = "ssh hadoop1 hadoop jar /program/hadoop-3.0.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar pi 10 10"
# operator 支持多种类型, 这里使用 BashOperator
task = BashOperator(
task_id='t1', # task 的 id
bash_command=cmd, # 指定要执行的命令
retries = 1, # 失败重试次数
dag=dag # 指定归属的dag
)
return task
def hello_world():
# 打印 hello world at 【当前时间】
current_time = str(datetime.today())
print('hello world at {} ----'.format(current_time))
def t2(dag):
# PythonOperator
task = PythonOperator( # 执行python脚本
task_id='t2',
python_callable=hello_world, # 指定要执行的函数
retries = 1, # 失败重试次数
dag=dag)
return task
def t3(dag):
t = "date" # 执行linux date 命令,显示当前时间
task = BashOperator(
task_id='t3',
bash_command=t,
retries = 1, # 失败重试次数
dag=dag)
return task
with DAG(
'test_task', # dag的id
default_args=default_options(), # 指定默认参数
schedule=timedelta(days=1) # 执行周期,支持cron表达式,默认为00:00:00秒执行
) as d:
task1 = t1(d)
task2 = t2(d)
task3 = t3(d)
chain(task1, task2, task3) # 指定执行顺序
# 用位位移指定任务执行顺序
# task1 >> task2 >> task3
# 通过 set_downstream() 指定执行顺序
# task1.set_downstream(task2)
# task2.set_downstream(task3)
# 通过 set_upstream() 指定执行顺序,注意顺序与上面不同
# task2.set_downstream(task1)
# task3.set_downstream(task2)
执行:esc
,:wq
,保存退出
提示:此时该dag处于可用状态
检查脚本是否报错
启动 conda 环境:
[root@hadoop3 dags]# conda activate airflow
执行脚本:
(airflow) [root@hadoop3 dags]# python /root/airflow/dags/demo.py
没有报错信息,说明可运行
运行 DAG 任务
启动相关大数据软件
上面的脚本用到了 hadoop,所以需要先启动 hadoop
登录 hadoop1
服务器,执行下面命令:
/program/bin/hadoop.sh start
启动 airflow
登录 hadoop3
服务器,执行下面命令:
[root@hadoop3 ~]# /program/bin/airflow.sh start
在 web网页中查看 dag
注意:此方式可能会延时,创建任务、删除任务,在一段时间内没有变化,可 重启 airflow
访问 http://hadoop3:8080 ,看到下图说明已经可用,但没有执行:
在 命令中查看 dag
提示:此方式实时性好,创建任务、删除任务,立刻能看到结果
(airflow) [root@hadoop3 dags]# airflow dags list
启动-定时方式
启动-手动启动
在 yarn 上查看任务
访问 http://hadoop2:8088 ,可以查看启动的任务
在 airflow 查看
点击上图中的 test_task
任务名,显示如下图:
查看 dag 执行顺序
切换每次执行的任务
dag任务执行多次,通过下面方式可切换:
查看 每个dag执行结果
修改脚本
修改脚本后,重新启动任务即可
参考:
https://blog.csdn.net/sinat_28371057/article/details/109240768
https://zhuanlan.zhihu.com/p/84332879
https://blog.csdn.net/sinat_28371057/article/details/109240768