Skip to content

Commit

Permalink
[Feature][Connector-V2] influxdb sink connector (#3174)
Browse files Browse the repository at this point in the history
* [Feature][Connector-V2] Add influxDB connector sink

* fix doc style

* remove old e2e for influxdb

* fix e2e and License header

* add Changelog

* delete useless log4j file

* mv scheduler to constructor of InfluxDBSinkWriter

* remove InfluxDBSinkWriter useless synchronized
  • Loading branch information
531651225 authored Nov 8, 2022
1 parent 5bfd508 commit 630e884
Show file tree
Hide file tree
Showing 18 changed files with 1,056 additions and 365 deletions.
104 changes: 104 additions & 0 deletions docs/en/connector-v2/sink/InfluxDB.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# InfluxDB

> InfluxDB sink connector
## Description

Write data to InfluxDB.

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|----------|----------|-------------------------------|
| url | string | yes | - |
| database | string | yes | |
| measurement | string | yes | |
| username | string | no | - |
| password | string | no | - |
| key_time | string | yes | processing time |
| key_tags | array | no | exclude `field` & `key_time` |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| connect_timeout_ms | long | no | 15000 |

### url
the url to connect to influxDB e.g.
```
http://influxdb-host:8086
```

### database [string]

The name of `influxDB` database

### measurement [string]

The name of `influxDB` measurement

### username [string]

`influxDB` user username

### password [string]

`influxDB` user password

### key_time [string]

Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp

### key_tags [array]

Specify field-name of the `influxDB` measurement tags in SeaTunnelRow.
If not specified, include all fields with `influxDB` measurement field

### batch_size [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB

### batch_interval_ms [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB

### max_retries [int]

The number of retries to flush failed

### retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

### max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to `influxDB`

### connect_timeout_ms [long]
the timeout for connecting to InfluxDB, in milliseconds

## Examples
```hocon
sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}
```

## Changelog

### next version

- Add InfluxDB Sink Connector
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.sink.InfluxDB = connector-influxdb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.client;

import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
Expand Down Expand Up @@ -75,4 +76,18 @@ public Response intercept(Chain chain) throws IOException {
log.info("connect influxdb successful. sever version :{}.", version);
return influxDB;
}

public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
String rp = sinkConfig.getRp();
if (!StringUtils.isEmpty(rp)) {
influxDB.setRetentionPolicy(rp);
}
}

public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
InfluxDB influxDB = getInfluxDB(sinkConfig);
influxDB.setDatabase(sinkConfig.getDatabase());
setWriteProperty(getInfluxDB(sinkConfig), sinkConfig);
return influxDB;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class InfluxDBConfig implements Serializable {
Expand All @@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable {
public static final String URL = "url";
private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";

public static final String SQL = "sql";
public static final String SQL_WHERE = "where";

public static final String DATABASES = "database";
public static final String SPLIT_COLUMN = "split_column";
private static final String PARTITION_NUM = "partition_num";
private static final String UPPER_BOUND = "upper_bound";
private static final String LOWER_BOUND = "lower_bound";


private static final String DEFAULT_FORMAT = "MSGPACK";
private static final String EPOCH = "epoch";

public static final String DEFAULT_PARTITIONS = "0";
protected static final String EPOCH = "epoch";
private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;

private static final String DEFAULT_EPOCH = "n";

private String url;
private String username;
private String password;
private String sql;
private int partitionNum = 0;
private String splitKey;
private long lowerBound;
private long upperBound;
private String database;

private String format = DEFAULT_FORMAT;
Expand All @@ -69,30 +50,15 @@ public class InfluxDBConfig implements Serializable {

private String epoch = DEFAULT_EPOCH;

List<Integer> columnsIndex;

public InfluxDBConfig(Config config) {
this.url = config.getString(URL);
this.sql = config.getString(SQL);

if (config.hasPath(USERNAME)) {
this.username = config.getString(USERNAME);
}
if (config.hasPath(PASSWORD)) {
this.password = config.getString(PASSWORD);
}
if (config.hasPath(PARTITION_NUM)) {
this.partitionNum = config.getInt(PARTITION_NUM);
}
if (config.hasPath(UPPER_BOUND)) {
this.upperBound = config.getInt(UPPER_BOUND);
}
if (config.hasPath(LOWER_BOUND)) {
this.lowerBound = config.getInt(LOWER_BOUND);
}
if (config.hasPath(SPLIT_COLUMN)) {
this.splitKey = config.getString(SPLIT_COLUMN);
}
if (config.hasPath(DATABASES)) {
this.database = config.getString(DATABASES);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.influxdb.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.List;

@Setter
@Getter
@ToString
public class SinkConfig extends InfluxDBConfig{
public SinkConfig(Config config) {
super(config);
}

private static final String KEY_TIME = "key_time";
private static final String KEY_TAGS = "key_tags";
public static final String KEY_MEASUREMENT = "measurement";

private static final String BATCH_SIZE = "batch_size";
private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
private static final String MAX_RETRIES = "max_retries";
private static final String WRITE_TIMEOUT = "write_timeout";
private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
private static final String RETENTION_POLICY = "rp";
private static final int DEFAULT_BATCH_SIZE = 1024;
private static final int DEFAULT_WRITE_TIMEOUT = 5;
private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS;

private String rp;
private String measurement;
private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
private String keyTime;
private List<String> keyTags;
private int batchSize = DEFAULT_BATCH_SIZE;
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;

public static SinkConfig loadConfig(Config config) {
SinkConfig sinkConfig = new SinkConfig(config);

if (config.hasPath(KEY_TIME)) {
sinkConfig.setKeyTime(config.getString(KEY_TIME));
}
if (config.hasPath(KEY_TAGS)) {
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
}
if (config.hasPath(BATCH_INTERVAL_MS)) {
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
}
if (config.hasPath(MAX_RETRIES)) {
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
}
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
}
if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
}
if (config.hasPath(WRITE_TIMEOUT)) {
sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
}
if (config.hasPath(RETENTION_POLICY)) {
sinkConfig.setRp(config.getString(RETENTION_POLICY));
}
if (config.hasPath(EPOCH)) {
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
}
sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
return sinkConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.influxdb.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;

import java.util.List;

@Getter
public class SourceConfig extends InfluxDBConfig{
public static final String SQL = "sql";
public static final String SQL_WHERE = "where";
public static final String SPLIT_COLUMN = "split_column";
private static final String PARTITION_NUM = "partition_num";
private static final String UPPER_BOUND = "upper_bound";
private static final String LOWER_BOUND = "lower_bound";
public static final String DEFAULT_PARTITIONS = "0";
private String sql;
private int partitionNum = 0;
private String splitKey;
private long lowerBound;
private long upperBound;

List<Integer> columnsIndex;

public SourceConfig(Config config) {
super(config);
}

public static SourceConfig loadConfig(Config config) {
SourceConfig sourceConfig = new SourceConfig(config);

sourceConfig.sql = config.getString(SQL);

if (config.hasPath(PARTITION_NUM)) {
sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
}
if (config.hasPath(UPPER_BOUND)) {
sourceConfig.upperBound = config.getInt(UPPER_BOUND);
}
if (config.hasPath(LOWER_BOUND)) {
sourceConfig.lowerBound = config.getInt(LOWER_BOUND);
}
if (config.hasPath(SPLIT_COLUMN)) {
sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
}
return sourceConfig;
}

}
Loading

0 comments on commit 630e884

Please sign in to comment.