From 42e827983753fbef5fa4cba0068705defb46f248 Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Fri, 22 May 2020 15:42:34 -0500 Subject: [PATCH] Initial (#2) * Initial commit. * Changed to kafka-connect-transform-opentsdb --- Jenkinsfile.example => Jenkinsfile | 0 README.md | 62 ++++++++- pom.xml | 14 +- .../BaseConnectorConfig.json | 61 --------- .../ExampleSinkConnectorConfig.json | 24 ---- .../ExampleSourceConnectorConfig.json | 24 ---- .../ExampleTransformationConfig.json | 61 --------- .../connect/example/ExampleSinkConnector.java | 68 ---------- .../connect/example/ExampleSinkTask.java | 50 -------- .../example/ExampleSourceConnector.java | 68 ---------- .../connect/example/ExampleSourceTask.java | 48 ------- .../connect/opentsdb/OpenTSDBParser.java | 120 ++++++++++++++++++ .../ParseOpenTSDB.java} | 47 ++++--- .../{example => opentsdb}/package-info.java | 12 +- .../connect/example/DocumentationTest.java | 7 - .../connect/opentsdb/DocumentationTest.java | 23 ++++ .../connect/opentsdb/ParseOpenTSDBTest.java | 111 ++++++++++++++++ .../opentsdb/ParseOpenTSDB/example.json | 25 ++++ src/test/resources/logback.xml | 2 +- 19 files changed, 379 insertions(+), 448 deletions(-) rename Jenkinsfile.example => Jenkinsfile (100%) delete mode 100644 src/main/connect-config-classes/BaseConnectorConfig.json delete mode 100644 src/main/connect-config-classes/ExampleSinkConnectorConfig.json delete mode 100644 src/main/connect-config-classes/ExampleSourceConnectorConfig.json delete mode 100644 src/main/connect-config-classes/ExampleTransformationConfig.json delete mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java delete mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java delete mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java delete mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/OpenTSDBParser.java rename src/main/java/com/github/jcustenborder/kafka/connect/{example/ExampleTransformation.java => opentsdb/ParseOpenTSDB.java} (58%) rename src/main/java/com/github/jcustenborder/kafka/connect/{example => opentsdb}/package-info.java (70%) delete mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/example/DocumentationTest.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/DocumentationTest.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDBTest.java create mode 100644 src/test/resources/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB/example.json diff --git a/Jenkinsfile.example b/Jenkinsfile similarity index 100% rename from Jenkinsfile.example rename to Jenkinsfile diff --git a/README.md b/README.md index d95e94b..00d33d8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,66 @@ # Introduction +[Documentation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-opentsdb) | [Confluent Hub](https://www.confluent.io/hub/jcustenborder/kafka-connect-transform-opentsdb) -This is a template repository for creating Kafka Connect Plugins. +The plugin provides a mechanism to parse the wire format for OpenTSDB. +# Installation +## Confluent Hub +The following command can be used to install the plugin directly from the Confluent Hub using the +[Confluent Hub Client](https://docs.confluent.io/current/connect/managing/confluent-hub/client.html). + +```bash +confluent-hub install jcustenborder/kafka-connect-transform-opentsdb:latest +``` + +## Manually + +The zip file that is deployed to the [Confluent Hub](https://www.confluent.io/hub/jcustenborder/kafka-connect-transform-opentsdb) is available under +`target/components/packages/`. You can manually extract this zip file which includes all dependencies. All the dependencies +that are required to deploy the plugin are under `target/kafka-connect-target` as well. Make sure that you include all the dependencies that are required +to run the plugin. + +1. Create a directory under the `plugin.path` on your Connect worker. +2. Copy all of the dependencies under the newly created subdirectory. +3. Restart the Connect worker. + + + + +# Transformations +## [Parse OpenTSDB transformation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-opentsdb/transformations/ParseOpenTSDB.html) + +*Key* +``` +com.github.jcustenborder.kafka.connect.opentsdb.ParseOpenTSDB$Key +``` +*Value* +``` +com.github.jcustenborder.kafka.connect.opentsdb.ParseOpenTSDB$Value +``` + +The ParseOpenTSDB transformation will parse data that is formatted with the OpenTSDB wire protocol. +### Tip + +This transformation expects data to be a String. You are most likely going to use the StringConverter. +### Configuration + + + + +# Development + +## Building the source + +```bash +mvn clean package +``` + +## Contributions + +Contributions are always welcomed! Before you start any development please create an issue and +start a discussion. Create a pull request against your newly created issue and we're happy to see +if we can merge your pull request. First and foremost any time you're adding code to the code base +you need to include test coverage. Make sure that you run `mvn clean package` before submitting your +pull to ensure that all of the tests, checkstyle rules, and the package can be successfully built. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4827f13..d8ae5a2 100644 --- a/pom.xml +++ b/pom.xml @@ -26,11 +26,11 @@ kafka-connect-parent 2.4.0 - kafka-connect-example + kafka-connect-opentsdb 0.0.2-SNAPSHOT - kafka-connect-example + kafka-connect-opentsdb A Kafka Connect connector receiving data from example. - https://github.com/jcustenborder/kafka-connect-example + https://github.com/jcustenborder/kafka-connect-opentsdb 2019 @@ -50,14 +50,14 @@ - scm:git:https://github.com/jcustenborder/kafka-connect-example.git - scm:git:git@github.com:jcustenborder/kafka-connect-example.git + scm:git:https://github.com/jcustenborder/kafka-connect-opentsdb.git + scm:git:git@github.com:jcustenborder/kafka-connect-opentsdb.git - https://github.com/jcustenborder/kafka-connect-example + https://github.com/jcustenborder/kafka-connect-opentsdb github - https://github.com/jcustenborder/kafka-connect-example/issues + https://github.com/jcustenborder/kafka-connect-opentsdb/issues diff --git a/src/main/connect-config-classes/BaseConnectorConfig.json b/src/main/connect-config-classes/BaseConnectorConfig.json deleted file mode 100644 index 2bb1fb2..0000000 --- a/src/main/connect-config-classes/BaseConnectorConfig.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "example.", - "groups": [ - { - "name": "SESSION", - "display": "Session Configuration", - "prefix": "session.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - }, - { - "name": "SSL", - "display": "SSL Configuration", - "prefix": "ssl.", - "configItems": [ - { - "configKey": "keystore.path", - "type": "STRING", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "keystore.password", - "type": "PASSWORD", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM" - }, - { - "configKey": "truststore.path", - "type": "STRING", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "truststore.password", - "type": "PASSWORD", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "changeit" - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleSinkConnectorConfig.json b/src/main/connect-config-classes/ExampleSinkConnectorConfig.json deleted file mode 100644 index ab3677c..0000000 --- a/src/main/connect-config-classes/ExampleSinkConnectorConfig.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleSinkConnectorConfig", - "extends": "com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "myconn.", - "groups": [ - { - "name": "TARGET", - "display": "Target Configuration", - "prefix": "target.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleSourceConnectorConfig.json b/src/main/connect-config-classes/ExampleSourceConnectorConfig.json deleted file mode 100644 index a178aa7..0000000 --- a/src/main/connect-config-classes/ExampleSourceConnectorConfig.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleSourceConnectorConfig", - "extends": "com.github.jcustenborder.kafka.connect.example.BaseConnectorConfig", - "prefix": "myconn.", - "groups": [ - { - "name": "TARGET", - "display": "Target Configuration", - "prefix": "target.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/connect-config-classes/ExampleTransformationConfig.json b/src/main/connect-config-classes/ExampleTransformationConfig.json deleted file mode 100644 index 15ae5f3..0000000 --- a/src/main/connect-config-classes/ExampleTransformationConfig.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "com.github.jcustenborder.kafka.connect.example.ExampleTransformationConfig", - "prefix": "example.", - "groups": [ - { - "name": "SESSION", - "display": "Session Configuration", - "prefix": "session.", - "configItems": [ - { - "configKey": "servers", - "type": "LIST", - "documentation": "Server(s) to connect to.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": [ - "http://localhost:8081" - ] - } - ] - }, - { - "name": "SSL", - "display": "SSL Configuration", - "prefix": "ssl.", - "configItems": [ - { - "configKey": "keystore.path", - "type": "STRING", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "keystore.password", - "type": "PASSWORD", - "documentation": "Location of the Java keystore to use.", - "importance": "HIGH", - "width": "MEDIUM" - }, - { - "configKey": "truststore.path", - "type": "STRING", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "" - }, - { - "configKey": "truststore.password", - "type": "PASSWORD", - "documentation": "Location of the Java truststore to use.", - "importance": "HIGH", - "width": "MEDIUM", - "defaultValue": "changeit" - } - ] - } - ] -} \ No newline at end of file diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java deleted file mode 100644 index b811770..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkConnector.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.config.TaskConfigs; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.sink.SinkConnector; - -import java.util.List; -import java.util.Map; - -@Description("This is the description of the connector.") -public class ExampleSinkConnector extends SinkConnector { - Map settings; - - @Override - public void start(Map settings) { - ExampleSinkConnectorConfig config = new ExampleSinkConnectorConfig(settings); - - /** - * Do whatever you need to do to setup your connector on a global scale. This is something that - * will execute once per connector instance. - */ - - this.settings = settings; - } - - @Override - public Class taskClass() { - return ExampleSinkTask.class; - } - - @Override - public List> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return ExampleSinkConnectorConfig.config(ExampleSinkConnectorConfig.DEFAULT_CONFIG_OPTIONS); - } - - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java deleted file mode 100644 index 26fe51b..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSinkTask.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTask; - -import java.util.Collection; -import java.util.Map; - -public class ExampleSinkTask extends SinkTask { - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } - - ExampleSinkConnectorConfig config; - - @Override - public void start(Map settings) { - this.config = new ExampleSinkConnectorConfig(settings); - /** - * Do whatever you need to do to setup each of the tasks that the connector launches. - */ - } - - @Override - public void put(Collection records) { - - } - - @Override - public void stop() { - - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java deleted file mode 100644 index ff1a75f..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceConnector.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import com.github.jcustenborder.kafka.connect.utils.config.Description; -import com.github.jcustenborder.kafka.connect.utils.config.TaskConfigs; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.source.SourceConnector; - -import java.util.List; -import java.util.Map; - -@Description("This connector is used to pull data from a source system and write it " + - "to Kafka.") -public class ExampleSourceConnector extends SourceConnector { - - Map settings; - - @Override - public void start(Map settings) { - ExampleSourceConnectorConfig config = new ExampleSourceConnectorConfig(settings); - /** - * Do whatever you need to do to setup your connector on a global scale. This is something that - * will execute once per connector instance. - */ - this.settings = settings; - } - - @Override - public Class taskClass() { - return ExampleSourceTask.class; - } - - @Override - public List> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return ExampleSourceConnectorConfig.config(ExampleSourceConnectorConfig.DEFAULT_CONFIG_OPTIONS); - } - - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java deleted file mode 100644 index a8d2436..0000000 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleSourceTask.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.source.SourceTask; - -import java.util.List; -import java.util.Map; - -public class ExampleSourceTask extends SourceTask { - @Override - public String version() { - return VersionUtil.version(this.getClass()); - } - - ExampleSourceConnectorConfig config; - - @Override - public void start(Map settings) { - this.config = new ExampleSourceConnectorConfig(settings); - } - - @Override - public List poll() throws InterruptedException { - - return null; - } - - @Override - public void stop() { - - } -} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/OpenTSDBParser.java b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/OpenTSDBParser.java new file mode 100644 index 0000000..31bd967 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/OpenTSDBParser.java @@ -0,0 +1,120 @@ +/** + * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.opentsdb; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class OpenTSDBParser { + private static final Logger log = LoggerFactory.getLogger(OpenTSDBParser.class); + + public static final Schema SCHEMA = SchemaBuilder.struct() + .name("net.opentsdb.model.DataPoint") + .field("metricName", SchemaBuilder.string().doc("").build()) + .field("timestamp", Timestamp.builder().doc("").build()) + .field("value", SchemaBuilder.float64().optional().doc("").build()) + .field("tags", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).doc("").build()) + .build(); + + static final Pattern METRIC_PATTERN = Pattern.compile("^(\\S+)\\s+(\\d+)\\s+([0-9.]+)"); + static final Pattern KEY_VALUE_PATTERN = Pattern.compile("\\s*(\\S+)=(\\S+)"); + + public SchemaAndValue parse(String text) { + log.trace("parse() - text = '{}'", text); + if (null == text) { + return null; + } + Matcher matcher = METRIC_PATTERN.matcher(text); + boolean found = matcher.find(); + if (!found) { + throw new IllegalStateException(""); + } else { + log.trace("parse() - matches = {} start = {} end = {}", found, matcher.start(), matcher.end()); + } + int end = matcher.end(); + String metricName = matcher.group(1); + + Date timestamp; + try { + log.trace("parse() - parsing group 2 to long. text = '{}'", matcher.group(2)); + long timestampLong = Long.parseLong(matcher.group(2)); + if (timestampLong < 946684800000L) { + timestampLong = timestampLong * 1000L; + } + + //TODO: Check if seconds and adjust. + timestamp = new Date(timestampLong); + } catch (Exception ex) { + IllegalStateException exception = new IllegalStateException( + String.format( + "Could not parse '%s'. '%s'", + matcher.group(2), + text + ) + ); + exception.initCause(ex); + throw exception; + } + double value; + try { + log.trace("parse() - parsing group 3 to double. text = '{}'", matcher.group(3)); + value = Double.parseDouble(matcher.group(3)); + } catch (Exception ex) { + IllegalStateException exception = new IllegalStateException( + String.format( + "Could not parse '%s'. '%s'", + matcher.group(3), + text + ) + ); + exception.initCause(ex); + throw exception; + } + + log.trace("parse() - metricName = '{}' timestamp = '{}' value = '{}'", metricName, timestamp, value); + matcher = KEY_VALUE_PATTERN.matcher(text); + Map tags = new LinkedHashMap<>(10); + while (end < text.length()) { + if (!matcher.find(end)) { + break; + } + String tagKey = matcher.group(1); + String tagValue = matcher.group(2); + tags.put(tagKey, tagValue); + log.trace("put() - tagKey = '{}' tagValue = '{}'", tagKey, tagValue); + end = matcher.end(); + } + + Struct struct = new Struct(SCHEMA) + .put("metricName", metricName) + .put("value", value) + .put("timestamp", timestamp) + .put("tags", tags); + + return new SchemaAndValue(struct.schema(), struct); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB.java similarity index 58% rename from src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java rename to src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB.java index 8fe0349..6559083 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/ExampleTransformation.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB.java @@ -13,34 +13,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.github.jcustenborder.kafka.connect.example; +package com.github.jcustenborder.kafka.connect.opentsdb; import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip; +import com.github.jcustenborder.kafka.connect.utils.config.Title; import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.Struct; import java.util.Map; -@Description("This is an example transformation.") -public abstract class ExampleTransformation> extends BaseKeyValueTransformation { - protected ExampleTransformation(boolean isKey) { - super(isKey); - } - public static class Key> extends ExampleTransformation { - public Key() { - super(true); - } - } +@Title("Parse OpenTSDB transformation") +@Description("The ParseOpenTSDB transformation will parse data that is formatted with the OpenTSDB " + + "wire protocol.") +@DocumentationTip("This transformation expects data to be a String. You are " + + "most likely going to use the StringConverter.") +public class ParseOpenTSDB> extends BaseKeyValueTransformation { + OpenTSDBParser parser = new OpenTSDBParser(); - public static class Value> extends ExampleTransformation { - public Value() { - super(false); - } + protected ParseOpenTSDB(boolean isKey) { + super(isKey); } @Override @@ -54,13 +50,24 @@ public void close() { } @Override - public void configure(Map map) { + public void configure(Map configs) { } @Override - protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { - //TODO; Do something - return null; + protected SchemaAndValue processString(R record, Schema inputSchema, String input) { + return parser.parse(input); + } + + public static class Key> extends ParseOpenTSDB { + public Key() { + super(true); + } + } + + public static class Value> extends ParseOpenTSDB { + public Value() { + super(false); + } } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/package-info.java similarity index 70% rename from src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java rename to src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/package-info.java index 50f401b..ee69f94 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/example/package-info.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/opentsdb/package-info.java @@ -13,15 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Introduction("This is information about the connector.") -@Title("Title") -@DocumentationWarning("This is a warning") @PluginOwner("jcustenborder") -@PluginName("kafka-connect-example") -package com.github.jcustenborder.kafka.connect.example; +@PluginName("kafka-connect-transform-opentsdb") +@Introduction("The plugin provides a mechanism to parse the wire format for OpenTSDB.") +package com.github.jcustenborder.kafka.connect.opentsdb; -import com.github.jcustenborder.kafka.connect.utils.config.DocumentationWarning; import com.github.jcustenborder.kafka.connect.utils.config.Introduction; import com.github.jcustenborder.kafka.connect.utils.config.PluginName; -import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner; -import com.github.jcustenborder.kafka.connect.utils.config.Title; \ No newline at end of file +import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner; \ No newline at end of file diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/example/DocumentationTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/example/DocumentationTest.java deleted file mode 100644 index b90d62a..0000000 --- a/src/test/java/com/github/jcustenborder/kafka/connect/example/DocumentationTest.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.github.jcustenborder.kafka.connect.example; - -import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest; - -public class DocumentationTest extends BaseDocumentationTest { - -} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/DocumentationTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/DocumentationTest.java new file mode 100644 index 0000000..c194cef --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/DocumentationTest.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.opentsdb; + + +import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest; + +public class DocumentationTest extends BaseDocumentationTest { + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDBTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDBTest.java new file mode 100644 index 0000000..ec8b5f0 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDBTest.java @@ -0,0 +1,111 @@ +/** + * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.opentsdb; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; + +public class ParseOpenTSDBTest { + + ParseOpenTSDB.Value transformation; + + @BeforeEach + public void before() { + this.transformation = new ParseOpenTSDB.Value<>(); + this.transformation.configure(ImmutableMap.of()); + } + + @TestFactory + public Stream apply() { + Map testcases = new LinkedHashMap<>(); + testcases.put( + "mysql.bytes_received 1287333217 327810227706 schema=foo host=db1", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_received") + .put("timestamp", new Date(1287333217L * 1000L)) + .put("value", 327810227706D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db1")) + ); + testcases.put("mysql.bytes_sent 1287333217 6604859181710 schema=foo host=db1", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_sent") + .put("timestamp", new Date(1287333217L * 1000L)) + .put("value", 6604859181710D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db1")) + ); + testcases.put("mysql.bytes_received 1287333232 327812421706 schema=foo host=db1", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_received") + .put("timestamp", new Date(1287333232L * 1000L)) + .put("value", 327812421706D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db1")) + ); + testcases.put("mysql.bytes_sent 1287333232 6604901075387 schema=foo host=db1", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_sent") + .put("timestamp", new Date(1287333232L * 1000L)) + .put("value", 6604901075387D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db1")) + ); + testcases.put("mysql.bytes_received 1287333321 340899533915 schema=foo host=db2", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_received") + .put("timestamp", new Date(1287333321L * 1000L)) + .put("value", 340899533915D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db2")) + ); + testcases.put("mysql.bytes_sent 1287333321 5506469130707 schema=foo host=db2", + new Struct(OpenTSDBParser.SCHEMA) + .put("metricName", "mysql.bytes_sent") + .put("timestamp", new Date(1287333321L * 1000L)) + .put("value", 5506469130707D) + .put("tags", ImmutableMap.of("schema", "foo", "host", "db2")) + ); + + return testcases.entrySet().stream() + .map(e -> dynamicTest(e.getKey(), () -> { + final SinkRecord input = new SinkRecord( + "test", + 1, + null, + null, + Schema.STRING_SCHEMA, + e.getKey(), + 123412L + ); + final SinkRecord output = this.transformation.apply(input); + assertNotNull(output); + assertTrue(output.value() instanceof Struct, "output.value() should be a struct."); + assertStruct(e.getValue(), (Struct) output.value()); + })); + } + +} diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB/example.json b/src/test/resources/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB/example.json new file mode 100644 index 0000000..9ab4bf2 --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/opentsdb/ParseOpenTSDB/example.json @@ -0,0 +1,25 @@ +{ + "title" : "Example", + "input" : { + "topic" : "foo", + "kafkaPartition" : 1, + "keySchema" : { + "type" : "STRING", + "isOptional" : false + }, + "key" : "foo", + "valueSchema" : { + "type" : "STRING", + "isOptional" : false + }, + "value" : "mysql.bytes_received 1287333217 327810227706 schema=foo host=db1", + "timestamp" : 1530286549123, + "timestampType" : "CREATE_TIME", + "offset" : 91283741, + "headers" : [ ] + }, + "description" : "This data takes a string value and parses the data based on the OpenTSDB wire protocol.", + "name" : "Example", + "config" : {}, + "childClass" : "Value" +} \ No newline at end of file diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index df42d7d..a6a359e 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -4,7 +4,7 @@ %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n - +