From db384924bed7e6da7e83c1e08b91cb4c0a35e842 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Fri, 27 Oct 2017 11:20:29 -0700 Subject: [PATCH 1/7] Added self documentation to the KSQL config publis config variables. --- .../io/confluent/ksql/util/KsqlConfig.java | 92 +++++++++++-------- 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 89117eee38b3..68eb0ca73577 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -57,67 +57,83 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String KSQL_SERVICE_ID_CONFIG = "ksql.service.id"; - public static final ConfigDef.Type - KSQL_SERVICE_ID_TYPE = ConfigDef.Type.STRING; public static final String KSQL_SERVICE_ID_DEFAULT = "ksql_"; - public static final ConfigDef.Importance - KSQL_SERVICE_ID_IMPORTANCE = ConfigDef.Importance.MEDIUM; - public static final String - KSQL_SERVICE_ID_DOC = - "Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in " - + "this service."; public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix"; - public static final ConfigDef.Type - KSQL_PERSISTENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING; public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_"; - public static final ConfigDef.Importance - KSQL_PERSISTENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; - public static final String - KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC = - "Second part of the prefix for persitent queries."; public static final String KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix"; - public static final ConfigDef.Type - KSQL_TRANSIENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING; public static final String KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT = "transient_"; - public static final ConfigDef.Importance - KSQL_TRANSIENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; - public static final String - KSQL_TRANSIENT_QUERY_NAME_PREFIX_DOC = - "Second part of the prefix for transient queries."; public static final String KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG = "ksql.statestore.suffix"; - public static final ConfigDef.Type - KSQL_TABLE_STATESTORE_NAME_SUFFIX_TYPE = ConfigDef.Type.STRING; public static final String - KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "transient_"; - public static final ConfigDef.Importance - KSQL_TABLE_STATESTORE_NAME_SUFFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM; - public static final String - KSQL_TABLE_STATESTORE_NAME_SUFFIX_DOC = - "Suffix for state store names in Tables."; + KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore"; - public int defaultSinkNumberOfPartitions = 4; - public short defaultSinkNumberOfReplications = 1; + public static int defaultSinkNumberOfPartitions = 4; + public static short defaultSinkNumberOfReplications = 1; // TODO: Find out the best default value. - public long defaultSinkWindowChangeLogAdditionalRetention = 1000000; + public static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; - public String defaultAutoOffsetRestConfig = "latest"; - public long defaultCommitIntervalMsConfig = 2000; - public long defaultCacheMaxBytesBufferingConfig = 10000000; - public int defaultNumberOfStreamsThreads = 4; + public static String defaultAutoOffsetRestConfig = "latest"; + public static long defaultCommitIntervalMsConfig = 2000; + public static long defaultCacheMaxBytesBufferingConfig = 10000000; + public static int defaultNumberOfStreamsThreads = 4; Map ksqlConfigProps; Map ksqlStreamConfigProps; - private static final ConfigDef CONFIG_DEF = new ConfigDef(); + private static final ConfigDef CONFIG_DEF; + + static { + CONFIG_DEF = new ConfigDef() + .define(KSQL_SERVICE_ID_CONFIG, + ConfigDef.Type.STRING, + KSQL_SERVICE_ID_DEFAULT, + ConfigDef.Importance.MEDIUM, + "Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in " + + "this service.") + + .define(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, + ConfigDef.Type.STRING, + KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, + ConfigDef.Importance.MEDIUM, + "Second part of the prefix for persitent queries.") + .define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, + ConfigDef.Type.STRING, + KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT, + ConfigDef.Importance.MEDIUM, + "Second part of the prefix for transient queries.") + .define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, + ConfigDef.Type.STRING, + KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, + ConfigDef.Importance.MEDIUM, + "Suffix for state store names in Tables.") + .define(DEFAULT_SINK_NUMBER_OF_PARTITIONS, + ConfigDef.Type.INT, + defaultSinkNumberOfPartitions, + ConfigDef.Importance.MEDIUM, + "The default number of partitions for the topics created by KSQL.") + .define(DEFAULT_SINK_NUMBER_OF_REPLICATIONS, + ConfigDef.Type.SHORT, + defaultSinkNumberOfReplications, + ConfigDef.Importance.MEDIUM, + "The default number of replications for the topics created by KSQL." + ) + .define(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION, + ConfigDef.Type.LONG, + defaultSinkWindowChangeLogAdditionalRetention, + ConfigDef.Importance.MEDIUM, + "The default window change log additional retention time." + ) + ; + } + public KsqlConfig(Map props) { super(CONFIG_DEF, props); From 17de98e08d5e990ceadec7470421bd1fcf3df504 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Mon, 30 Oct 2017 13:05:08 -0700 Subject: [PATCH 2/7] Added examples to the descriptions. --- .../io/confluent/ksql/util/KsqlConfig.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 68eb0ca73577..bbfcc064567d 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -75,15 +75,15 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore"; - public static int defaultSinkNumberOfPartitions = 4; - public static short defaultSinkNumberOfReplications = 1; + private static int defaultSinkNumberOfPartitions = 4; + private static short defaultSinkNumberOfReplications = 1; // TODO: Find out the best default value. - public static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; + private static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; - public static String defaultAutoOffsetRestConfig = "latest"; - public static long defaultCommitIntervalMsConfig = 2000; - public static long defaultCacheMaxBytesBufferingConfig = 10000000; - public static int defaultNumberOfStreamsThreads = 4; + private static String defaultAutoOffsetRestConfig = "latest"; + private static long defaultCommitIntervalMsConfig = 2000; + private static long defaultCacheMaxBytesBufferingConfig = 10000000; + private static int defaultNumberOfStreamsThreads = 4; Map ksqlConfigProps; Map ksqlStreamConfigProps; @@ -103,17 +103,21 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Type.STRING, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.MEDIUM, - "Second part of the prefix for persitent queries.") + "Second part of the prefix for persitent queries. For instance if the prefix is transient_" + + "query_ the query name will be ksql_query_1.") .define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.MEDIUM, - "Second part of the prefix for transient queries.") + "Second part of the prefix for transient queries. For instance if the prefix is " + + "transient_ the query name would be ksql_transient_4120896722607083946_1509389010601") .define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, ConfigDef.Type.STRING, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, ConfigDef.Importance.MEDIUM, - "Suffix for state store names in Tables.") + "Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state " + + "store name would be ksql_query_1_ksql_statestore" + + "_ksql_statestore ") .define(DEFAULT_SINK_NUMBER_OF_PARTITIONS, ConfigDef.Type.INT, defaultSinkNumberOfPartitions, @@ -123,7 +127,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Type.SHORT, defaultSinkNumberOfReplications, ConfigDef.Importance.MEDIUM, - "The default number of replications for the topics created by KSQL." + "The default number of replicas for the topics created by KSQL." ) .define(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION, ConfigDef.Type.LONG, From b155a6546e1ace68936029c9426f845230b42921 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Wed, 1 Nov 2017 12:57:56 -0700 Subject: [PATCH 3/7] Renamed some KSQL config props and used ConfigDef for populating the props. --- .../test/java/io/confluent/ksql/CliTest.java | 2 +- .../io/confluent/ksql/util/KsqlConfig.java | 43 +++---------------- .../io/confluent/ksql/analyzer/Analyzer.java | 10 ++--- .../plan/KsqlStructuredDataOutputNode.java | 8 ++-- .../KsqlStructuredDataOutputNodeTest.java | 4 +- .../ksql/rest/server/KsqlRestApplication.java | 4 +- 6 files changed, 21 insertions(+), 50 deletions(-) diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java index 6e0d6ae28534..95fc9928e1f7 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java @@ -163,7 +163,7 @@ private static Map validStartUpConfigs() { Map startConfigs = genDefaultConfigMap(); startConfigs.put("num.stream.threads", 4); - startConfigs.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, 1); + startConfigs.put(SINK_NUMBER_OF_REPLICAS_PROPERTY, 1); startConfigs.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4); startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, 1000000); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index bbfcc064567d..0efd8fc84313 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -36,18 +36,14 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS"; public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; - public static final String DEFAULT_SINK_NUMBER_OF_PARTITIONS = "ksql.sink.partitions.default"; - public static final String SINK_NUMBER_OF_REPLICATIONS = "REPLICATIONS"; - public static final String SINK_NUMBER_OF_REPLICATIONS_PROPERTY = "ksql.sink.replications"; - public static final String DEFAULT_SINK_NUMBER_OF_REPLICATIONS = "ksql.sink.replications.default"; + public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS"; + public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas"; public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = "WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION"; public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY = "ksql.sink.window.change.log.additional.retention"; - public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = - "ksql.sink.window.change.log.additional.retention.default"; public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog"; @@ -118,18 +114,18 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { "Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state " + "store name would be ksql_query_1_ksql_statestore" + "_ksql_statestore ") - .define(DEFAULT_SINK_NUMBER_OF_PARTITIONS, + .define(SINK_NUMBER_OF_PARTITIONS_PROPERTY, ConfigDef.Type.INT, defaultSinkNumberOfPartitions, ConfigDef.Importance.MEDIUM, "The default number of partitions for the topics created by KSQL.") - .define(DEFAULT_SINK_NUMBER_OF_REPLICATIONS, + .define(SINK_NUMBER_OF_REPLICAS_PROPERTY, ConfigDef.Type.SHORT, defaultSinkNumberOfReplications, ConfigDef.Importance.MEDIUM, "The default number of replicas for the topics created by KSQL." ) - .define(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION, + .define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, ConfigDef.Type.LONG, defaultSinkWindowChangeLogAdditionalRetention, ConfigDef.Importance.MEDIUM, @@ -144,32 +140,7 @@ public KsqlConfig(Map props) { ksqlConfigProps = new HashMap<>(); ksqlStreamConfigProps = new HashMap<>(); - ksqlConfigProps.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT); - ksqlConfigProps.put(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT); - ksqlConfigProps.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT); - ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT); - - if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) { - ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, - Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString())); - } else { - ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, defaultSinkNumberOfPartitions); - } - - if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) { - ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, - Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString())); - } else { - ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, defaultSinkNumberOfReplications); - } - - if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) { - ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, - Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString())); - } else { - ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, - defaultSinkWindowChangeLogAdditionalRetention); - } + ksqlConfigProps.putAll(super.values()); ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig); ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig); @@ -177,7 +148,7 @@ public KsqlConfig(Map props) { StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig); ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads); - for (Map.Entry entry : props.entrySet()) { + for (Map.Entry entry : originals().entrySet()) { final String key = entry.getKey().toString(); if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) { ksqlConfigProps.put(key, entry.getValue()); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index be4b365f2c91..0a4d48cca42f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -466,16 +466,16 @@ private void setIntoProperties(final StructuredDataSource into, final Table node } } - if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS) != null) { + if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) != null) { try { short numberOfReplications = - Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS) + Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) .toString()); analysis.getIntoProperties() - .put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, numberOfReplications); + .put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications); } catch (NumberFormatException e) { throw new KsqlException("Invalid number of replications in WITH clause: " + node - .getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS).toString()); + .getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString()); } } } @@ -541,7 +541,7 @@ private void validateWithClause(Set withClauseVariables) { validSet.add(DdlConfig.PARTITION_BY_PROPERTY.toUpperCase()); validSet.add(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME.toUpperCase()); validSet.add(KsqlConfig.SINK_NUMBER_OF_PARTITIONS.toUpperCase()); - validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS.toUpperCase()); + validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICAS.toUpperCase()); for (String withVariable: withClauseVariables) { if (!validSet.contains(withVariable.toUpperCase())) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java index 2ba3e4d42e10..bb996941cf3f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java @@ -110,9 +110,9 @@ public SchemaKStream buildStream(final StreamsBuilder builder, ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, outputProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)); } - if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY)) { - ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, - outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY + if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)) { + ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, + outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY )); } @@ -181,7 +181,7 @@ private void addAvroSchemaToResultTopic(final KsqlStructuredDataOutputNode.Build private void createSinkTopic(final String kafkaTopicName, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient) { int numberOfPartitions = (Integer) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY); - short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY); + short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY); kafkaTopicClient.createTopic(kafkaTopicName, numberOfPartitions, numberOfReplications); } public Field getTimestampField() { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index d1b7eaa5da36..cfd865a71c31 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -90,7 +90,7 @@ public class KsqlStructuredDataOutputNodeTest { public void before() { final Map props = new HashMap<>(); props.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4); - props.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, (short)3); + props.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short)3); createOutputNode(props); topicClient.createTopic(eq("output"), anyInt(), anyShort()); EasyMock.expectLastCall(); @@ -156,7 +156,7 @@ public void shouldHaveCorrectOutputNodeSchema() { @Test public void shouldUpdateReplicationPartitionsInConfig() { - assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY), equalTo(Integer.valueOf(3).shortValue())); + assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo(Integer.valueOf(3).shortValue())); } @Test diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 6d1842ce2793..eec86ce2c495 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -218,9 +218,9 @@ public static KsqlRestApplication buildApplication(KsqlRestConfig restConfig, bo try { short replicationFactor = 1; - if(restConfig.getOriginals().containsKey(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) { + if(restConfig.getOriginals().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS)) { replicationFactor = Short.parseShort(restConfig.getOriginals() - .get(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()); + .get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString()); } client.createTopic(commandTopic, 1, replicationFactor); } catch (KafkaTopicException e) { From 02a2f32a9207c3065bef41ee5f105e284d7de6e1 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Thu, 2 Nov 2017 13:39:39 -0700 Subject: [PATCH 4/7] Only update ksqlStreamConfigProps since ksqlConfigProps is populated via values. Correct the CLI test to expect the correct properties. --- .../test/java/io/confluent/ksql/CliTest.java | 2 - .../io/confluent/ksql/util/KsqlConfig.java | 4 +- .../confluent/ksql/util/KsqlConfigTest.java | 45 +++++++++++++++++++ 3 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java index 95fc9928e1f7..e003834a6850 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java @@ -147,8 +147,6 @@ private static Map genDefaultConfigMap() { configMap.put("commit.interval.ms", 0); configMap.put("cache.max.bytes.buffering", 0); configMap.put("auto.offset.reset", "earliest"); - configMap.put("ksql.command.topic.suffix", "commands"); - return configMap; } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 0efd8fc84313..52e9afe469e7 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -150,9 +150,7 @@ public KsqlConfig(Map props) { for (Map.Entry entry : originals().entrySet()) { final String key = entry.getKey().toString(); - if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) { - ksqlConfigProps.put(key, entry.getValue()); - } else { + if (!key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) { ksqlStreamConfigProps.put(key, entry.getValue()); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java new file mode 100644 index 000000000000..2da0f8633ecd --- /dev/null +++ b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java @@ -0,0 +1,45 @@ +/** + * Copyright 2017 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.ksql.util; + +import org.apache.kafka.streams.StreamsConfig; +import org.junit.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.HashMap; +import java.util.Map; + +public class KsqlConfigTest { + + @Test + public void shouldSetInitialValuesCorrectly() { + Map initialProps = new HashMap<>(); + initialProps.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 10); + initialProps.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 3); + initialProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 800); + initialProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5); + + KsqlConfig ksqlConfig = new KsqlConfig(initialProps); + + assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY), equalTo(10)); + assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo((short) 3)); + + } + + +} From b3438c62ee7f8d1f5d26de6427a7006a549a7a1f Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Fri, 3 Nov 2017 09:39:10 -0700 Subject: [PATCH 5/7] More details on a config value doc. --- ksql-cli/src/test/java/io/confluent/ksql/CliTest.java | 3 +-- .../main/java/io/confluent/ksql/util/KsqlConfig.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java index e003834a6850..ddc31e9f3f42 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -163,7 +162,7 @@ private static Map validStartUpConfigs() { startConfigs.put(SINK_NUMBER_OF_REPLICAS_PROPERTY, 1); startConfigs.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4); - startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, 1000000); + startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, 1000000); startConfigs.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT); startConfigs.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 52e9afe469e7..e990e27f6655 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -42,7 +42,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = "WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION"; - public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY = + public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY = "ksql.sink.window.change.log.additional.retention"; public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog"; @@ -125,11 +125,16 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Importance.MEDIUM, "The default number of replicas for the topics created by KSQL." ) - .define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, + .define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, ConfigDef.Type.LONG, defaultSinkWindowChangeLogAdditionalRetention, ConfigDef.Importance.MEDIUM, - "The default window change log additional retention time." + "The default window change log additional retention time. This is a streams " + + "config value which will be added to a windows maintainMs to ensure data is not " + + "deleted from " + + "the " + + "log " + + "prematurely. Allows for clock drift. Default is 1 day" ) ; } From 7026f77950edb2163009150ffcfd692ab78280f8 Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Mon, 6 Nov 2017 13:03:12 -0800 Subject: [PATCH 6/7] Created a new KsqlConstants file to hold constants such as property names. --- .../io/confluent/ksql/util/KsqlConfig.java | 42 ++++++++----------- .../io/confluent/ksql/util/KsqlConstants.java | 38 +++++++++++++++++ .../io/confluent/ksql/analyzer/Analyzer.java | 25 +++++------ .../ksql/planner/LogicalPlanner.java | 6 +-- .../ksql/rest/server/KsqlRestApplication.java | 5 ++- 5 files changed, 74 insertions(+), 42 deletions(-) create mode 100644 ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index e990e27f6655..f71aefc7175a 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -32,16 +32,12 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String KSQL_CONFIG_PREPERTY_PREFIX = "ksql."; public static final String KSQL_TIMESTAMP_COLUMN_INDEX = "ksq.timestamp.column.index"; - public static final String SINK_TIMESTAMP_COLUMN_NAME = "TIMESTAMP"; - public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS"; public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; - public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS"; public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas"; - public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = - "WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION"; + public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY = "ksql.sink.window.change.log.additional.retention"; @@ -71,15 +67,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore"; - private static int defaultSinkNumberOfPartitions = 4; - private static short defaultSinkNumberOfReplications = 1; - // TODO: Find out the best default value. - private static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; - private static String defaultAutoOffsetRestConfig = "latest"; - private static long defaultCommitIntervalMsConfig = 2000; - private static long defaultCacheMaxBytesBufferingConfig = 10000000; - private static int defaultNumberOfStreamsThreads = 4; Map ksqlConfigProps; Map ksqlStreamConfigProps; @@ -106,7 +94,10 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.MEDIUM, "Second part of the prefix for transient queries. For instance if the prefix is " - + "transient_ the query name would be ksql_transient_4120896722607083946_1509389010601") + + "transient_ the query name would be " + + "ksql_transient_4120896722607083946_1509389010601 where 'ksql_' is the first prefix" + + " and '_transient' is the second part of the prefix for the query id the third and " + + "4th parts are a random long value and the current timestamp. ") .define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, ConfigDef.Type.STRING, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, @@ -116,25 +107,22 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { + "_ksql_statestore ") .define(SINK_NUMBER_OF_PARTITIONS_PROPERTY, ConfigDef.Type.INT, - defaultSinkNumberOfPartitions, + KsqlConstants.defaultSinkNumberOfPartitions, ConfigDef.Importance.MEDIUM, "The default number of partitions for the topics created by KSQL.") .define(SINK_NUMBER_OF_REPLICAS_PROPERTY, ConfigDef.Type.SHORT, - defaultSinkNumberOfReplications, + KsqlConstants.defaultSinkNumberOfReplications, ConfigDef.Importance.MEDIUM, "The default number of replicas for the topics created by KSQL." ) .define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, ConfigDef.Type.LONG, - defaultSinkWindowChangeLogAdditionalRetention, + KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention, ConfigDef.Importance.MEDIUM, "The default window change log additional retention time. This is a streams " + "config value which will be added to a windows maintainMs to ensure data is not " - + "deleted from " - + "the " - + "log " - + "prematurely. Allows for clock drift. Default is 1 day" + + "deleted from the log prematurely. Allows for clock drift. Default is 1 day" ) ; } @@ -147,11 +135,15 @@ public KsqlConfig(Map props) { ksqlStreamConfigProps = new HashMap<>(); ksqlConfigProps.putAll(super.values()); - ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig); - ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig); + ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KsqlConstants + .defaultAutoOffsetRestConfig); + ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KsqlConstants + .defaultCommitIntervalMsConfig); ksqlStreamConfigProps.put( - StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig); - ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads); + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, KsqlConstants + .defaultCacheMaxBytesBufferingConfig); + ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, KsqlConstants + .defaultNumberOfStreamsThreads); for (Map.Entry entry : originals().entrySet()) { final String key = entry.getKey().toString(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java new file mode 100644 index 000000000000..ac932f989e8a --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java @@ -0,0 +1,38 @@ +/** + * Copyright 2017 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.ksql.util; + +public class KsqlConstants { + + public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS"; + public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS"; + + public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = + "WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION"; + public static final String SINK_TIMESTAMP_COLUMN_NAME = "TIMESTAMP"; + + public static int defaultSinkNumberOfPartitions = 4; + public static short defaultSinkNumberOfReplications = 1; + // TODO: Find out the best default value. + public static long defaultSinkWindowChangeLogAdditionalRetention = 1000000; + + public static String defaultAutoOffsetRestConfig = "latest"; + public static long defaultCommitIntervalMsConfig = 2000; + public static long defaultCacheMaxBytesBufferingConfig = 10000000; + public static int defaultNumberOfStreamsThreads = 4; + +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 55bbc7c09d1f..d380b45abe3f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -51,6 +51,7 @@ import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe; import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import org.apache.kafka.connect.data.Field; @@ -447,35 +448,35 @@ private void setIntoProperties(final StructuredDataSource into, final Table node intoPartitionByColumnName); } - if (node.getProperties().get(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME) != null) { + if (node.getProperties().get(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME) != null) { setIntoTimestampColumn(node); } - if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS) != null) { + if (node.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) != null) { try { int numberOfPartitions = Integer.parseInt(node.getProperties() - .get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS) + .get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) .toString()); analysis.getIntoProperties().put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, numberOfPartitions); } catch (NumberFormatException e) { throw new KsqlException("Invalid number of partitions in WITH clause: " - + node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS) + + node.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) .toString()); } } - if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) != null) { + if (node.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS) != null) { try { short numberOfReplications = - Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) + Short.parseShort(node.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS) .toString()); analysis.getIntoProperties() .put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications); } catch (NumberFormatException e) { throw new KsqlException("Invalid number of replications in WITH clause: " + node - .getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString()); + .getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString()); } } } @@ -520,7 +521,7 @@ private void setIntoTopicFormat(final StructuredDataSource into, final Table nod private void setIntoTimestampColumn(final Table node) { String intoTimestampColumnName = node.getProperties() - .get(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME).toString().toUpperCase(); + .get(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME).toString().toUpperCase(); if (!intoTimestampColumnName.startsWith("'") && !intoTimestampColumnName.endsWith("'")) { throw new KsqlException( intoTimestampColumnName + " value is string and should be enclosed between " @@ -529,7 +530,7 @@ private void setIntoTimestampColumn(final Table node) { intoTimestampColumnName = intoTimestampColumnName.substring(1, intoTimestampColumnName .length() - 1); - analysis.getIntoProperties().put(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME, + analysis.getIntoProperties().put(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME, intoTimestampColumnName); } @@ -539,9 +540,9 @@ private void validateWithClause(Set withClauseVariables) { validSet.add(DdlConfig.VALUE_FORMAT_PROPERTY.toUpperCase()); validSet.add(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY.toUpperCase()); validSet.add(DdlConfig.PARTITION_BY_PROPERTY.toUpperCase()); - validSet.add(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME.toUpperCase()); - validSet.add(KsqlConfig.SINK_NUMBER_OF_PARTITIONS.toUpperCase()); - validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICAS.toUpperCase()); + validSet.add(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME.toUpperCase()); + validSet.add(KsqlConstants.SINK_NUMBER_OF_PARTITIONS.toUpperCase()); + validSet.add(KsqlConstants.SINK_NUMBER_OF_REPLICAS.toUpperCase()); for (String withVariable: withClauseVariables) { if (!validSet.contains(withVariable.toUpperCase())) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index a3c002f3255f..34c9440863a8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -34,7 +34,7 @@ import io.confluent.ksql.planner.plan.ProjectNode; import io.confluent.ksql.planner.plan.StructuredDataSourceNode; import io.confluent.ksql.util.ExpressionTypeManager; -import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.SchemaUtil; import org.apache.kafka.connect.data.Field; @@ -82,11 +82,11 @@ private OutputNode buildOutputNode(final Schema inputSchema, final PlanNode sour inputSchema, analysis.getLimitClause()); } else if (intoDataSource != null) { Field timestampField = null; - if (analysis.getIntoProperties().get(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME) != null) { + if (analysis.getIntoProperties().get(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME) != null) { timestampField = SchemaUtil.getFieldByName(inputSchema, analysis.getIntoProperties() - .get(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME) + .get(KsqlConstants.SINK_TIMESTAMP_COLUMN_NAME) .toString()).get(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index eec86ce2c495..3b91c2249ac3 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -48,6 +48,7 @@ import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KafkaTopicClientImpl; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.Version; import io.confluent.rest.Application; import io.confluent.rest.validation.JacksonMessageBodyProvider; @@ -218,9 +219,9 @@ public static KsqlRestApplication buildApplication(KsqlRestConfig restConfig, bo try { short replicationFactor = 1; - if(restConfig.getOriginals().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS)) { + if(restConfig.getOriginals().containsKey(KsqlConstants.SINK_NUMBER_OF_REPLICAS)) { replicationFactor = Short.parseShort(restConfig.getOriginals() - .get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString()); + .get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString()); } client.createTopic(commandTopic, 1, replicationFactor); } catch (KafkaTopicException e) { From ae5689a448acc49e14ecdc8ad97a39cef40a76df Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Tue, 7 Nov 2017 11:15:22 -0800 Subject: [PATCH 7/7] Minor config doc fix. --- .../src/main/java/io/confluent/ksql/util/KsqlConfig.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index f71aefc7175a..fa1777544fb9 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -87,7 +87,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Type.STRING, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, ConfigDef.Importance.MEDIUM, - "Second part of the prefix for persitent queries. For instance if the prefix is transient_" + "Second part of the prefix for persitent queries. For instance if the prefix is " + "query_ the query name will be ksql_query_1.") .define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, @@ -103,8 +103,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, ConfigDef.Importance.MEDIUM, "Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state " - + "store name would be ksql_query_1_ksql_statestore" - + "_ksql_statestore ") + + "store name would be ksql_query_1_ksql_statestore _ksql_statestore ") .define(SINK_NUMBER_OF_PARTITIONS_PROPERTY, ConfigDef.Type.INT, KsqlConstants.defaultSinkNumberOfPartitions,