分类标签归档:smartpip

smartpip任务中实现获取执行参数


airflow支持基于时间的任务回跑, 这样我们就需要使用到airflow自带的参数, 如 "execution_date"

airflow能向任务传递的参数如下:

{'conf': , 'dag': , 'dag_run':, 
'ds': '2022-07-13',
'ds_nodash': '20220713', 
'execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'inlets': [], 
'macros': ,
'next_ds': '2022-07-13',

Read more

smartpip实现按ID增量抽取


在数据抽取过程中, 如何通过目标数据库最大ID或时间等条件进行抽取, 这样可以将数据同步的粒度做得更细

标准方法

在datax的数据同步设定中加入

##incColumn = 增量字段
##incDB = 目标查询DB[默认为starrocks]

数据集方式

  • 首先我们需在smartchart的数据集开发中新建一个查询sql 例如maxid, event_day对应的参数名
select 
 max(id) as maxid, max(event_day) as event_day 
from targettablename
  • 在DAG开发中将数据集ID当做参数传递即可,假设数据集ID为

Read more

smartpip实现自定义邮件


你可能需要自定义邮件内容, 甚至可能需要动态获取数据进行发送

固定内容发送

msg = ['报表刷新成功', '<h1>刷新成功!!</h1>']
maillist = 'xxx@smartchart.cn'

#send_mail reportmail msg maillist

动态获取数据发送

maillist = 'xxx@smartchart.cn,abc@xxx.cn'
def fun_msg():
    result = get_dataset(123)['data']
  

Read more

SmartPip DAG设定方法


关于任务调度的配置方法:

自定义参数方法

-- 当前时间
report_time = datetime.datetime.now()
-- 当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
-- 获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
-- 格式化字符串时间:(%Y-

Read more