sqoop
填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn = zspl -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename -- 源系统表名,(必填)
columns = -- 抽取原表的字段 a,b,c (可省略)
where = -- 抽取时的条件 a>1 (可省略)
seq = -- 分隔符, 默认 \t (可省略)
query = -- 查询语名, select * from xxx (可省略)
m = -- 启动map数,并行执行,需和split一起使用, 默认1(可省略)
split = -- 指定按字段拆分,字段为数值或时间格式, 不能有空(可省略)
otherParam = -- 其它sqoop自定义参数 (可省略)
*/
load data inpath '/user/etl/tablename/*' overwrite into table xx.tablename;
refresh xxx.tablename
在测试过程可能会用到自定义的连接, 你可以将连接写在DAG设定中, 如
SQOOP_PARA['zspl'] = '--connect jdbc:mysql://10.10.x.x:3306/xxx --username xxxxxxx --password xxxxxx'
#sqoop mysqoop
#sqoop mysqoop P1,P2
sap调用sap的rfc功能抽取数据到表, 流程如下:
1. 在上传设定中新建设定, 如名称为: mysapdata, 如果你是上传到hive, 建意起始行设为-1, 其它设为0
2. 编写mysapdata.json文件, 内容如下, 并上传到项目目录
{
"function": "abc",
"input": {
"P1": "20201007",
"P2": "20201109"
},
"dataset": {},
"table": "IT_RETAB",
"columns": ["LGORT","TOTAL_WDJ","TOTAL_ZY"],
"csvparam": {}
}
参数说明:
function: 函数名
input: 输入的固定参数
dataset:
可实现动态参数,例如{"S_MATNR": 504}, 504是指数据集的ID,
可以实现你从数据库中查询结果进行参数输入,需要与smartchart结合使用,
参数值样列
"S_MATNR"=[{"表头1":xxx, "表头2":xxx ...}, {"表头1":xxx, "表头2":xxx ..}...]
table: RFC返回的表名
columns: 指定生成的列, 可留空自动识别
csvparam: 设定生成的csv格式, 默认分隔符为|, 修改可以设定,如{"sep": ","}
3. DAG设定中填写 #sap mysapdata, 如你有input中的动态参数, 可以如下填写方式, 可替换json文件中的参数
P1= '20201008'
P2= '20201020'
#sap mysapdata P1,P2
starrocks 使用broker load方法到starrocks
填写以下内容到你的SQL文件, 比如命名为: mystarrocks.sql
/*
targetDB = 目标库名
targetTable= 目标表名
sourceDB = 源库名
sourceTable = 源表名
*/
truncate table $targetDB.$targetTable;
LOAD LABEL $targetDB.$LABEL
(
DATA INFILE("$HDFS/$sourceDB.db/$sourceTable/*")
INTO TABLE $targetTable
FORMAT AS parquet
)
WITH BROKER 'hdfs_broker'
PROPERTIES
(
"timeout" = "3600"
)
#hdfsstarrocks mystarrocks
#hdfsstarrocks mystarrocks P1,P2
其它不重要说明
全局参数定义的方法,airflow自带,但不常用了
OPG_LOAD_DAYS = Variable.get("OPG_LOAD_DAYS") //从airflow中获取
MSG = Variable.get("MSG")
report_time = datetime.datetime.now()- datetime.timedelta(days=int(OPG_LOAD_DAYS))
P_START_ZYM= report_time .strftime('%Y%m') //由python代码生成的参数