分类目录归档:数据管道

smartpip调用API


smartpip支持所有Airflow的API接口,为了更方便使用可以采用smartchart的API服务功能 以下以触发调度为例

  • 在smartchart数据集中新建一个python数据源的数据集
  • 填写如下代码

    from etl.smartpip import get_auth_hearder
    import requests
    header=get_auth_hearder()
    url='{url}/api/v1/dags/{dag_id}/dagRuns'
    ds=requests.post(url=url,headers=header,json={"conf&quo

Read more

smartpip微信消息


  • 在smartchart中新建一个数据源,如下图 图片alt

  • 新建一个数据集并使用qiweiMsg这个数据源, 按照企微消息发送文档填写, 记下数据集ID如 12

{
   "touser" : "1359xxxxx",
   -- "totag" : "4",
   "msgtype" : "text",
   "agentid" : xxxxxxx,
   "text" : {
       "content" : 

Read more

smartpip常用自定义函数


功能说明 函数说明 备注
文件上传 smart_upload(csvfilepath)
获取数据 get_dataset(id, param={})
发送邮件 dash_mail(标题, 内容, 邮件列表)
执行shell run_bash(cmdStr)
执行datax run_datax('DAG名/任务名', para_dict)
执行sql run_sql_file('DAG名/任务名', db_connect='starrocks', para_dict=None)
执行kettle run_kettle('DAG名/任务名.ktr',

Read more

smartpip API数据接入


定义接入标准function

假设API的返回格式为:
{'data':[{'a':1,'b':2},..]}
def _get_api_data(param):
    import json, requests
    param = json.loads(param)
    res = requests.post(url=url, json=param).json()['data']
    res = json.dumps(res).encode()
    return res

在api设定中

##apiConn=get_api_data
##param={ "p1": xxx,"

Read more

向量数据库


也许你最近可能听过这样的新闻,某向量数据库的初创公司刚写好 PPT,就获得了几千万的投资,某公司的开源的向量数据库因其代码的简陋而登上了 Hackernews 等等。在过去几个月时间中, AI 应用的发展如火如荼,带动了 AI 应用技术栈上下游的火爆,而向量数据库就是其中最热门的之一。

GPT 的缺陷

过去几个月的时间,我们正处于人工智能的革命中,其中最耀眼的莫过于 GPT-3.5/4 的横空出世,而 GPT-3.5/4 带给我们无限震撼的同时,其天然的缺陷和诸多的限制也让开发者头痛不已,例如其输入端上下文(tokens)大小的限制困扰着很多的开发者和消费者,像 gpt-3.5-turbo 模

Read more

smartpip任务中实现获取执行参数


airflow支持基于时间的任务回跑, 这样我们就需要使用到airflow自带的参数, 如 "execution_date"

airflow能向任务传递的参数如下:

{'conf': , 'dag': , 'dag_run':, 
'ds': '2022-07-13',
'ds_nodash': '20220713', 
'execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'inlets': [], 
'macros': ,
'next_ds': '2022-07-13',

Read more

SQL数据分析常用函数


SQL 有很多可用于计数和计算的内建函数。常用的函数有聚合函数、日期和时间函数、转换函数、窗口函数、字符串函数等。

聚合函数

名称 功能 备注
AVG 平均值
COUNT 非空值的个数
FIRST 第一个记录的值
LAST 最后一个记录的值
MAX() 最大值
MIN 最小值
SUM 总和

日期函数

名称 功能 备注
CURDATE 当前日期
CURTIME 当前时间
NOW 当前日期和时间
UNIX_TIMESTAMP 返回 UNIX 时间戳
DATE_ADD 将两个日期相加
DATE_FORMA

Read more

smartpip实现按ID增量抽取


在数据抽取过程中, 如何通过目标数据库最大ID或时间等条件进行抽取, 这样可以将数据同步的粒度做得更细

标准方法

在datax的数据同步设定中加入

##incColumn = 增量字段
##incDB = 目标查询DB[默认为starrocks]

数据集方式

  • 首先我们需在smartchart的数据集开发中新建一个查询sql 例如maxid, event_day对应的参数名
select 
 max(id) as maxid, max(event_day) as event_day 
from targettablename
  • 在DAG开发中将数据集ID当做参数传递即可,假设数据集ID为

Read more

smartpip实现自定义邮件


你可能需要自定义邮件内容, 甚至可能需要动态获取数据进行发送

固定内容发送

msg = ['报表刷新成功', '<h1>刷新成功!!</h1>']
maillist = 'xxx@smartchart.cn'

#send_mail reportmail msg maillist

动态获取数据发送

maillist = 'xxx@smartchart.cn,abc@xxx.cn'
def fun_msg():
    result = get_dataset(123)['data']
  

Read more

smartpip实现循环抽取任务


比如数据库中存在一些表, 这些表名后缀是按天命名的, 现在需要自动抽取汇总到同一个表中, 这样就需要用到循环抽取 smartpip中已有datax组件, 但只能进行单一表格抽取, 不能增加逻辑 所以我们需要使用到smartpip的diy组件功能 首先我们需要新一个datax抽取任务,在这个任务中我们传递参数ZYM, 比如:

#datax job1  ZYM

之后再新一建一个diy任务, 实现循环抽取:

ZYM = '202001'
def fun_job2():
    job = os.path.join(ETL_FILE_PATH , '项目名/job1.sql')
    report

Read more