分类目录归档:smartpip

smartpip调用API


smartpip支持所有Airflow的API接口,为了更方便使用可以采用smartchart的API服务功能 以下以触发调度为例

  • 在smartchart数据集中新建一个python数据源的数据集
  • 填写如下代码

    from etl.smartpip import get_auth_hearder
    import requests
    header=get_auth_hearder()
    url='{url}/api/v1/dags/{dag_id}/dagRuns'
    ds=requests.post(url=url,headers=header,json={"conf&quo

Read more

smartpip常用自定义函数


功能说明 函数说明 备注
文件上传 smart_upload(csvfilepath)
获取数据 get_dataset(id, param={})
发送邮件 dash_mail(标题, 内容, 邮件列表)
执行shell run_bash(cmdStr)
执行datax run_datax('DAG名/任务名', para_dict)
执行sql run_sql_file('DAG名/任务名', db_connect='starrocks', para_dict=None)
执行kettle run_kettle('DAG名/任务名.ktr',

Read more

smartpip API数据接入


定义接入标准function

假设API的返回格式为:
{'data':[{'a':1,'b':2},..]}
def _get_api_data(param):
    import json, requests
    param = json.loads(param)
    res = requests.post(url=url, json=param).json()['data']
    res = json.dumps(res).encode()
    return res

在api设定中

##apiConn=get_api_data
##param={ "p1": xxx,"

Read more

smartpip任务中实现获取执行参数


airflow支持基于时间的任务回跑, 这样我们就需要使用到airflow自带的参数, 如 "execution_date"

airflow能向任务传递的参数如下:

{'conf': , 'dag': , 'dag_run':, 
'ds': '2022-07-13',
'ds_nodash': '20220713', 
'execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'inlets': [], 
'macros': ,
'next_ds': '2022-07-13',

Read more

smartpip实现按ID增量抽取


在数据抽取过程中, 如何通过目标数据库最大ID或时间等条件进行抽取, 这样可以将数据同步的粒度做得更细

标准方法

在datax的数据同步设定中加入

##incColumn = 增量字段
##incDB = 目标查询DB[默认为starrocks]

数据集方式

  • 首先我们需在smartchart的数据集开发中新建一个查询sql 例如maxid, event_day对应的参数名
select 
 max(id) as maxid, max(event_day) as event_day 
from targettablename
  • 在DAG开发中将数据集ID当做参数传递即可,假设数据集ID为

Read more

smartpip实现自定义邮件


你可能需要自定义邮件内容, 甚至可能需要动态获取数据进行发送

固定内容发送

msg = ['报表刷新成功', '<h1>刷新成功!!</h1>']
maillist = 'xxx@smartchart.cn'

#send_mail reportmail msg maillist

动态获取数据发送

maillist = 'xxx@smartchart.cn,abc@xxx.cn'
def fun_msg():
    result = get_dataset(123)['data']
  

Read more

smartpip使用方法sample


一个完整的sample

//获取变量,以下为python语法(option)
P_DAYS = 12
MSG = '------------------------------------------------------------'
report_time = datetime.datetime.now()- datetime.timedelta(days=int(P_DAYS))
P_START_ZYM= report_time.strftime('%Y%m')

//以下为JOB专署语法
#link  lastdag 30 3600        --

Read more

smartpip不常用驱动使用方法


sqoop
填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn =  zspl        -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename   -- 源系统表名,(必填)
columns =             -- 抽取原表的字段  a,b,c (可省略)
where =                -- 抽取时的条件  a>1 (可省略)
seq =                    -- 分隔符, 默认 \t (可省略)
query =                -- 查询语名,

Read more

使用SmartPip监控Starrocks的Routine Load


什么是Routine Load

Starrocks支持例行导入(Routine Load)功能,提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 StarRocks 中。

什么是SmartPip

smartpip是我们基于airflow研发的任务调度平台, 针对定时导入kafka数据到starrocks, 已实现了一个配置即可实现 但是如果对实时监听kafka导入, 并不太合适, 所以本文将介绍如何使用starrocks自带的routine load的功能,同时来使用smartpip实现监控功能

如何实现

  • 在smartchart中新建一个数据集, 查询内容:

    SHOW 

Read more

SmartPip DAG设定方法


关于任务调度的配置方法:

自定义参数方法

-- 当前时间
report_time = datetime.datetime.now()
-- 当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
-- 获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
-- 格式化字符串时间:(%Y-

Read more