Commit 7b5181f

Merge branch 'dev' into new-connector-cassandra

EricJoy2048 authored Nov 8, 2022
2 parents 2c9af78 + 630e884 commit 7b5181f

Showing 54 changed files with 2,169 additions and 405 deletions.
104 changes: 104 additions & 0 deletions docs/en/connector-v2/sink/InfluxDB.md
@@ -0,0 +1,104 @@
# InfluxDB

> InfluxDB sink connector

## Description

Write data to InfluxDB.

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name                        | type   | required | default value                |
|-----------------------------|--------|----------|------------------------------|
| url                         | string | yes      | -                            |
| database                    | string | yes      | -                            |
| measurement                 | string | yes      | -                            |
| username                    | string | no       | -                            |
| password                    | string | no       | -                            |
| key_time                    | string | no       | processing time              |
| key_tags                    | array  | no       | exclude `field` & `key_time` |
| batch_size                  | int    | no       | 1024                         |
| batch_interval_ms           | int    | no       | -                            |
| max_retries                 | int    | no       | -                            |
| retry_backoff_multiplier_ms | int    | no       | -                            |
| max_retry_backoff_ms        | int    | no       | -                            |
| connect_timeout_ms          | long   | no       | 15000                        |

### url [string]

The URL to connect to InfluxDB, e.g.

```
http://influxdb-host:8086
```

### database [string]

The name of `influxDB` database

### measurement [string]

The name of `influxDB` measurement

### username [string]

`influxDB` username

### password [string]

`influxDB` password

### key_time [string]

Specify the field name in SeaTunnelRow to use as the `influxDB` measurement timestamp. If not specified, the processing time is used as the timestamp.

### key_tags [array]

Specify the field names in SeaTunnelRow to use as `influxDB` measurement tags.
If not specified, all fields are written as `influxDB` measurement fields.
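
For example, assuming a row with fields `time`, `label` and `value`, configured with `key_time = "time"` and `key_tags = ["label"]`, the connector would emit a point whose line-protocol form looks roughly like this (the field names and values here are illustrative):

```
sink,label=tag1 value=3.14 1667898000000000000
```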

### batch_size [int]

For batch writing: when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the buffered data is flushed into InfluxDB.

### batch_interval_ms [int]

See `batch_size`: whichever of the two thresholds is reached first triggers the flush.

### max_retries [int]

The number of retry attempts when flushing to InfluxDB fails.

### retry_backoff_multiplier_ms [int]

Used as a multiplier when generating the next back-off delay between retries.

### max_retry_backoff_ms [int]

The maximum amount of time to wait before retrying a request to `influxDB`.

### connect_timeout_ms [long]

The timeout for connecting to InfluxDB, in milliseconds.

## Examples

```hocon
sink {
    InfluxDB {
        url = "http://influxdb-host:8086"
        database = "test"
        measurement = "sink"
        key_time = "time"
        key_tags = ["label"]
        batch_size = 1
    }
}
```
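
A fuller sketch that also tunes batching and retries; every option name comes from the table above, while the values are illustrative rather than recommended defaults:

```hocon
sink {
    InfluxDB {
        url = "http://influxdb-host:8086"
        database = "test"
        measurement = "sink"
        username = "admin"
        password = "secret"
        key_time = "time"
        key_tags = ["label"]
        batch_size = 1024
        batch_interval_ms = 1000
        max_retries = 3
        retry_backoff_multiplier_ms = 100
        max_retry_backoff_ms = 10000
        connect_timeout_ms = 15000
    }
}
```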

## Changelog

### next version

- Add InfluxDB Sink Connector
2 changes: 1 addition & 1 deletion plugin-mapping.properties
@@ -142,4 +142,4 @@ seatunnel.sink.Amazondynamodb = connector-amazondynamodb
 seatunnel.source.Cassandra = connector-cassandra
 seatunnel.sink.Cassandra = connector-cassandra
 seatunnel.sink.StarRocks = connector-starrocks
-
+seatunnel.sink.InfluxDB = connector-influxdb
33 changes: 30 additions & 3 deletions seatunnel-connectors-v2/README.md
@@ -10,6 +10,15 @@ this [issue](https://github.com/apache/incubator-seatunnel/issues/1608) for details
In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel
development at the current stage, and reduces the difficulty of merging.

### **Engineering structure**

- ../`seatunnel-connectors-v2` connector-v2 code implementation
- ../`seatunnel-translation` translation layer for connector-v2
- ../seatunnel-e2e/`seatunnel-flink-connector-v2-e2e` end-to-end test cases running on Flink
- ../seatunnel-e2e/`seatunnel-spark-connector-v2-e2e` end-to-end test cases running on Spark
- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` connector-v2 example running on a local Flink instance
- ../seatunnel-examples/`seatunnel-spark-connector-v2-example` connector-v2 example running on a local Spark instance

### **Example**

We have prepared two new versions of the locally executable example program in `seatunnel-examples`, one
@@ -22,13 +31,31 @@ configuration files used in example are saved in the "resources/examples" folder
own connectors, you need to follow the steps below.

1. Add the groupId, artifactId and version of the connector to be tested as a dependency in
   `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or in
   `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you want to run it on the Spark
   engine); see the dependency sketch after this list.
2. Find the dependencies in your connector's pom file whose scope is test or provided, add them to the
   `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or
   `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`) file, and change their scope to compile.
3. Add the task configuration file under resources/examples.
4. Configure the file in the `SeaTunnelApiExample` main method.
5. Run the main method.
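
For step 1, the dependency block is a standard Maven dependency. A sketch, using this commit's InfluxDB connector as a stand-in artifact (your own connector's coordinates would differ, and `${project.version}` is a placeholder):

```xml
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-influxdb</artifactId>
    <version>${project.version}</version>
</dependency>
```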

### **Create a new SeaTunnel v2 connector**

1. Create a new module under the `seatunnel-connectors-v2` directory and name it connector-{connector name}.

2. The module's pom file can follow the pom file of an existing connector; also add the new sub-module to the parent module's pom file.

3. Create two packages, corresponding to source and sink (a minimal factory sketch follows this list):

   package org.apache.seatunnel.connectors.seatunnel.{connector name}.source

   package org.apache.seatunnel.connectors.seatunnel.{connector name}.sink

4. Add the connector info to the plugin-mapping.properties file in the SeaTunnel root path.

5. Add the connector dependency to seatunnel-dist/pom.xml so the connector jar can be found in the binary package.
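
A minimal sketch of steps 1-3 for a hypothetical connector named `myconnector`; the `TableSinkFactory` usage follows the factory classes changed later in this commit, while the import paths and the empty `optionRule()` body are assumptions (a real connector declares its options there):

```java
package org.apache.seatunnel.connectors.seatunnel.myconnector.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

public class MyConnectorSinkFactory implements TableSinkFactory {

    @Override
    public String factoryIdentifier() {
        // The name users reference in their job config, e.g. sink { MyConnector { ... } }
        return "MyConnector";
    }

    @Override
    public OptionRule optionRule() {
        // Declare required and optional options here; left empty in this sketch.
        return OptionRule.builder().build();
    }
}
```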

### **Startup Class**

29 changes: 28 additions & 1 deletion seatunnel-connectors-v2/README.zh.md
@@ -7,6 +7,15 @@ To decouple from the compute engines, SeaTunnel designed a new connector API, through

To keep the new code separate from the old, to ease parallel development at the current stage, and to reduce the difficulty of merging, we have defined new modules for the new execution flow.

### **Engineering structure**

- ../`seatunnel-connectors-v2` connector-v2 code implementation
- ../`seatunnel-translation` translation layer for connector-v2
- ../seatunnel-e2e/`seatunnel-flink-connector-v2-e2e` end-to-end test cases running on Flink
- ../seatunnel-e2e/`seatunnel-spark-connector-v2-e2e` end-to-end test cases running on Spark
- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` connector-v2 example running on a local Flink instance
- ../seatunnel-examples/`seatunnel-spark-connector-v2-example` connector-v2 example running on a local Spark instance

### Example

We have already prepared in `seatunnel-examples`
@@ -18,7 +27,25 @@
version. (Or, when you want to run on the Spark engine, add the dependency in `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`.)
2. If your connector has dependencies whose scope is test or provided, add them to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml (or seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) and change their scope to compile.
3. Add the task configuration file under resources/examples.
4. Configure the file in the `SeaTunnelApiExample` main method.
5. Run the main method.

### Create a new SeaTunnel v2 connector

1. Create a new module under the `seatunnel-connectors-v2` directory and name it connector-{connector name}.

2. The module's pom file can follow the pom file of an existing connector; also add the new sub-module to the parent module's pom file.

3. Create two packages, corresponding to source and sink:

   package org.apache.seatunnel.connectors.seatunnel.{connector name}.source

   package org.apache.seatunnel.connectors.seatunnel.{connector name}.sink

4. Add the connector info to the plugin-mapping.properties file in the project root directory.

5. Add the connector dependency to seatunnel-dist/pom.xml so the connector jar can be found in the binary package.

### Startup Class

@@ -30,7 +30,7 @@ public class AssertSinkFactory implements TableSinkFactory {

     @Override
     public String factoryIdentifier() {
-        return "AssertSink";
+        return "Assert";
     }

     @Override
@@ -38,7 +38,7 @@
 public class ClickhouseSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "ClickhouseSink";
+        return "Clickhouse";
     }

     @Override
@@ -38,7 +38,7 @@
 public class ClickhouseFileSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "ClickhouseFileSink";
+        return "ClickhouseFile";
     }

     @Override
@@ -33,7 +33,7 @@
 public class ClickhouseSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "ClickhouseSource";
+        return "Clickhouse";
     }

     @Override
@@ -27,7 +27,7 @@
 public class ConsoleSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "ConsoleSink";
+        return "Console";
     }

     @Override
@@ -35,7 +35,7 @@
 public class DataHubSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "DataHubSink";
+        return "DataHub";
     }

     @Override
@@ -30,7 +30,7 @@
 public class DingTalkSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "DingTalkSink";
+        return "DingTalk";
     }

     @Override
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.influxdb.client;

 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

 import lombok.extern.slf4j.Slf4j;
 import okhttp3.HttpUrl;
@@ -75,4 +76,18 @@ public Response intercept(Chain chain) throws IOException {
         log.info("connect influxdb successful. server version :{}.", version);
         return influxDB;
     }
+
+    public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
+        String rp = sinkConfig.getRp();
+        if (!StringUtils.isEmpty(rp)) {
+            influxDB.setRetentionPolicy(rp);
+        }
+    }
+
+    public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
+        InfluxDB influxDB = getInfluxDB(sinkConfig);
+        influxDB.setDatabase(sinkConfig.getDatabase());
+        // Apply the retention policy to the same client instance that is returned,
+        // not to a second client created by calling getInfluxDB() again.
+        setWriteProperty(influxDB, sinkConfig);
+        return influxDB;
+    }
 }
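
For orientation, a hypothetical caller of these new helpers might look like the sketch below. The class and method names, measurement name, and tag/field values are all illustrative; the `Point` builder comes from the influxdb-java client that this class wraps, and `ConnectException` is assumed to be `java.net.ConnectException`:

```java
import java.net.ConnectException;
import java.util.concurrent.TimeUnit;

import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;

public class InfluxDBWriteSketch {
    // Sketch: write a single point through the helper added above,
    // assuming a SinkConfig already parsed from the job config.
    static void writeOnePoint(SinkConfig sinkConfig) throws ConnectException {
        InfluxDB client = InfluxDBClient.getWriteClient(sinkConfig);
        client.write(Point.measurement("sink")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("label", "tag1")
                .addField("value", 3.14)
                .build());
        client.close();
    }
}
```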
@@ -23,7 +23,6 @@
 import lombok.Data;

 import java.io.Serializable;
-import java.util.List;

 @Data
 public class InfluxDBConfig implements Serializable {
@@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable {
     public static final String URL = "url";
     private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
     private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
-
-    public static final String SQL = "sql";
-    public static final String SQL_WHERE = "where";
-
     public static final String DATABASES = "database";
-    public static final String SPLIT_COLUMN = "split_column";
-    private static final String PARTITION_NUM = "partition_num";
-    private static final String UPPER_BOUND = "upper_bound";
-    private static final String LOWER_BOUND = "lower_bound";
-
-
     private static final String DEFAULT_FORMAT = "MSGPACK";
-    private static final String EPOCH = "epoch";
-
-    public static final String DEFAULT_PARTITIONS = "0";
+    protected static final String EPOCH = "epoch";
     private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
     private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
-
     private static final String DEFAULT_EPOCH = "n";

     private String url;
     private String username;
     private String password;
-    private String sql;
-    private int partitionNum = 0;
-    private String splitKey;
-    private long lowerBound;
-    private long upperBound;
     private String database;

     private String format = DEFAULT_FORMAT;
@@ -69,30 +50,15 @@ public class InfluxDBConfig implements Serializable {

     private String epoch = DEFAULT_EPOCH;

-    List<Integer> columnsIndex;

     public InfluxDBConfig(Config config) {
         this.url = config.getString(URL);
-        this.sql = config.getString(SQL);
-
         if (config.hasPath(USERNAME)) {
             this.username = config.getString(USERNAME);
         }
         if (config.hasPath(PASSWORD)) {
             this.password = config.getString(PASSWORD);
         }
-        if (config.hasPath(PARTITION_NUM)) {
-            this.partitionNum = config.getInt(PARTITION_NUM);
-        }
-        if (config.hasPath(UPPER_BOUND)) {
-            this.upperBound = config.getInt(UPPER_BOUND);
-        }
-        if (config.hasPath(LOWER_BOUND)) {
-            this.lowerBound = config.getInt(LOWER_BOUND);
-        }
-        if (config.hasPath(SPLIT_COLUMN)) {
-            this.splitKey = config.getString(SPLIT_COLUMN);
-        }
         if (config.hasPath(DATABASES)) {
             this.database = config.getString(DATABASES);
         }