Airflow2.4.3 - 创建任务(定义Dag)、运行任务、查看执行结果 作者:马育民 • 2023-04-13 15:33 • 阅读:10326 # 介绍 本文介绍创建任务,即:定义 Dag,有向无环图 通过编写 Python 的 `.py` 文件,来定义 Dag # 查看 Dag 保存路径 查看 `airflow.cfg` 文件,如下: ``` vim /program/airflow/airflow.cfg ``` 看到下面配置,该路径就是 `.py` 文件的保存路径 ``` dags_folder = /program/airflow/dags ``` ### 查看目录 ``` cd /program/airflow ``` ``` ll ``` 发现该目录下 **没有** `dags` 目录 ### 创建 dags 目录 ``` mkdir dags ``` # 编写脚本 进入 `dags` 目录 ``` cd /program/airflow/dags ``` ### 脚本例子 创建 `demo.py` 文件: ``` vim demo.py ``` 内容如下: ``` from datetime import datetime, timedelta from airflow import DAG from airflow.utils import dates from airflow.models.baseoperator import chain from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator # 参数 def default_options(): default_args = { 'owner': 'test', # 拥有者名称,可自定义 'depends_on_past':True, # 开启任务依赖,A任务执行后,再执行B任务 'email':['65242847@qq.com'], 'start_date': dates.days_ago(1), # 第一次开始执行的时间,为 UTC 时间 'email_on_failure': False, # 出错是否发邮件报警 'retries': 1, # 失败重试次数 'retry_delay': timedelta(seconds=20), # 失败重试间隔 'email_on_retry': False # 重试是否发邮件报警 } return default_args # 定义DAG def t1(dag): cmd = "ssh hadoop1 hadoop jar /program/hadoop-3.0.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar pi 10 10" # operator 支持多种类型, 这里使用 BashOperator task = BashOperator( task_id='t1', # task 的 id bash_command=cmd, # 指定要执行的命令 retries = 1, # 失败重试次数 dag=dag # 指定归属的dag ) return task def hello_world(): # 打印 hello world at 【当前时间】 current_time = str(datetime.today()) print('hello world at {} ----'.format(current_time)) def t2(dag): # PythonOperator task = PythonOperator( # 执行python脚本 task_id='t2', python_callable=hello_world, # 指定要执行的函数 retries = 1, # 失败重试次数 dag=dag) return task def t3(dag): t = "date" # 执行linux date 命令,显示当前时间 task = BashOperator( task_id='t3', bash_command=t, retries = 1, # 失败重试次数 dag=dag) return task with DAG( 'test_task', # dag的id default_args=default_options(), # 指定默认参数 schedule=timedelta(days=1) # 执行周期,支持cron表达式,默认为00:00:00秒执行 ) as d: task1 = t1(d) task2 = t2(d) task3 = t3(d) chain(task1, task2, task3) # 指定执行顺序 # 用位位移指定任务执行顺序 # task1 >> task2 >> task3 # 通过 set_downstream() 指定执行顺序 # task1.set_downstream(task2) # task2.set_downstream(task3) # 通过 set_upstream() 指定执行顺序,注意顺序与上面不同 # task2.set_downstream(task1) # task3.set_downstream(task2) ``` 执行:`esc`,`:wq`,保存退出 **提示:**此时该dag处于可用状态 ### 检查脚本是否报错 启动 conda 环境: ``` [root@hadoop3 dags]# conda activate airflow ``` 执行脚本: ``` (airflow) [root@hadoop3 dags]# python /root/airflow/dags/demo.py ``` 没有报错信息,说明可运行 # 运行 DAG 任务 ### 启动相关大数据软件 上面的脚本用到了 hadoop,所以需要先启动 hadoop 登录 `hadoop1` 服务器,执行下面命令: ``` /program/bin/hadoop.sh start ``` ### 启动 airflow 登录 `hadoop3` 服务器,执行下面命令: ``` [root@hadoop3 ~]# /program/bin/airflow.sh start ``` ### 在 web网页中查看 dag **注意:**此方式可能会延时,创建任务、删除任务,在一段时间内没有变化,可 **重启 airflow** 访问 http://hadoop3:8080 ,看到下图说明已经可用,但没有执行: [![](/upload/0/0/1IX5JLeqNuBg.png)](/upload/0/0/1IX5JLeqNuBg.png) ### 在 命令中查看 dag **提示:**此方式实时性好,创建任务、删除任务,立刻能看到结果 ``` (airflow) [root@hadoop3 dags]# airflow dags list ``` ### 启动-定时方式 [![](/upload/0/0/1IX5JN5ut38H.png)](/upload/0/0/1IX5JN5ut38H.png) ### 启动-手动启动 [![](/upload/0/0/1IX5JN6Eh26K.png)](/upload/0/0/1IX5JN6Eh26K.png) ### 在 yarn 上查看任务 访问 http://hadoop2:8088 ,可以查看启动的任务 ### 在 airflow 查看 点击上图中的 `test_task` 任务名,显示如下图: [![](/upload/0/0/1IX5JNB0iqUh.png)](/upload/0/0/1IX5JNB0iqUh.png) ### 查看 dag 执行顺序 [![](/upload/0/0/1IX5JNBZ9Kdm.png)](/upload/0/0/1IX5JNBZ9Kdm.png) ### 切换每次执行的任务 dag任务执行多次,通过下面方式可切换: [![](/upload/0/0/1IX5JNIcqK3O.png)](/upload/0/0/1IX5JNIcqK3O.png) ### 查看 每个dag执行结果 [![](/upload/0/0/1IX5JNBzr8jp.png)](/upload/0/0/1IX5JNBzr8jp.png) [![](/upload/0/0/1IX5JNCINFZs.png)](/upload/0/0/1IX5JNCINFZs.png) [![](/upload/0/0/1IX5JNCWmMkw.png)](/upload/0/0/1IX5JNCWmMkw.png) # 修改脚本 修改脚本后,重新启动任务即可 参考: https://blog.csdn.net/sinat_28371057/article/details/109240768 https://zhuanlan.zhihu.com/p/84332879 https://blog.csdn.net/sinat_28371057/article/details/109240768 原文出处:http://malaoshi.top/show_1IX5JNDYX7Bz.html