该Writer提供向Pegasus系统的指定表中写入数据的功能。
- 写数据使用Pegasus Java Client,当前使用1.8.0-thrift-0.11.0-inlined-release版本,你需要先maven install该客户端库;
- Pegasus是Key-Value系统,不支持Schema,所有类型的数据在存储到Pegasus中时都会转化为byte[]进行存储;
- 通过mapping配置列映射,需要在mapping中指定一个列作为Pegasus存储的HashKey;
假设数据源来自Hive,表结构为:
CREATE TABLE test_table (
aprefid STRING,
bssid STRING,
ssid STRING,
la_t STRING,
lo_t STRING,
label_t STRING,
last_update_dt_t STRING,
la_b STRING,
lo_b STRING,
label_b STRING,
last_update_dt_b STRING
)
COMMENT 'This is a test table'
stored AS ORC;
配置样例hdfs2pegasus.json
,从Hive所存储的HDFS向Pegasus系统导数据,并将aprefid列作为HashKey:
{
"job":{
"content":[
{
"reader":{
"name":"hdfsreader",
"parameter":{
"defaultFS":"hdfs://xxx:port",
"path":"/user/hive/warehouse/pegasus.db/test_table",
"encoding":"UTF-8",
"fileType":"orc",
"column":[
"*"
]
}
},
"writer":{
"name":"pegasuswriter",
"parameter":{
"cluster":"x.x.x.x:34601,x.x.x.x:34601",
"table":"datax_test",
"write_type":"insert",
"encoding":"UTF-8",
"timeout_ms":"10000",
"ttl_seconds":"0",
"retry_count":"2",
"retry_delay_ms":"10000",
"mapping":{
"hash_key":"${0}",
"values":[
{
"sort_key":"",
"value":"${1}"
},
{
"sort_key":"la_t",
"value":"${3}"
},
{
"sort_key":"lo_t",
"value":"${4}"
},
{
"sort_key":"${6}",
"value":"${6},${10}"
}
]
}
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
-
cluster
-
描述:Pegasus集群的meta-server地址列表。格式:ip1:port1,ip2:port2 。
-
必选:是
-
默认值:无
-
-
table
-
描述:要写入的表名。
-
必选:是
-
默认值:无
-
-
write_type
-
描述:数据写入类型:insert或者delete。
-
必选:否
-
默认值:insert
-
-
encoding
-
描述:写数据时将String转化为byte[]的编码配置。
-
必选:否
-
默认值:UTF-8,慎重修改
-
-
timeout_ms
-
描述:写数据操作的超时时间,单位毫秒。
-
必选:否
-
默认值:10000,建议在1000以上
-
-
ttl_seconds
-
描述:写数据的TTL(Time-To-Live)时间限制,单位秒。
-
必选:否
-
默认值:0,表示数据不设置TTL限制
-
-
retry_count
-
描述:写数据失败后的重试次数。
-
必选:否
-
默认值:2
-
-
retry_delay_ms
-
描述:写数据失败后等待下一次重试的时间,单位毫秒。
-
必选:否
-
默认值:10000,建议在1000以上
-
-
mapping
用户需要指定Column字段到Pegasus存储的映射关系,配置如下:
"mapping":{ "hash_key":"", "values":[ { "sort_key":"", "value":"" }, { "sort_key":"", "value":"" } ] }
-
说明:
- mapping中必须指定hash_key和values;
- hash_key的值不能为空;
- values的列表不能为空,且列表中每个元素必须指定sort_key和value,不允许出现重复的sort_key;
- sort_key和value的值可以为空;
- hash_key、sort_key和value的值中可以通过${i}的方式嵌入Column[i]的值:
- ${i}可以出现多次,所有地方都会被替换;
- 如果i不是整数或者Column[i]不存在,则保留原状,不进行替换;
- 如果想表达原始的"$"符号,可以用"$$"进转义,譬如"$${1}"就不会被Column[1]替换,而是转换为"${1}";
-
必选:是
-
默认值:无
-
Pegasus不支持Schema,所有数据都以byte[]方式存储。
目前所有类型的Column都先通过Column.asString()方法转化为String,然后通过String.getBytes()方法转化为byte[]。
用户可以配置String.getBytes()方法的encoding类型。
- 下载和安装Pegasus Java Client
git clone https://github.com/xiaomi/pegasus-java-client
cd pegasus-java-client
git checkout 1.8.0-thrift-0.11.0-inlined-release
mvn clean install -DskipTests
- 下载和编译DataX:
git clone https://github.com/xiaomi/pegasus-datax.git
cd pegasus-datax
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
-
参照上面的配置样例,准备配置文件
hdfs2pegasus.json
。 -
运行:
python target/datax/datax/bin/datax.py ./hdfs2pegasus.json
略
略