Commit
[Improve][Connector-V2] Improve IoTDB connector
Source:
* Add e2e test cases
* Add examples to the documentation
* Support `align by` SQL syntax
* Support case-insensitive SQL split
* Support restoring the split offset for `at-least-once` semantics
* Support reading the timestamp from `RowRecord`
* Fix split assignment: splitId % taskSize -> splitId % readerSize
* Fix split owner assignment (negative numbers)
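The two split-assignment fixes above can be illustrated with a small sketch (an assumption for illustration, not the connector's actual code): a split id is hashed modulo the number of readers, and `Math.floorMod` keeps the owner index non-negative even when the hash code is negative.

```java
public class SplitOwnerSketch {
    // Assign a split to a reader. Math.floorMod keeps the result in
    // [0, readerSize) even when splitId.hashCode() is negative, which is
    // the "negative number" bug a plain % operator would exhibit.
    static int ownerOf(String splitId, int readerSize) {
        return Math.floorMod(splitId.hashCode(), readerSize);
    }

    public static void main(String[] args) {
        for (String id : new String[]{"split-0", "split-1", "split-2"}) {
            System.out.println(id + " -> reader " + ownerOf(id, 4));
        }
    }
}
```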

Sink:
* Support extracting `timestamp`, `device`, and `measurement` from SeaTunnelRow
* Support TINYINT and SMALLINT
* Add examples to the documentation
* Support flushing the cache to the database before `prepareCommit`
hailin0 committed Sep 27, 2022
1 parent 57342c6 commit f73d18c
Showing 22 changed files with 1,169 additions and 581 deletions.
143 changes: 111 additions & 32 deletions docs/en/connector-v2/sink/IoTDB.md
@@ -23,25 +23,26 @@ There is a conflict of thrift version between IoTDB and Spark. Therefore, you need

## Options

| name | type | required | default value |
|-------------------------------|-------------------|----------|---------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| default_thrift_buffer_size | int | no | - |
| max_thrift_frame_size | int | no | - |
| zone_id | string | no | - |
| enable_rpc_compression | boolean | no | - |
| connection_timeout_in_ms | int | no | - |
| timeseries_options | list | no | - |
| timeseries_options.path | string | no | - |
| timeseries_options.data_type | string | no | - |
| common-options | string | no | - |
| name | type | required | default value |
|-------------------------------|-------------------|----------|-----------------------------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| key_device | string | yes | - |
| key_timestamp | string | no | processing time |
| key_measurement_fields | array | no | exclude `device` & `timestamp` |
| storage_group | string | no | - |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| default_thrift_buffer_size | int | no | - |
| max_thrift_frame_size | int | no | - |
| zone_id | string | no | - |
| enable_rpc_compression | boolean | no | - |
| connection_timeout_in_ms | int | no | - |
| common-options | string | no | - |

### node_urls [list]

@@ -55,6 +56,24 @@ There is a conflict of thrift version between IoTDB and Spark. Therefore, you need

`IoTDB` user password

### key_device [string]

Specify the field name of the `IoTDB` deviceId in SeaTunnelRow

### key_timestamp [string]

Specify the field name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, processing time is used as the timestamp

### key_measurement_fields [array]

Specify the field names of the `IoTDB` measurement list in SeaTunnelRow. If not specified, all fields are included except `device` and `timestamp`

### storage_group [string]

Specify the device storage group (path prefix)

example: deviceId = ${storage_group} + "." + ${key_device}
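The composition above can be sketched as follows (illustrative only; the helper name is not from the connector):

```java
public class DevicePathSketch {
    // deviceId = ${storage_group} + "." + ${key_device}; without a
    // storage_group, the key_device field value is used as-is.
    static String deviceId(String storageGroup, String keyDeviceValue) {
        return storageGroup == null ? keyDeviceValue : storageGroup + "." + keyDeviceValue;
    }

    public static void main(String[] args) {
        System.out.println(deviceId("root.test_group", "device_a")); // root.test_group.device_a
        System.out.println(deviceId(null, "root.test_group.device_b")); // root.test_group.device_b
    }
}
```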

### batch_size [int]

For batch writing, when the number of buffered records reaches `batch_size` or the time since the last flush reaches `batch_interval_ms`, the data will be flushed into IoTDB
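The batch-flush rule described above can be sketched like this (an assumption for illustration, not the connector's actual implementation): buffer rows, and flush when either the size threshold or the interval threshold is hit.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
    private final int batchSize;        // maps to the batch_size option
    private final long batchIntervalMs; // maps to the batch_interval_ms option
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis = System.currentTimeMillis();

    BatchFlushSketch(int batchSize, long batchIntervalMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
    }

    void write(String row) {
        buffer.add(row);
        // Flush on whichever threshold is reached first.
        if (buffer.size() >= batchSize
                || System.currentTimeMillis() - lastFlushMillis >= batchIntervalMs) {
            flush();
        }
    }

    int flush() {
        int flushed = buffer.size();
        System.out.println("flushing " + flushed + " rows");
        buffer.clear();
        lastFlushMillis = System.currentTimeMillis();
        return flushed;
    }

    public static void main(String[] args) {
        BatchFlushSketch writer = new BatchFlushSketch(3, 1000);
        for (int i = 0; i < 7; i++) {
            writer.write("row-" + i);
        }
        writer.flush(); // flush the remainder, e.g. before prepareCommit
    }
}
```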
@@ -95,24 +114,16 @@ Enable rpc compression in `IoTDB` client

The maximum time (in ms) to wait when connecting to `IoTDB`

### timeseries_options [list]

Timeseries options

### timeseries_options.path [string]

Timeseries path

### timeseries_options.data_type [string]

Timeseries data type

### common options [string]

Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details

## Examples

### Case1

Common options:

```hocon
sink {
IoTDB {
Expand All @@ -123,4 +134,72 @@ sink {
batch_interval_ms = 1000
}
}
```

When you set `key_device` to `device_name`, for example:

```hocon
sink {
IoTDB {
...
key_device = "device_name"
}
}
```

The upstream SeaTunnelRow data format is as follows:

| device_name | field_1 | field_2 |
|----------------------------|-------------|-------------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |

The output `IoTDB` data format is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+----------+----------+
|                    Time|                  Device|   field_1|   field_2|
+------------------------+------------------------+----------+----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a|      1001|      1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b|      2001|      2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c|      3001|      3002|
+------------------------+------------------------+----------+----------+
```

### Case2

When you set `key_device`, `key_timestamp`, and `key_measurement_fields`, for example:

```hocon
sink {
IoTDB {
...
key_device = "device_name"
key_timestamp = "ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```

The upstream SeaTunnelRow data format is as follows:

|ts | device_name | field_1 | field_2 | temperature | moisture |
|--------------------|----------------------------|-------------|-------------|-------------|-------------|
|1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
|1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
|1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |

The output `IoTDB` data format is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```
69 changes: 63 additions & 6 deletions docs/en/connector-v2/source/IoTDB.md
@@ -22,14 +22,14 @@ supports query SQL and can achieve projection effect.

| name | type | required | default value |
|----------------------------|---------|----------|---------------|
| host | string | yes | - |
| port | Int | yes | - |
| node_urls | string | yes | - |
| sql | string | yes | |
| host | string | no | - |
| port | int | no | - |
| node_urls | string | no | - |
| username | string | yes | - |
| password | string | yes | - |
| sql | string | yes | - |
| fields | config | yes | - |
| fetch_size | int | no | - |
| username | string | no | - |
| password | string | no | - |
| lower_bound | long | no | - |
| upper_bound | long | no | - |
| num_partitions | int | no | - |
@@ -147,3 +147,60 @@ lower bound of the time column
```

## Examples

### Case1

Common options:

```hocon
source {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
}
}
```

When you set `sql`, `fields`, and the partition options, for example:

```hocon
source {
IoTDB {
...
sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
lower_bound = 1
upper_bound = 4102329600000
num_partitions = 10
fields {
ts = bigint
device_name = string
temperature = float
moisture = bigint
}
}
}
```
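The `lower_bound`/`upper_bound`/`num_partitions` options above suggest range-based parallel reading. A minimal sketch of how the time range could be split into per-reader predicates (an assumption for illustration, not the connector's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class TimeRangePartitionSketch {
    // Split [lower, upper) into num half-open ranges; the last range
    // absorbs any remainder from integer division.
    static List<long[]> partitions(long lower, long upper, int num) {
        List<long[]> ranges = new ArrayList<>();
        long span = (upper - lower) / num;
        for (int i = 0; i < num; i++) {
            long start = lower + (long) i * span;
            long end = (i == num - 1) ? upper : start + span;
            ranges.add(new long[]{start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : partitions(1L, 4102329600000L, 10)) {
            System.out.println("WHERE time >= " + r[0] + " AND time < " + r[1]);
        }
    }
}
```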

The upstream `IoTDB` data format is as follows:

```shell
IoTDB> SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```

The data loaded into SeaTunnelRow is as follows:

|ts | device_name | temperature | moisture |
|--------------------|----------------------------|-------------|-------------|
|1664035200001 | root.test_group.device_a | 36.1 | 100 |
|1664035200001 | root.test_group.device_b | 36.2 | 101 |
|1664035200001 | root.test_group.device_c | 36.3 | 102 |
@@ -24,7 +24,7 @@
public abstract class AbstractSinkWriter<T, StateT> implements SinkWriter<T, Void, StateT> {

@Override
public final Optional<Void> prepareCommit() {
public Optional<Void> prepareCommit() {
return Optional.empty();
}

@@ -22,23 +22,23 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

@Setter
@Getter
@ToString
public class SinkConfig extends CommonConfig {

public static final String KEY_TIMESTAMP = "key_timestamp";
public static final String KEY_DEVICE = "key_device";
public static final String KEY_MEASUREMENT_FIELDS = "key_measurement_fields";
public static final String STORAGE_GROUP = "storage_group";
public static final String BATCH_SIZE = "batch_size";
public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
public static final String MAX_RETRIES = "max_retries";
@@ -49,12 +49,13 @@ public class SinkConfig extends CommonConfig {
public static final String ZONE_ID = "zone_id";
public static final String ENABLE_RPC_COMPRESSION = "enable_rpc_compression";
public static final String CONNECTION_TIMEOUT_IN_MS = "connection_timeout_in_ms";
public static final String TIMESERIES_OPTIONS = "timeseries_options";
public static final String TIMESERIES_OPTION_PATH = "path";
public static final String TIMESERIES_OPTION_DATA_TYPE = "data_type";

private static final int DEFAULT_BATCH_SIZE = 1024;

private String keyTimestamp;
private String keyDevice;
private List<String> keyMeasurementFields;
private String storageGroup;
private int batchSize = DEFAULT_BATCH_SIZE;
private Integer batchIntervalMs;
private int maxRetries;
@@ -65,7 +66,6 @@
private ZoneId zoneId;
private Boolean enableRPCCompression;
private Integer connectionTimeoutInMs;
private List<TimeseriesOption> timeseriesOptions;

public SinkConfig(@NonNull List<String> nodeUrls,
@NonNull String username,
@@ -78,6 +78,17 @@ public static SinkConfig loadConfig(Config pluginConfig) {
pluginConfig.getStringList(NODE_URLS),
pluginConfig.getString(USERNAME),
pluginConfig.getString(PASSWORD));

sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE));
if (pluginConfig.hasPath(KEY_TIMESTAMP)) {
sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP));
}
if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS)) {
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS));
}
if (pluginConfig.hasPath(STORAGE_GROUP)) {
sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP));
}
if (pluginConfig.hasPath(BATCH_SIZE)) {
int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
sinkConfig.setBatchSize(batchSize);
@@ -117,31 +128,11 @@ public static SinkConfig loadConfig(Config pluginConfig) {
checkNotNull(sinkConfig.getEnableRPCCompression());
sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
}
if (pluginConfig.hasPath(TIMESERIES_OPTIONS)) {
List<? extends Config> timeseriesConfigs = pluginConfig.getConfigList(TIMESERIES_OPTIONS);
List<TimeseriesOption> timeseriesOptions = new ArrayList<>(timeseriesConfigs.size());
for (Config timeseriesConfig : timeseriesConfigs) {
String timeseriesPath = timeseriesConfig.getString(TIMESERIES_OPTION_PATH);
String timeseriesDataType = timeseriesConfig.getString(TIMESERIES_OPTION_DATA_TYPE);
TimeseriesOption timeseriesOption = new TimeseriesOption(
timeseriesPath, TSDataType.valueOf(timeseriesDataType));
timeseriesOptions.add(timeseriesOption);
}
sinkConfig.setTimeseriesOptions(timeseriesOptions);
}
return sinkConfig;
}

private static int checkIntArgument(int args) {
checkArgument(args > 0);
return args;
}

@Getter
@ToString
@AllArgsConstructor
public static class TimeseriesOption implements Serializable {
private String path;
private TSDataType dataType = TSDataType.TEXT;
}
}
@@ -27,6 +27,8 @@ public class SourceConstants {

public static final String SQL_WHERE = "where";

public static final String SQL_ALIGN = "align by";

public static final String DEFAULT_PARTITIONS = "0";

}
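The new `SQL_ALIGN` constant relates to the "case-insensitive SQL split" item in the commit message. A minimal sketch of detecting the `align by` clause regardless of case (an assumption for illustration, not the connector's actual parsing code):

```java
import java.util.Locale;

public class SqlAlignSketch {
    // Lower-casing before the contains() check makes the keyword match
    // case-insensitive, so "ALIGN BY DEVICE" and "align by device" both work.
    static boolean hasAlignByClause(String sql) {
        return sql.toLowerCase(Locale.ROOT).contains("align by");
    }

    public static void main(String[] args) {
        System.out.println(hasAlignByClause(
            "SELECT * FROM root.test_group.* ALIGN BY DEVICE")); // true
    }
}
```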