DAG设定方法


关于任务调度的配置方法:

--在airflow中可自义参数, 如下方法可获取自定义参数或直接python生成
OPG_LOAD_DAYS = Variable.get("OPG_LOAD_DAYS") //airflow中获取
MSG = Variable.get("MSG")
report_time = datetime.datetime.now()- datetime.timedelta(days=int(OPG_LOAD_DAYS))
P_START_ZYM= report_time .strftime('%Y%m') //python代码生成的参数
###############不过以上方法不再推荐#################

//以下为推荐方法, 直接在DAG中写参数,你还可以用更灵活的参数方法
//当前时间
report_time = datetime.datetime.now()
//当前时间,往后推一天, 你可以写成小时hour=1...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
//格式化时间为字符串
时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
P_START_ZYM= report_time.strftime('%Y%m')
P_START_ZYMD= report_time.strftime('%Y%m%d')
//你也可以直接传一个参数
P_ZYM='201912'
P_ID=1230
//你也可以采用格式化整一个条件参数,格式f"xxxxxx",参数用{}包裹
PARAM_DMS_BP=f"where create_time >= '{P_START_ZYMD}' "
//你也可以直接给参数赋值
PARAM_DMS_ISB=PARAM_DMS_BP
//当你要刷数据的时候, 你可以不修改之前的参数, 下方重写直接覆盖
PARAM_DMS_BP=''

如果你需要在测试环境中运行,请输入
dev=True

如果你要注释掉一些设定, 你可以使用 '''你的内容'''

--JOB设定方法,每一个JOB为一个节点
--填写驱动名称和要执行的文件名即可,注意文件名不需要带后缀如.sql, .py....
--文件名不可以用数字开头如000xxx, 不可以使用非法字符
#impala_sql  sql_filename
--也可以向sql_file传入参数, 如
#impala_sql  sql_filename P_START_ZYM,MSG 
--也可以为每个JOB增加备注
#impala_sql  sql_filename     --我是备注, 请--就可以


--DAG的设定方法
JOB写完以下, 在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支
sql_filename1 >> sql_filename4 为另一个分支

我们目前支持的驱动有:

  • hive_sql 使用hive执行sql文件, 注意当此sql文件中存在refresh table语句时会自动调用impala来执行这条命令, 这样你无需再单独做一个refresh的job,直接放在hive的脚本中即可
  • impala_sql 使用impala执行sql文件
  • ktr 执行kettle的transform文件
  • kjb 执行kettle的job文件
  • py 执行python脚本文件
  • link DAG依赖的实现方法
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
       如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
  • validate 使用默认的数据库执行select sql来做数据校验, 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename
  • dataset 是validate的增强功能, 可以实现对任意数据库的查询与校验. 需要smartchart的支持
使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数: 可不填写,则默认无查询结果抛出错误
      info - 只打印查询结果, e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
      e3 - 无查询结果时,15分钟会重试,2小时还无结果抛出错误,
           可用于轮询业务系统, 等待业务系统数据ready后再抽取

id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx
  • refresh_tableau 刷新tableau数据源,格式:#refresh_tableau jobname sourceID, 其中jobname为自定义一个英文名做为标识, sourceID是tableau对的需要刷新的数据源ID, 需要在头部参数中定义, 如 sourceID = 'xxxx-xxxx-xxx'
  • ktr_r 执行远程kettle的transform文件
  • kjb_r 执行远程kettle的job文件
  • gp_sql 执行greenplum的sql文件
  • sql_file 使用默认的驱动执行sql文件
  • sp 使用默认的数据库执行SP名

一个完整的sample

//获取变量,以下为python语法(option)
P_DAYS = 12
MSG = '------------------------------------------------------------'
report_time = datetime.datetime.now()- datetime.timedelta(days=int(P_DAYS))
P_START_ZYM= report_time.strftime('%Y%m')

//以下为JOB专署语法
#link  lastdag 30 3600        --每30秒轮询前置任务,超时1小时
#impala_sql  sqlfile1         --执行impala
#hive_sql    sqlfile2         --执行hive
#dataset     checkinfo  321   --数据查询,校验              --
#sp  sp1   report_time,MSG    --执行SP,传入参数
#ktr myktr   P_START_ZYM      --执行kettle ktr, 传入参数
#kjb mykjb   P_START_ZYM
#/kjb abc                     --当此job暂时不要时,可/注释

--以下为python语法(option)
//////////////
lastdag >> sqlfile1 >>  \
validate >> sp1 >> [myktr, mykjb]
lastdag >> sqlfile2

高级用法-自定义

'''
你的备注可以写在这
'''
#自定义你的执行函数
import requests
import json
def S_refresh_XX():
    response=requests.get('',verify=False)
    print(response.text)
    response=json.loads(response.text)
    if response['status']!=200:
        raise Exception(str(response))

sqlStr = '''
refresh a;
refresh b;
'''
def S_run_xx():
   run_sql_str(sqlStr,db_connect='impala',para_dict=None)



//////////////////
refresh_XX = PythonOperator(
    task_id='refresh_XX',
    python_callable=S_refresh_XX,
    dag=dag
)
refresh_XX.ui_color = 'red'
refresh_XX.driver = 'DIY'

run_xx = PythonOperator(
    task_id='run_xx',
    python_callable=S_run_xx,
    dag=dag
)
run_xx.ui_color = 'red'
run_xx.driver = 'DIY'

refresh_XX >> run_xx