Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] influxdb sink connector #3174

Merged
merged 24 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions docs/en/connector-v2/sink/InfluxDB.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# InfluxDB

> InfluxDB sink connector

## Description

Write data to InfluxDB.

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|----------|----------|-------------------------------|
| url | string | yes | - |
| database | string | yes | |
| measurement | string | yes | |
| username | string | no | - |
| password | string | no | - |
| key_time | string | yes | processing time |
| key_tags | array | no | exclude `field` & `key_time` |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| connect_timeout_ms | long | no | 15000 |

### url
the url to connect to influxDB e.g.
```
http://influxdb-host:8086
```

### database [string]

The name of `influxDB` database

### measurement [string]

The name of `influxDB` measurement

### username [string]

`influxDB` user username

### password [string]

`influxDB` user password

### key_time [string]

Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp

### key_tags [array]

Specify field-name of the `influxDB` measurement tags in SeaTunnelRow.
If not specified, include all fields with `influxDB` measurement field

### batch_size [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB

### batch_interval_ms [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB

### max_retries [int]

The number of retries to flush failed

### retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

### max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to `influxDB`

### connect_timeout_ms [long]
the timeout for connecting to InfluxDB, in milliseconds

## Examples
```hocon
sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}

```

## Changelog

### next version

- Add InfluxDB Sink Connector
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.sink.InfluxDB = connector-influxdb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.client;

import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
Expand Down Expand Up @@ -75,4 +76,18 @@ public Response intercept(Chain chain) throws IOException {
log.info("connect influxdb successful. sever version :{}.", version);
return influxDB;
}

public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
String rp = sinkConfig.getRp();
if (!StringUtils.isEmpty(rp)) {
influxDB.setRetentionPolicy(rp);
}
}

public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
InfluxDB influxDB = getInfluxDB(sinkConfig);
influxDB.setDatabase(sinkConfig.getDatabase());
setWriteProperty(getInfluxDB(sinkConfig), sinkConfig);
return influxDB;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class InfluxDBConfig implements Serializable {
Expand All @@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable {
public static final String URL = "url";
private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";

public static final String SQL = "sql";
public static final String SQL_WHERE = "where";

public static final String DATABASES = "database";
public static final String SPLIT_COLUMN = "split_column";
private static final String PARTITION_NUM = "partition_num";
private static final String UPPER_BOUND = "upper_bound";
private static final String LOWER_BOUND = "lower_bound";


private static final String DEFAULT_FORMAT = "MSGPACK";
private static final String EPOCH = "epoch";

public static final String DEFAULT_PARTITIONS = "0";
protected static final String EPOCH = "epoch";
private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;

private static final String DEFAULT_EPOCH = "n";

private String url;
private String username;
private String password;
private String sql;
private int partitionNum = 0;
private String splitKey;
private long lowerBound;
private long upperBound;
private String database;

private String format = DEFAULT_FORMAT;
Expand All @@ -69,30 +50,15 @@ public class InfluxDBConfig implements Serializable {

private String epoch = DEFAULT_EPOCH;

List<Integer> columnsIndex;

public InfluxDBConfig(Config config) {
this.url = config.getString(URL);
this.sql = config.getString(SQL);

if (config.hasPath(USERNAME)) {
this.username = config.getString(USERNAME);
}
if (config.hasPath(PASSWORD)) {
this.password = config.getString(PASSWORD);
}
if (config.hasPath(PARTITION_NUM)) {
this.partitionNum = config.getInt(PARTITION_NUM);
}
if (config.hasPath(UPPER_BOUND)) {
this.upperBound = config.getInt(UPPER_BOUND);
}
if (config.hasPath(LOWER_BOUND)) {
this.lowerBound = config.getInt(LOWER_BOUND);
}
if (config.hasPath(SPLIT_COLUMN)) {
this.splitKey = config.getString(SPLIT_COLUMN);
}
if (config.hasPath(DATABASES)) {
this.database = config.getString(DATABASES);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.influxdb.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.List;

@Setter
@Getter
@ToString
public class SinkConfig extends InfluxDBConfig{
public SinkConfig(Config config) {
super(config);
}

private static final String KEY_TIME = "key_time";
private static final String KEY_TAGS = "key_tags";
public static final String KEY_MEASUREMENT = "measurement";

private static final String BATCH_SIZE = "batch_size";
private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
private static final String MAX_RETRIES = "max_retries";
private static final String WRITE_TIMEOUT = "write_timeout";
private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
private static final String RETENTION_POLICY = "rp";
private static final int DEFAULT_BATCH_SIZE = 1024;
private static final int DEFAULT_WRITE_TIMEOUT = 5;
private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS;

private String rp;
private String measurement;
private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
private String keyTime;
private List<String> keyTags;
private int batchSize = DEFAULT_BATCH_SIZE;
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;

public static SinkConfig loadConfig(Config config) {
SinkConfig sinkConfig = new SinkConfig(config);

if (config.hasPath(KEY_TIME)) {
sinkConfig.setKeyTime(config.getString(KEY_TIME));
}
if (config.hasPath(KEY_TAGS)) {
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
}
if (config.hasPath(BATCH_INTERVAL_MS)) {
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
}
if (config.hasPath(MAX_RETRIES)) {
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
}
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
}
if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
}
if (config.hasPath(WRITE_TIMEOUT)) {
sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
}
if (config.hasPath(RETENTION_POLICY)) {
sinkConfig.setRp(config.getString(RETENTION_POLICY));
}
if (config.hasPath(EPOCH)) {
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
}
sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
return sinkConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.influxdb.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;

import java.util.List;

@Getter
public class SourceConfig extends InfluxDBConfig{
public static final String SQL = "sql";
public static final String SQL_WHERE = "where";
public static final String SPLIT_COLUMN = "split_column";
private static final String PARTITION_NUM = "partition_num";
private static final String UPPER_BOUND = "upper_bound";
private static final String LOWER_BOUND = "lower_bound";
public static final String DEFAULT_PARTITIONS = "0";
private String sql;
private int partitionNum = 0;
private String splitKey;
private long lowerBound;
private long upperBound;

List<Integer> columnsIndex;

public SourceConfig(Config config) {
super(config);
}

public static SourceConfig loadConfig(Config config) {
SourceConfig sourceConfig = new SourceConfig(config);

sourceConfig.sql = config.getString(SQL);

if (config.hasPath(PARTITION_NUM)) {
sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
}
if (config.hasPath(UPPER_BOUND)) {
sourceConfig.upperBound = config.getInt(UPPER_BOUND);
}
if (config.hasPath(LOWER_BOUND)) {
sourceConfig.lowerBound = config.getInt(LOWER_BOUND);
}
if (config.hasPath(SPLIT_COLUMN)) {
sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
}
return sourceConfig;
}

}
Loading