[Improve][Connector-V2] Improve IoTDB connector #2917

Merged 1 commit on Sep 28, 2022
143 changes: 111 additions & 32 deletions docs/en/connector-v2/sink/IoTDB.md
There is a conflict of thrift version between IoTDB and Spark. Therefore, you need…

## Options

| name | type | required | default value |
|-------------------------------|-------------------|----------|-----------------------------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| key_device | string | yes | - |
| key_timestamp | string | no | processing time |
| key_measurement_fields | array | no | exclude `device` & `timestamp` |
| storage_group | string | no | - |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| default_thrift_buffer_size | int | no | - |
| max_thrift_frame_size | int | no | - |
| zone_id | string | no | - |
| enable_rpc_compression | boolean | no | - |
| connection_timeout_in_ms | int | no | - |
| common-options | string | no | - |

### node_urls [list]


`IoTDB` user password

### key_device [string]

Specify the field name in SeaTunnelRow that holds the `IoTDB` device id

### key_timestamp [string]

Specify the field name in SeaTunnelRow that holds the `IoTDB` timestamp. If not specified, the processing time is used as the timestamp

### key_measurement_fields [array]

Specify the field names in SeaTunnelRow to write as the `IoTDB` measurement list. If not specified, all fields are included except the `device` and `timestamp` fields
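
The default-selection rule above can be sketched as follows; `resolveMeasurements` is a hypothetical helper name for illustration, not the connector's actual API:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MeasurementFieldSelector {
    // Hypothetical helper: an explicit key_measurement_fields list wins;
    // otherwise keep every row field except the device and timestamp fields.
    public static List<String> resolveMeasurements(List<String> rowFields,
                                                   List<String> configured,
                                                   String keyDevice,
                                                   String keyTimestamp) {
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        return rowFields.stream()
                .filter(f -> !f.equals(keyDevice) && !f.equals(keyTimestamp))
                .collect(Collectors.toList());
    }
}
```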

### storage_group [string]

Specify the device storage group (path prefix).

For example: `deviceId = ${storage_group} + "." + ${key_device}`
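
The composition rule above can be sketched as follows; `buildDeviceId` is a hypothetical helper name, not the connector's API:

```java
public class DevicePathExample {
    // Sketch of deviceId = ${storage_group} + "." + ${key_device};
    // when no storage group is configured, the device field is used as-is.
    public static String buildDeviceId(String storageGroup, String deviceField) {
        if (storageGroup == null || storageGroup.isEmpty()) {
            return deviceField;
        }
        return storageGroup + "." + deviceField;
    }

    public static void main(String[] args) {
        // prints "root.test_group.device_a"
        System.out.println(buildDeviceId("root.test_group", "device_a"));
    }
}
```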

### batch_size [int]

For batch writing: when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the buffered data is flushed into `IoTDB`
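
A minimal sketch of this size-or-interval flush rule (illustrative only, not the connector's implementation):

```java
public class BatchFlushPolicy {
    private final int batchSize;
    private final long batchIntervalMs;
    private int buffered;
    private long lastFlushMs;

    public BatchFlushPolicy(int batchSize, long batchIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    // Record one buffered row.
    public void add() {
        buffered++;
    }

    // Flush when there is data and either the buffer is full
    // or the configured interval has elapsed.
    public boolean shouldFlush(long nowMs) {
        return buffered > 0
                && (buffered >= batchSize || nowMs - lastFlushMs >= batchIntervalMs);
    }

    // Reset the buffer after a successful flush.
    public void flushed(long nowMs) {
        buffered = 0;
        lastFlushMs = nowMs;
    }
}
```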
### enable_rpc_compression [boolean]

Enable RPC compression in the `IoTDB` client

### connection_timeout_in_ms [int]

The maximum time (in ms) to wait when connecting to `IoTDB`


### common options [string]

Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details

## Examples

### Case1

Common options:

```hocon
sink {
IoTDB {
Expand All @@ -123,4 +134,72 @@ sink {
batch_interval_ms = 1000
}
}
```

When you set `key_device` to `device_name`, for example:

```hocon
sink {
IoTDB {
...
key_device = "device_name"
}
}
```

The upstream SeaTunnelRow data format is as follows:

| device_name | field_1 | field_2 |
|----------------------------|-------------|-------------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |

The data written to `IoTDB` is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+----------+----------+
|                    Time|                  Device|   field_1|   field_2|
+------------------------+------------------------+----------+----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a|      1001|      1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b|      2001|      2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c|      3001|      3002|
+------------------------+------------------------+----------+----------+
```

### Case2

When you set `key_device`, `key_timestamp`, and `key_measurement_fields`, for example:

```hocon
sink {
IoTDB {
...
key_device = "device_name"
key_timestamp = "ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```

The upstream SeaTunnelRow data format is as follows:

|ts | device_name | field_1 | field_2 | temperature | moisture |
|--------------------|----------------------------|-------------|-------------|-------------|-------------|
|1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
|1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
|1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |

The data written to `IoTDB` is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```
69 changes: 63 additions & 6 deletions docs/en/connector-v2/source/IoTDB.md
supports query SQL and can achieve projection effect.

| name | type | required | default value |
|----------------------------|---------|----------|---------------|
| host | string | no | - |
| port | int | no | - |
| node_urls | string | no | - |
| username | string | yes | - |
| password | string | yes | - |
| sql | string | yes | - |
| fields | config | yes | - |
| fetch_size | int | no | - |
| username | string | no | - |
| password | string | no | - |
| lower_bound | long | no | - |
| upper_bound | long | no | - |
| num_partitions | int | no | - |
### lower_bound [long]

lower bound of the time column

## Examples

### Case1

Common options:

```hocon
source {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
}
}
```

When you set `sql`, `fields`, and the partition options, for example:

```hocon
source {
IoTDB {
...
sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
lower_bound = 1
upper_bound = 4102329600000
num_partitions = 10
fields {
ts = bigint
device_name = string

temperature = float
moisture = bigint
}
}
}
```

The upstream `IoTDB` data is as follows:

```shell
IoTDB> SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
```

The data loaded into SeaTunnelRow is as follows:

|ts | device_name | temperature | moisture |
|--------------------|----------------------------|-------------|-------------|
|1664035200001 | root.test_group.device_a | 36.1 | 100 |
|1664035200001 | root.test_group.device_b | 36.2 | 101 |
|1664035200001 | root.test_group.device_c | 36.3 | 102 |
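
The `lower_bound`, `upper_bound`, and `num_partitions` options above suggest a simple range split of the time column. A minimal sketch under that assumption (`split` is a hypothetical helper, not the connector's actual splitter):

```java
public class PartitionRanges {
    // Split [lowerBound, upperBound) into numPartitions half-open time ranges.
    public static long[][] split(long lowerBound, long upperBound, int numPartitions) {
        if (numPartitions <= 0 || upperBound <= lowerBound) {
            throw new IllegalArgumentException("invalid partition arguments");
        }
        long step = (upperBound - lowerBound) / numPartitions;
        long[][] ranges = new long[numPartitions][2];
        for (int i = 0; i < numPartitions; i++) {
            ranges[i][0] = lowerBound + (long) i * step;
            // The last range absorbs any remainder so the full span is covered.
            ranges[i][1] = (i == numPartitions - 1)
                    ? upperBound
                    : lowerBound + (long) (i + 1) * step;
        }
        return ranges;
    }
}
```

Each range can then be turned into one `WHERE time >= lower AND time < upper` query per reader.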
public abstract class AbstractSinkWriter<T, StateT> implements SinkWriter<T, Void, StateT> {

@Override
public Optional<Void> prepareCommit() {
return Optional.empty();
}


import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

@Setter
@Getter
@ToString
public class SinkConfig extends CommonConfig {

public static final String KEY_TIMESTAMP = "key_timestamp";
public static final String KEY_DEVICE = "key_device";
public static final String KEY_MEASUREMENT_FIELDS = "key_measurement_fields";
public static final String STORAGE_GROUP = "storage_group";
public static final String BATCH_SIZE = "batch_size";
public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
public static final String MAX_RETRIES = "max_retries";
public static final String ZONE_ID = "zone_id";
public static final String ENABLE_RPC_COMPRESSION = "enable_rpc_compression";
public static final String CONNECTION_TIMEOUT_IN_MS = "connection_timeout_in_ms";
public static final String TIMESERIES_OPTIONS = "timeseries_options";
public static final String TIMESERIES_OPTION_PATH = "path";
public static final String TIMESERIES_OPTION_DATA_TYPE = "data_type";

private static final int DEFAULT_BATCH_SIZE = 1024;

private String keyTimestamp;
private String keyDevice;
private List<String> keyMeasurementFields;
private String storageGroup;
private int batchSize = DEFAULT_BATCH_SIZE;
private Integer batchIntervalMs;
private int maxRetries;
private ZoneId zoneId;
private Boolean enableRPCCompression;
private Integer connectionTimeoutInMs;
private List<TimeseriesOption> timeseriesOptions;

public SinkConfig(@NonNull List<String> nodeUrls,
@NonNull String username,
public static SinkConfig loadConfig(Config pluginConfig) {
pluginConfig.getStringList(NODE_URLS),
pluginConfig.getString(USERNAME),
pluginConfig.getString(PASSWORD));

sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE));
if (pluginConfig.hasPath(KEY_TIMESTAMP)) {
sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP));
}
if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS)) {
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS));
}
if (pluginConfig.hasPath(STORAGE_GROUP)) {
sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP));
}
if (pluginConfig.hasPath(BATCH_SIZE)) {
int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
sinkConfig.setBatchSize(batchSize);
checkNotNull(sinkConfig.getEnableRPCCompression());
sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
}
if (pluginConfig.hasPath(TIMESERIES_OPTIONS)) {
List<? extends Config> timeseriesConfigs = pluginConfig.getConfigList(TIMESERIES_OPTIONS);
List<TimeseriesOption> timeseriesOptions = new ArrayList<>(timeseriesConfigs.size());
for (Config timeseriesConfig : timeseriesConfigs) {
String timeseriesPath = timeseriesConfig.getString(TIMESERIES_OPTION_PATH);
String timeseriesDataType = timeseriesConfig.getString(TIMESERIES_OPTION_DATA_TYPE);
TimeseriesOption timeseriesOption = new TimeseriesOption(
timeseriesPath, TSDataType.valueOf(timeseriesDataType));
timeseriesOptions.add(timeseriesOption);
}
sinkConfig.setTimeseriesOptions(timeseriesOptions);
}
return sinkConfig;
}

private static int checkIntArgument(int args) {
checkArgument(args > 0);
return args;
}

@Getter
@ToString
@AllArgsConstructor
public static class TimeseriesOption implements Serializable {
private String path;
private TSDataType dataType = TSDataType.TEXT;
}
}
public class SourceConstants {

public static final String SQL_WHERE = "where";

public static final String SQL_ALIGN = "align by";

public static final String DEFAULT_PARTITIONS = "0";

}