建设网站建设目的意义,安卓中文开发工具,网站ppt缩略图,重庆建设工程交易中心官网#x1f3e1; 个人主页#xff1a;IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 #x1f6a9; 私聊博主#xff1a;加入大数据技术讨论群聊#xff0c;获取更多大数据资料。 #x1f514; 博主个人B栈地址#xff1a;豹哥教你大数据的个人空间-豹… 个人主页IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 私聊博主加入大数据技术讨论群聊获取更多大数据资料。 博主个人B栈地址豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 在Airflow的工作计划中一个重要的概念就是catchup追赶在实现DAG具体逻辑后如果将catchup设置为True默认就为True,Airflow将“回填”所有过去的DAG run如果将catchup设置为False,Airflow将从最新的DAG run时刻前一时刻开始执行 DAG run忽略之前所有的记录。
例如现在某个DAG每隔1分钟执行一次调度开始时间为2001-01-01 当前日期为2021-10-01 15:23:21如果catchup设置为True那么DAG将从2001-01-01 00:00:00 开始每分钟都会运行当前DAG。如果catchup 设置为False那么DAG将从2021-10-01 15:22:20当前2021-10-01 15:23:21前一时刻开始执行DAG run。
举例有first ,second,third三个shell命令任务按照顺序调度每隔1分钟执行一次首次执行时间为2000-01-01。
设置catchup 为True默认DAG python配置如下
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args {owner: airflow, # 拥有者名称start_date: datetime(2001, 1, 1), # 第一次开始执行的时间为 UTC 时间retries: 1, # 失败重试次数retry_delay: timedelta(minutes5), # 失败重试间隔
}
dag DAG(dag_id catchup_test1 , #DAG id ,必须完全由字母、数字、下划线组成default_args default_args, #外部定义的 dic 格式的参数schedule_interval timedelta(minutes1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒catchupTrue # 执行DAG时将开始时间到目前所有该执行的任务都执行默认为True
)first BashOperator(task_idfirst,bash_commandecho run first task,dagdag
)
middle BashOperator(task_idsecond,bash_commandecho run second task,dagdag
)
last BashOperator(task_idthird,bash_commandecho run third task,dagdag,retries3
)
first middle last
上传python配置文件到$AIRFLOW_HOME/dags下重启airflow,DAG执行调度如下 设置catchup 为False,DAG python配置如下
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedeltadefault_args {owner: airflow, # 拥有者名称start_date: datetime(2001, 1, 1), # 第一次开始执行的时间为 UTC 时间retries: 1, # 失败重试次数retry_delay: timedelta(minutes5), # 失败重试间隔
}
dag DAG(dag_id catchup_test2, #DAG id ,必须完全由字母、数字、下划线组成default_args default_args, #外部定义的 dic 格式的参数schedule_interval timedelta(minutes1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒catchupFalse # 执行DAG时将开始时间到目前所有该执行的任务都执行默认为True
)first BashOperator(task_idfirst,bash_commandecho run first task,dagdag
)
middle BashOperator(task_idsecond,bash_commandecho run second task,dagdag
)
last BashOperator(task_idthird,bash_commandecho run third task,dagdag,retries3
)
first middle last
上传python配置文件到$AIRFLOW_HOME/dags下重启airflow,DAG执行调度如下 有两种方式在Airflow中配置catchup:
全局配置
在airflow配置文件airflow.cfg的scheduler部分下设置catchup_by_defaultTrue默认或False这个设置是全局性的设置。
DAG文件配置
在python代码配置中设置DAG对象的参数dag.catchupTrue或False。
dag DAG(dag_id myairflow_execute_bash,default_args default_args, catchupFalse,schedule_interval timedelta(days1))