Skip to content

Documentation for plugins_CH

jiangbo edited this page Apr 18, 2020 · 3 revisions

English | 中文

通用配置

配置文件

一个完整的Flinkx任务脚本配置包含 content, setting两个部分。content用于配置任务的输入源与输出源,其中包含reader,writer。而setting则配置任务整体的环境设定,其中包含restore,speed,errorLimit,dirty,log。具体如下所示:

{
  "job" : {
    "content" :[{
      "reader" : {
        ......
      },
      "writer" : {
        ......
      }
    }],
   "setting" : {
      "restore" : {
        ......
      },
      "speed" : {
        ......
      },
      "errorLimit" : {
        ......
      },
      "dirty" : {
        ......
      },
      "log" : {
        ......
      }
    }
  }
}
名称 说明 是否必填
content reader reader插件详细配置
writer writer插件详细配置
setting restore 任务类型及断点续传配置
speed 速率限制
errorLimit 出错控制
dirty 脏数据保存
log 日志记录配置

content配置

reader

reader用于配置数据的输入源,即数据从何而来。具体配置如下所示:

"reader" : {
  "name" : "xxreader",
  "parameter" : {
    ......
  }
}
名称 说明 是否必填
name reader插件名称,具体名称参考各数据源配置文档
parameter 数据源配置参数,具体配置参考各数据源配置文档

writer

writer用于配置数据的输出源,即数据写往何处。具体配置如下所示:

"writer" : {
  "name" : "xxwriter",
  "parameter" : {
    ......
  }
}
名称 说明 是否必填
name writer插件名称,具体名称参考各数据源配置文档
parameter 数据源配置参数,具体配置参考各数据源配置文档

setting配置

restore

restore用于配置同步任务类型(离线同步、实时采集)和断点续传功能。具体配置如下所示:

"restore" : {
  "isStream" : false,
  "isRestore" : false,
  "restoreColumnName" : "",
  "restoreColumnIndex" : 0,
  "maxRowNumForCheckpoint" : 10000
}
名称 说明 是否必填 默认值 参数类型
isStream 是否为实时采集任务 false Boolean
isRestore 是否开启断点续传 false Boolean
restoreColumnName 断点续传字段名称 开启断点续传后必填 string
restoreColumnIndex 断点续传字段索引ID 开启断点续传后必填 -1 int
maxRowNumForCheckpoint 触发checkpoint数据条数 10000 int

speed

speed用于配置任务并发数及速率限制。具体配置如下所示:

"speed": {
    "bytes": 1048576,
    "channel": 2,
    "rebalance": false,
    "readerChannel": 1,
    "writerChannel": 1
}
名称 说明 是否必填 默认值 参数类型
bytes 每秒字节数 Long.MAX_VALUE long
channel 通道数量 1 int
readerChannel reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数 int
writerChannel writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数 int
rebalance 此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。 boolean

errorLimit

errorLimit用于配置任务运行时数据读取写入的出错控制。具体配置如下所示:

"errorLimit" : {
  "record": 100,
  "percentage": 10.0
}
名称 说明 是否必填 默认值 参数类型
record 错误阈值,当错误记录数超过此阈值时任务失败 0 int
percentage 错误比例阈值,当错误记录比例超过此阈值时任务失败 0.0 double

dirty

dirty用于配置脏数据的保存,通常与上文出错控制联合使用。具体配置如下所示:

"dirty" : {
  "path" : "xxx",
  "hadoopConfig" : {
    ......
  }
 }
名称 说明 是否必填 默认值 参数类型
path 脏数据保存路径 string
hadoopConfig Hadoop相关配置 K-V键值对

参考模板如下:

"dirty" : {
        "path" : "/user/hive/warehouse/xx.db/xx",
        "hadoopConfig" : {
          "fs.default.name": "hdfs://0.0.0.0:9000",
          "dfs.ha.namenodes.ns1" : "nn1,nn2",
          "dfs.namenode.rpc-address.ns1.nn1" : "0.0.0.0:9000",
          "dfs.namenode.rpc-address.ns1.nn2" : "0.0.0.1:9000",
          "dfs.client.failover.proxy.provider.ns1" : "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
          "dfs.nameservices" : "ns1"
        }
      }

log

log用于配置Flinkx中定义的插件日志的保存与记录。具体配置如下所示:

"log" : {
  "isLogger": false,
  "level" : "info",
  "path" : "/tmp/dtstack/flinkx/",
  "pattern":""
}
名称 说明 是否必填 默认值
isLogger 是否保存日志记录 false
level 日志级别 info
path 服务器上日志保存路径 /tmp/dtstack/flinkx/
pattern 日志输出格式 log4j:%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n logback : %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

插件使用文档

数据库 文档 文档
关系数据库(MySQL,Oracle,Sqlserver,Postgresql,Db2,Gbase,PolarDB,ClickHouse,SAP Hana,Teradata,Phoenix) reader writer
HDFS reader writer
FTP reader writer
Kudu reader writer
ElasticSearch reader writer
Hbase reader writer
Cassandra reader writer
ODPS reader writer
Carbondata reader writer
Redis reader
Hive writer
MongoDB reader writer
Kafka reader writer
EMQX reader writer
MySQL Binlog reader