smartpip不常用驱动使用方法


sqoop
填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn =  zspl        -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename   -- 源系统表名,(必填)
columns =             -- 抽取原表的字段  a,b,c (可省略)
where =                -- 抽取时的条件  a>1 (可省略)
seq =                    -- 分隔符, 默认 \t (可省略)
query =                -- 查询语名, select * from xxx (可省略)
m =                     -- 启动map数,并行执行,需和split一起使用, 默认1(可省略)
split =                 -- 指定按字段拆分,字段为数值或时间格式, 不能有空(可省略)
otherParam =        -- 其它sqoop自定义参数 (可省略)
 */

load data inpath '/user/etl/tablename/*' overwrite into table xx.tablename;
refresh xxx.tablename

在测试过程可能会用到自定义的连接, 你可以将连接写在DAG设定中, 
SQOOP_PARA['zspl'] = '--connect jdbc:mysql://10.10.x.x:3306/xxx --username xxxxxxx --password xxxxxx'
#sqoop mysqoop
#sqoop mysqoop  P1,P2

sap调用sap的rfc功能抽取数据到表, 流程如下:

1. 在上传设定中新建设定, 如名称为: mysapdata, 如果你是上传到hive, 建意起始行设为-1, 其它设为0
2. 编写mysapdata.json文件, 内容如下, 并上传到项目目录
    {
        "function": "abc", 
        "input": {
            "P1": "20201007", 
            "P2": "20201109"
        }, 
      "dataset": {},
      "table": "IT_RETAB",
      "columns": ["LGORT","TOTAL_WDJ","TOTAL_ZY"],
      "csvparam": {}
    }
    参数说明:
    function: 函数名
    input: 输入的固定参数
    dataset: 
    可实现动态参数,例如{"S_MATNR": 504}, 504是指数据集的ID,
    可以实现你从数据库中查询结果进行参数输入,需要与smartchart结合使用, 
    参数值样列
    "S_MATNR"=[{"表头1":xxx, "表头2":xxx ...}, {"表头1":xxx, "表头2":xxx ..}...]
    table: RFC返回的表名
    columns: 指定生成的列, 可留空自动识别
    csvparam: 设定生成的csv格式, 默认分隔符为|, 修改可以设定,{"sep": ","}

3. DAG设定中填写 #sap   mysapdata, 如你有input中的动态参数, 可以如下填写方式, 可替换json文件中的参数
    P1= '20201008'
    P2= '20201020'
    #sap   mysapdata P1,P2
starrocks 使用broker load方法到starrocks
填写以下内容到你的SQL文件, 比如命名为: mystarrocks.sql
/*
targetDB = 目标库名
targetTable= 目标表名
sourceDB = 源库名
sourceTable = 源表名
 */
truncate table $targetDB.$targetTable;
LOAD LABEL $targetDB.$LABEL
(
    DATA INFILE("$HDFS/$sourceDB.db/$sourceTable/*")
    INTO TABLE  $targetTable
    FORMAT AS parquet
)
WITH BROKER 'hdfs_broker'
PROPERTIES
(
    "timeout" = "3600"
)

#hdfsstarrocks mystarrocks
#hdfsstarrocks mystarrocks  P1,P2

其它不重要说明

全局参数定义的方法,airflow自带,但不常用了
OPG_LOAD_DAYS = Variable.get("OPG_LOAD_DAYS") //从airflow中获取
MSG = Variable.get("MSG")
report_time = datetime.datetime.now()- datetime.timedelta(days=int(OPG_LOAD_DAYS))
P_START_ZYM= report_time .strftime('%Y%m') //由python代码生成的参数