From 815c941493b27153e269027f8096ac959dd0e9d6 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Mon, 18 Dec 2017 16:32:35 -0800 Subject: [PATCH 1/3] Added avro producer for data gen for the new schema registry integration. --- .../confluent/ksql/datagen/AvroProducer.java | 50 +++++++++++++++++++ .../io/confluent/ksql/datagen/DataGen.java | 20 ++++++-- 2 files changed, 67 insertions(+), 3 deletions(-) create mode 100644 ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java new file mode 100644 index 000000000000..30e6b2dbec27 --- /dev/null +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java @@ -0,0 +1,50 @@ +/** + * Copyright 2017 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + + +package io.confluent.ksql.datagen; + +import org.apache.avro.Schema; +import org.apache.kafka.common.serialization.Serializer; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.serde.avro.KsqlGenericRowAvroSerializer; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; + +public class AvroProducer extends DataGenProducer { + + private final KsqlConfig ksqlConfig; + private final SchemaRegistryClient schemaRegistryClient; + + public AvroProducer(KsqlConfig ksqlConfig) { + if (ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) == null) { + throw new KsqlException("Schema registry url is not set."); + } + this.ksqlConfig = ksqlConfig; + this.schemaRegistryClient = new CachedSchemaRegistryClient(ksqlConfig.getString(KsqlConfig + .SCHEMA_REGISTRY_URL_PROPERTY), 100); + } + + @Override + protected Serializer getSerializer(Schema avroSchema, + org.apache.kafka.connect.data.Schema kafkaSchema, + String topicName) { + return new KsqlGenericRowAvroSerializer(kafkaSchema, schemaRegistryClient, ksqlConfig, false); + } +} diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index e2fa51896159..972093e806e5 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -17,11 +17,13 @@ package io.confluent.ksql.datagen; import io.confluent.avro.random.generator.Generator; +import io.confluent.ksql.util.KsqlConfig; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -58,6 +60,10 @@ public static void main(String[] args) { DataGenProducer dataProducer; switch (arguments.format) { + case AVRO: + dataProducer = new AvroProducer( + new KsqlConfig(Collections.singletonMap(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, arguments.schemaregistryurl))); + break; case JSON: dataProducer = new JsonProducer(); break; @@ -110,6 +116,7 @@ public enum Format { AVRO, JSON, DELIMITED } public final String keyName; public final int iterations; public final long maxInterval; + public final String schemaregistryurl; public Arguments( boolean help, @@ -119,7 +126,8 @@ public Arguments( String topicName, String keyName, int iterations, - long maxInterval + long maxInterval, + String schemaregistryurl ) { this.help = help; this.bootstrapServer = bootstrapServer; @@ -129,6 +137,7 @@ public Arguments( this.keyName = keyName; this.iterations = iterations; this.maxInterval = maxInterval; + this.schemaregistryurl = schemaregistryurl; } public static class ArgumentParseException extends RuntimeException { @@ -148,6 +157,7 @@ public static class Builder { private String keyName; private int iterations; private long maxInterval; + private String schemaregistryurl; public Builder() { quickstart = null; @@ -159,6 +169,7 @@ public Builder() { keyName = null; iterations = 1000000; maxInterval = -1; + schemaregistryurl = "http://localhost:8081"; } private enum Quickstart { @@ -201,7 +212,7 @@ public Format getFormat() { public Arguments build() { if (help) { - return new Arguments(true, null, null, null, null, null, 0, -1); + return new Arguments(true, null, null, null, null, null, 0, -1, null); } if (quickstart != null) { @@ -220,7 +231,7 @@ public Arguments build() { throw new ArgumentParseException(exception.getMessage()); } return new Arguments(help, bootstrapServer, schemaFile, format, topicName, keyName, - iterations, maxInterval); + iterations, maxInterval, schemaregistryurl); } public Builder parseArgs(String[] args) throws IOException { @@ -296,6 +307,9 @@ public Builder parseArg(String arg) throws IOException { case "maxInterval": maxInterval = parseIterations(argValue); break; + case "schemaregistryurl": + schemaregistryurl = schemaregistryurl; + break; default: throw new ArgumentParseException(String.format( "Unknown argument name in '%s'", From 4d011108021930c416ffe8404b033b5d7a824b05 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Mon, 18 Dec 2017 16:48:09 -0800 Subject: [PATCH 2/3] Fixed a minor bug. --- .../src/main/java/io/confluent/ksql/datagen/DataGen.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index 972093e806e5..f41f7268621e 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -308,7 +308,7 @@ public Builder parseArg(String arg) throws IOException { maxInterval = parseIterations(argValue); break; case "schemaregistryurl": - schemaregistryurl = schemaregistryurl; + schemaregistryurl = argValue; break; default: throw new ArgumentParseException(String.format( From 62de2f2cabc174de9b8c11aa44274c4858365869 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Mon, 18 Dec 2017 19:04:15 -0800 Subject: [PATCH 3/3] Renamed variable name to be consistent with our styling. --- .../io/confluent/ksql/datagen/DataGen.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index f41f7268621e..d98a65b8b671 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -62,7 +62,7 @@ public static void main(String[] args) { switch (arguments.format) { case AVRO: dataProducer = new AvroProducer( - new KsqlConfig(Collections.singletonMap(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, arguments.schemaregistryurl))); + new KsqlConfig(Collections.singletonMap(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, arguments.schemaRegistryUrl))); break; case JSON: dataProducer = new JsonProducer(); @@ -116,7 +116,7 @@ public enum Format { AVRO, JSON, DELIMITED } public final String keyName; public final int iterations; public final long maxInterval; - public final String schemaregistryurl; + public final String schemaRegistryUrl; public Arguments( boolean help, @@ -127,7 +127,7 @@ public Arguments( String keyName, int iterations, long maxInterval, - String schemaregistryurl + String schemaRegistryUrl ) { this.help = help; this.bootstrapServer = bootstrapServer; @@ -137,7 +137,7 @@ public Arguments( this.keyName = keyName; this.iterations = iterations; this.maxInterval = maxInterval; - this.schemaregistryurl = schemaregistryurl; + this.schemaRegistryUrl = schemaRegistryUrl; } public static class ArgumentParseException extends RuntimeException { @@ -157,7 +157,7 @@ public static class Builder { private String keyName; private int iterations; private long maxInterval; - private String schemaregistryurl; + private String schemaRegistryUrl; public Builder() { quickstart = null; @@ -169,7 +169,7 @@ public Builder() { keyName = null; iterations = 1000000; maxInterval = -1; - schemaregistryurl = "http://localhost:8081"; + schemaRegistryUrl = "http://localhost:8081"; } private enum Quickstart { @@ -231,7 +231,7 @@ public Arguments build() { throw new ArgumentParseException(exception.getMessage()); } return new Arguments(help, bootstrapServer, schemaFile, format, topicName, keyName, - iterations, maxInterval, schemaregistryurl); + iterations, maxInterval, schemaRegistryUrl); } public Builder parseArgs(String[] args) throws IOException { @@ -307,8 +307,8 @@ public Builder parseArg(String arg) throws IOException { case "maxInterval": maxInterval = parseIterations(argValue); break; - case "schemaregistryurl": - schemaregistryurl = argValue; + case "schemaRegistryUrl": + schemaRegistryUrl = argValue; break; default: throw new ArgumentParseException(String.format(