smartpip任务中实现获取执行参数


airflow支持基于时间的任务回跑, 这样我们就需要使用到airflow自带的参数, 如 "execution_date"

airflow能向任务传递的参数如下:

{'conf': , 'dag': , 'dag_run':, 
'ds': '2022-07-13',
'ds_nodash': '20220713', 
'execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'inlets': [], 
'macros': ,
'next_ds': '2022-07-13',
'next_ds_nodash': '20220713', 
'next_execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'outlets': [], 
'params': {}, 
'prev_ds': '2022-07-13',
'prev_ds_nodash': '20220713',
'prev_execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'prev_execution_date_success': , 'prev_start_date_success': , 'run_id': , 'task': , 'task_instance': , 'task_instance_key_str': , 'test_mode': False, 'ti': ,
'tomorrow_ds': '2022-07-14', 
'tomorrow_ds_nodash': '20220714', 
'ts': '2022-07-13T02:04:33.244294+00:00',
'ts_nodash': '20220713T020433',
'ts_nodash_with_tz': '20220713T020433.244294+0000',
'var': {'json': None, 'value': None},
'yesterday_ds': '2022-07-12',
'yesterday_ds_nodash': '20220712',
'templates_dict': None
`

那么如何在DAG配置中使用到这个参数呢 我们以DIY方式为例

def get_execution_date(**kwargs):
    job=os.path.join(ETL_FILE_PATH,'DAG名/任务名.sql')
    execution_date = kwargs['execution_date']
    zymd = execution_date.strftime('%Y%m%d')
    para_dict = {'zymd':zymd}
    run_sql_file(job,'starrocks', para_dict)

#diy job_get_execution_date  get_execution_date