Airflow2.4.3 - 创建任务(定义Dag)、运行任务、查看执行结果

介绍

本文介绍创建任务,即:定义 Dag,有向无环图

通过编写 Python 的 .py 文件,来定义 Dag

查看 Dag 保存路径

查看 airflow.cfg 文件,如下:

vim /program/airflow/airflow.cfg

看到下面配置,该路径就是 .py 文件的保存路径

dags_folder = /program/airflow/dags

查看目录

cd /program/airflow
ll

发现该目录下 没有 dags 目录

创建 dags 目录

mkdir dags

编写脚本

进入 dags 目录

cd /program/airflow/dags

脚本例子

创建 demo.py 文件:

vim demo.py

内容如下:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import dates
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# 参数
def default_options():
    default_args = {
        'owner': 'test',  # 拥有者名称,可自定义
        'depends_on_past':True, # 开启任务依赖,A任务执行后,再执行B任务
        'email':['65242847@qq.com'],
        'start_date': dates.days_ago(1),  # 第一次开始执行的时间,为 UTC 时间
        'email_on_failure': False, # 出错是否发邮件报警
        'retries': 1,  # 失败重试次数
        'retry_delay': timedelta(seconds=20),  # 失败重试间隔
        'email_on_retry': False # 重试是否发邮件报警
    }
    return default_args


# 定义DAG
def t1(dag):
    cmd = "ssh hadoop1 hadoop jar /program/hadoop-3.0.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar pi 10 10"
    # operator 支持多种类型, 这里使用 BashOperator
    task = BashOperator(
        task_id='t1',  # task 的 id
        bash_command=cmd,  # 指定要执行的命令
        retries = 1,  # 失败重试次数
        dag=dag  # 指定归属的dag
    )
    return task


def hello_world():
    # 打印 hello world at 【当前时间】
    current_time = str(datetime.today())
    print('hello world at {} ----'.format(current_time))


def t2(dag):
    # PythonOperator
    task = PythonOperator( # 执行python脚本
        task_id='t2',
        python_callable=hello_world,  # 指定要执行的函数
        retries = 1,  # 失败重试次数
        dag=dag)
    return task


def t3(dag):
    t = "date" # 执行linux date 命令,显示当前时间
    task = BashOperator(
        task_id='t3',
        bash_command=t,
        retries = 1,  # 失败重试次数
        dag=dag)
    return task


with DAG(
        'test_task',  # dag的id
        default_args=default_options(),  # 指定默认参数
        schedule=timedelta(days=1)  # 执行周期,支持cron表达式,默认为00:00:00秒执行
) as d:
    task1 = t1(d)
    task2 = t2(d)
    task3 = t3(d)
    chain(task1, task2, task3)  # 指定执行顺序
    # 用位位移指定任务执行顺序
    # task1 >> task2 >> task3
    # 通过 set_downstream() 指定执行顺序
    # task1.set_downstream(task2)
    # task2.set_downstream(task3)
    # 通过 set_upstream() 指定执行顺序,注意顺序与上面不同
    # task2.set_downstream(task1)
    # task3.set_downstream(task2)

执行:esc:wq,保存退出

提示:此时该dag处于可用状态

检查脚本是否报错

启动 conda 环境:

[root@hadoop3 dags]# conda activate airflow

执行脚本:

(airflow) [root@hadoop3 dags]# python /root/airflow/dags/demo.py

没有报错信息,说明可运行

运行 DAG 任务

启动相关大数据软件

上面的脚本用到了 hadoop,所以需要先启动 hadoop

登录 hadoop1 服务器,执行下面命令:

/program/bin/hadoop.sh start

启动 airflow

登录 hadoop3 服务器,执行下面命令:

[root@hadoop3 ~]# /program/bin/airflow.sh start

在 web网页中查看 dag

注意:此方式可能会延时,创建任务、删除任务,在一段时间内没有变化,可 重启 airflow

访问 http://hadoop3:8080 ,看到下图说明已经可用,但没有执行:

在 命令中查看 dag

提示:此方式实时性好,创建任务、删除任务,立刻能看到结果

(airflow) [root@hadoop3 dags]# airflow dags list

启动-定时方式

启动-手动启动

在 yarn 上查看任务

访问 http://hadoop2:8088 ,可以查看启动的任务

在 airflow 查看

点击上图中的 test_task 任务名,显示如下图:

查看 dag 执行顺序

切换每次执行的任务

dag任务执行多次,通过下面方式可切换:

查看 每个dag执行结果

修改脚本

修改脚本后,重新启动任务即可

参考:
https://blog.csdn.net/sinat_28371057/article/details/109240768
https://zhuanlan.zhihu.com/p/84332879
https://blog.csdn.net/sinat_28371057/article/details/109240768


原文出处:https://malaoshi.top/show_1IX5JNDYX7Bz.html