DAG设定方法


关于任务调度的配置方法:

--在airflow中可自义参数, 如下方法可获取自定义参数或直接python生成
OPG_LOAD_DAYS = Variable.get("OPG_LOAD_DAYS") //airflow中获取
MSG = Variable.get("MSG")
report_time = datetime.datetime.now()- datetime.timedelta(days=int(OPG_LOAD_DAYS))
P_START_ZYM= report_time .strftime('%Y%m') //python代码生成的参数
###############不过以上方法不再推荐#################

//以下为推荐方法, 直接在DAG中写参数,你还可以用更灵活的参数方法
//当前时间
report_time = datetime.datetime.now()
//当前时间,往后推一天, 你可以写成小时hour=1...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
//格式化时间为字符串
时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
P_START_ZYM= report_time.strftime('%Y%m')
P_START_ZYMD= report_time.strftime('%Y%m%d')
//你也可以直接传一个参数
P_ZYM='201912'
P_ID=1230
//你也可以采用格式化整一个条件参数,格式f"xxxxxx",参数用{}包裹
PARAM_DMS_BP=f"where create_time >= '{P_START_ZYMD}' "
//你也可以直接给参数赋值
PARAM_DMS_ISB=PARAM_DMS_BP
//当你要刷数据的时候, 你可以不修改之前的参数, 下方重写直接覆盖
PARAM_DMS_BP=''

如果你需要在测试环境中运行,请输入
dev=True

如果你要注释掉一些设定, 你可以使用 '''你的内容'''

--JOB设定方法,每一个JOB为一个节点
--填写驱动名称和要执行的文件名即可,注意文件名不需要带后缀如.sql, .py....
--文件名不可以用数字开头如000xxx, 不可以使用非法字符
#impala_sql  sql_filename
--也可以向sql_file传入参数, 如
#impala_sql  sql_filename P_START_ZYM,MSG 
--也可以为每个JOB增加备注
#impala_sql  sql_filename     --我是备注, 请--就可以


--DAG的设定方法
JOB写完以下, 在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支
sql_filename1 >> sql_filename4 为另一个分支

我们目前支持的驱动有:

  • hive_sql 使用hive执行sql文件, 注意当此sql文件中存在refresh table语句时会自动调用impala来执行这条命令, 这样你无需再单独做一个refresh的job,直接放在hive的脚本中即可
  • impala_sql 使用impala执行sql文件
  • ktr 执行kettle的transform文件
  • kjb 执行kettle的job文件
  • py 执行python脚本文件
  • link DAG依赖的实现方法
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
       如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
  • validate 使用默认的数据库执行select sql来做数据校验, 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename
  • dataset 是validate的增强功能, 可以实现对任意数据库的查询与校验. 需要smartchart的支持
使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数: 可不填写,则默认无查询结果抛出错误
      info - 只打印查询结果, e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
      e3 - 无查询结果时,15分钟会重试,2小时还无结果抛出错误,
           可用于轮询业务系统, 等待业务系统数据ready后再抽取

id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx
  • refresh_tableau 刷新tableau数据源,格式:#refresh_tableau jobname sourceID, 其中jobname为自定义一个英文名做为标识, sourceID是tableau对的需要刷新的数据源ID, 需要在头部参数中定义, 如 sourceID = 'xxxx-xxxx-xxx'
  • ktr_r 执行远程kettle的transform文件
  • kjb_r 执行远程kettle的job文件
  • gp_sql 执行greenplum的sql文件
  • sql_file 使用默认的驱动执行sql文件
  • sp 使用默认的数据库执行SP名

  • sqoop 使用sqoop抽取到大数据平台

填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn =  zspl        -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename   -- 源系统表名,(必填)
columns =             -- 抽取原表的字段  a,b,c (可省略)
where =                -- 抽取时的条件  a>1 (可省略)
seq =                    -- 分隔符, 默认 \t (可省略)
query =                -- 查询语名, select * from xxx (可省略)
m =                     -- 启动map数,并行执行,需和split一起使用, 默认1(可省略)
split =                 -- 指定按字段拆分,字段为数值或时间格式, 不能有空(可省略)
otherParam =        -- 其它sqoop自定义参数 (可省略)
 */

load data inpath '/user/etl/tablename/*' overwrite into table xx.tablename;
refresh xxx.tablename

在测试过程可能会用到自定义的连接, 你可以将连接写在DAG设定中, 
SQOOP_PARA['zspl'] = '--connect jdbc:mysql://10.10.x.x:3306/xxx --username xxxxxxx --password xxxxxx'
#sqoop mysqoop
#sqoop mysqoop  P1,P2
  • sap 调用sap的rfc功能抽取数据到表, 流程如下:
1. 在上传设定中新建设定, 如名称为: mysapdata, 如果你是上传到hive, 建意起始行设为-1, 其它设为0
2. 编写mysapdata.json文件, 内容如下, 并上传到项目目录
    {
        "function": "abc", 
        "input": {
            "P1": "20201007", 
            "P2": "20201109"
        }, 
        "conn": {
            "ashost": "10.10.x.x", 
            "client": "270", 
            "sysnr": "01", 
            "user": "xxxx", 
            "passwd": "xxxxx"
        }, 
        "sep": "|"
    }
3. DAG设定中填写 #sap   mysapdata, 如你有input中的动态参数, 可以如下填写方式
    P1= '20201008'
    P2= '20201020'
    #sap   mysapdata P1,P2
    如果你想从外部传入user,passwd, 可如下设定
    sap_key = 'username   password'
    #sap mysapdata  sap_key

一个完整的sample

//获取变量,以下为python语法(option)
P_DAYS = 12
MSG = '------------------------------------------------------------'
report_time = datetime.datetime.now()- datetime.timedelta(days=int(P_DAYS))
P_START_ZYM= report_time.strftime('%Y%m')

//以下为JOB专署语法
#link  lastdag 30 3600        --每30秒轮询前置任务,超时1小时
#impala_sql  sqlfile1         --执行impala
#hive_sql    sqlfile2         --执行hive
#dataset     checkinfo  321   --数据查询,校验              --
#sp  sp1   report_time,MSG    --执行SP,传入参数
#ktr myktr   P_START_ZYM      --执行kettle ktr, 传入参数
#kjb mykjb   P_START_ZYM
#/kjb abc                     --当此job暂时不要时,可/注释

--以下为python语法(option)
//////////////
lastdag >> sqlfile1 >>  \
validate >> sp1 >> [myktr, mykjb]
lastdag >> sqlfile2

高级用法-自定义

'''
你的备注可以写在这
'''
#自定义你的执行函数
import requests
import json
def S_refresh_XX():
    response=requests.get('',verify=False)
    print(response.text)
    response=json.loads(response.text)
    if response['status']!=200:
        raise Exception(str(response))

sqlStr = '''
refresh a;
refresh b;
'''
def S_run_xx():
   run_sql_str(sqlStr,db_connect='impala',para_dict=None)



//////////////////
refresh_XX = PythonOperator(
    task_id='refresh_XX',
    python_callable=S_refresh_XX,
    dag=dag
)
refresh_XX.ui_color = 'red'
refresh_XX.driver = 'DIY'

run_xx = PythonOperator(
    task_id='run_xx',
    python_callable=S_run_xx,
    dag=dag
)
run_xx.ui_color = 'red'
run_xx.driver = 'DIY'

refresh_XX >> run_xx