diff --git a/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java b/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java index 43123b0cbf78..678aa18d151d 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java +++ b/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java @@ -216,8 +216,7 @@ public void buildQueryPhysicalPlan(final List physicalPlans, KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone(); // Build a physical plan, in this case a Kafka Streams DSL - PhysicalPlanBuilder physicalPlanBuilder = - new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getKafkaTopicClient()); + PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getKafkaTopicClient()); SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan); OutputNode outputNode = physicalPlanBuilder.getPlanSink(); @@ -234,94 +233,129 @@ public void buildQueryPhysicalPlan(final List physicalPlans, schemaKStream.getClass().getCanonicalName() )); } - String serviceId = ksqlEngine.getKsqlConfig() - .get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString(); - String persistanceQueryPrefix = ksqlEngine.getKsqlConfig() - .get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString(); - String transientQueryPrefix = ksqlEngine.getKsqlConfig() - .get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString(); - if (isBareQuery) { - String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix); - if (addUniqueTimeSuffix) { - applicationId = addTimeSuffix(applicationId); - } - - KafkaStreams streams = - buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties); - - QueuedSchemaKStream queuedSchemaKStream = (QueuedSchemaKStream) schemaKStream; - KsqlBareOutputNode ksqlBareOutputNode = (KsqlBareOutputNode) outputNode; + String serviceId = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString(); + String persistanceQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString(); + String transientQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString(); - SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0); + if (isBareQuery) { - physicalPlans.add(new QueuedQueryMetadata( - statementPlanPair.getLeft(), - streams, - ksqlBareOutputNode, - schemaKStream.getExecutionPlan(""), - queuedSchemaKStream.getQueue(), - (sourceSchemaKstream instanceof SchemaKTable)? - DataSource.DataSourceType.KTABLE: DataSource.DataSourceType.KSTREAM - )); + physicalPlans.add(buildPlanForBareQuery(addUniqueTimeSuffix, statementPlanPair, overriddenStreamsProperties, + builder, ksqlConfigClone, (QueuedSchemaKStream) schemaKStream, (KsqlBareOutputNode) outputNode, + serviceId, transientQueryPrefix)); } else if (outputNode instanceof KsqlStructuredDataOutputNode) { - long queryId = getNextQueryId(); - String applicationId = serviceId + persistanceQueryPrefix + - queryId; - if (addUniqueTimeSuffix) { - applicationId = addTimeSuffix(applicationId); - } + physicalPlans.add(buildPlanForStructuredOutputNode(addUniqueTimeSuffix, statementPlanPair, + overriddenStreamsProperties, updateMetastore, builder, ksqlConfigClone, schemaKStream, + (KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix)); - KafkaStreams streams = - buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties); - - KsqlStructuredDataOutputNode kafkaTopicOutputNode = - (KsqlStructuredDataOutputNode) outputNode; - physicalPlans.add( - new PersistentQueryMetadata(statementPlanPair.getLeft(), - streams, kafkaTopicOutputNode, schemaKStream - .getExecutionPlan(""), queryId, - (schemaKStream instanceof SchemaKTable)? DataSource - .DataSourceType.KTABLE: DataSource.DataSourceType.KSTREAM) - ); - - MetaStore metaStore = ksqlEngine.getMetaStore(); - if (metaStore.getTopic(kafkaTopicOutputNode.getKafkaTopicName()) == null) { - metaStore.putTopic(kafkaTopicOutputNode.getKsqlTopic()); - } - StructuredDataSource sinkDataSource; - if (schemaKStream instanceof SchemaKTable) { - SchemaKTable schemaKTable = (SchemaKTable) schemaKStream; - sinkDataSource = - new KsqlTable(kafkaTopicOutputNode.getId().toString(), - kafkaTopicOutputNode.getSchema(), - schemaKStream.getKeyField(), - kafkaTopicOutputNode.getTimestampField(), - kafkaTopicOutputNode.getKsqlTopic(), - kafkaTopicOutputNode.getId().toString() + - ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG), - schemaKTable.isWindowed()); - } else { - sinkDataSource = - new KsqlStream(kafkaTopicOutputNode.getId().toString(), - kafkaTopicOutputNode.getSchema(), - schemaKStream.getKeyField(), - kafkaTopicOutputNode.getTimestampField(), - kafkaTopicOutputNode.getKsqlTopic()); - } - - if (updateMetastore) { - metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns()); - } } else { throw new KsqlException("Sink data source is not correct."); } + log.info("Build physical plan for {}.", statementPlanPair.getLeft()); log.info(" Execution plan: \n"); log.info(schemaKStream.getExecutionPlan("")); } + /** + * + * @param addUniqueTimeSuffix + * @param statementPlanPair + * @param overriddenStreamsProperties + * @param builder + * @param ksqlConfigClone + * @param bareOutputNode + * @param serviceId + * @param transientQueryPrefix + */ + private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix, + Pair statementPlanPair, Map overriddenStreamsProperties, + KStreamBuilder builder, KsqlConfig ksqlConfigClone, QueuedSchemaKStream schemaKStream, + KsqlBareOutputNode bareOutputNode, String serviceId, String transientQueryPrefix) { + + String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix); + if (addUniqueTimeSuffix) { + applicationId = addTimeSuffix(applicationId); + } + + KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties); + + SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0); + + return new QueuedQueryMetadata( + statementPlanPair.getLeft(), + streams, + bareOutputNode, + schemaKStream.getExecutionPlan(""), + schemaKStream.getQueue(), + (sourceSchemaKstream instanceof SchemaKTable) ? + DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM + ); + } + + /** + * + * @param addUniqueTimeSuffix + * @param statementPlanPair + * @param overriddenStreamsProperties + * @param updateMetastore + * @param builder + * @param ksqlConfigClone + * @param schemaKStream + * @param serviceId + * @param persistanceQueryPrefix + */ + private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuffix, + Pair statementPlanPair, Map overriddenStreamsProperties, + boolean updateMetastore, KStreamBuilder builder, KsqlConfig ksqlConfigClone, SchemaKStream schemaKStream, + KsqlStructuredDataOutputNode outputNode, String serviceId, String persistanceQueryPrefix) { + + long queryId = getNextQueryId(); + + String applicationId = serviceId + persistanceQueryPrefix + queryId; + if (addUniqueTimeSuffix) { + applicationId = addTimeSuffix(applicationId); + } + + MetaStore metaStore = ksqlEngine.getMetaStore(); + if (metaStore.getTopic(outputNode.getKafkaTopicName()) == null) { + metaStore.putTopic(outputNode.getKsqlTopic()); + } + StructuredDataSource sinkDataSource; + if (schemaKStream instanceof SchemaKTable) { + SchemaKTable schemaKTable = (SchemaKTable) schemaKStream; + sinkDataSource = + new KsqlTable(outputNode.getId().toString(), + outputNode.getSchema(), + schemaKStream.getKeyField(), + outputNode.getTimestampField(), + outputNode.getKsqlTopic(), + outputNode.getId().toString() + + ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG), + schemaKTable.isWindowed()); + } else { + sinkDataSource = + new KsqlStream(outputNode.getId().toString(), + outputNode.getSchema(), + schemaKStream.getKeyField(), + outputNode.getTimestampField(), + outputNode.getKsqlTopic()); + } + + if (updateMetastore) { + metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns()); + } + KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties); + + return new PersistentQueryMetadata(statementPlanPair.getLeft(), + streams, outputNode, schemaKStream + .getExecutionPlan(""), queryId, + (schemaKStream instanceof SchemaKTable) ? DataSource + .DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM); + } + + public DDLCommandResult handleDdlStatement( final Statement statement, final Map overriddenProperties) { diff --git a/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java index 4c2a4fcd5d3c..c2f812b18fd4 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java +++ b/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java @@ -230,11 +230,10 @@ private SchemaKStream buildOutput(final OutputNode outputNode, Map() { + KStream rekeyedKStream = sourceSchemaKStream.getKstream().selectKey(new KeyValueMapper() { @Override public String apply(String key, GenericRow value) { diff --git a/ksql-core/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java b/ksql-core/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java index 3ca8e383d19e..9995d7d07b47 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java +++ b/ksql-core/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java @@ -21,40 +21,46 @@ import io.confluent.ksql.serde.KsqlTopicSerDe; import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Windowed; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.SynchronousQueue; public class QueuedSchemaKStream extends SchemaKStream { - private final SynchronousQueue> rowQueue; + private final SynchronousQueue> rowQueue = new SynchronousQueue<>(); public QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField, final List sourceSchemaKStreams, - SynchronousQueue> rowQueue, - Type type) { + Type type, + Optional limit + ) { super(schema, kstream, keyField, sourceSchemaKStreams, type); - this.rowQueue = rowQueue; + kstream.foreach(new QueuedSchemaKStream.QueuePopulator(rowQueue, limit)); } public QueuedSchemaKStream(SchemaKStream schemaKStream, - SynchronousQueue> rowQueue, - Type type) { + Optional limit + ) { this( - schemaKStream.schema, - schemaKStream.kstream, - schemaKStream.keyField, - schemaKStream.sourceSchemaKStreams, - rowQueue, - type + schemaKStream.schema, + schemaKStream.getKstream(), + schemaKStream.keyField, + schemaKStream.sourceSchemaKStreams, + Type.SINK, + limit ); } @@ -123,4 +129,41 @@ public KStream getKstream() { public List getSourceSchemaKStreams() { return super.getSourceSchemaKStreams(); } + + protected static class QueuePopulator implements ForeachAction { + private final SynchronousQueue> queue; + private final Optional limit; + private int counter = 0; + + public QueuePopulator(SynchronousQueue> queue, + Optional limit) { + this.queue = queue; + this.limit = limit; + } + + @Override + public void apply(K key, GenericRow row) { + try { + if (row == null) { + return; + } + if (limit.isPresent()) { + counter++; + if (counter > limit.get()) { + throw new KsqlException("LIMIT reached for the partition."); + } + } + String keyString; + if (key instanceof Windowed) { + Windowed windowedKey = (Windowed) key; + keyString = String.format("%s : %s", windowedKey.key(), windowedKey.window()); + } else { + keyString = Objects.toString(key); + } + queue.put(new KeyValue<>(keyString, row)); + } catch (InterruptedException exception) { + throw new KsqlException("InterruptedException while enqueueing:" + key); + } + } + } } diff --git a/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 8598b0001fb1..810c0515d848 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -78,9 +78,7 @@ public SchemaKStream(final Schema schema, final KStream kstream, final Field key } public QueuedSchemaKStream toQueue(Optional limit) { - SynchronousQueue> rowQueue = new SynchronousQueue<>(); - kstream.foreach(new QueuePopulator(rowQueue, limit)); - return new QueuedSchemaKStream(this, rowQueue, Type.SINK); + return new QueuedSchemaKStream(this, limit); } public SchemaKStream into(final String kafkaTopicName, final Serde topicValueSerDe, @@ -281,44 +279,6 @@ public List getSourceSchemaKStreams() { return sourceSchemaKStreams; } - protected static class QueuePopulator implements ForeachAction { - private final SynchronousQueue> queue; - private final Optional limit; - private int counter = 0; - - public QueuePopulator(SynchronousQueue> queue, - Optional limit) { - this.queue = queue; - this.limit = limit; - } - - @Override - public void apply(K key, GenericRow row) { - try { - if (row == null) { - return; - } - if (limit.isPresent()) { - counter ++; - if (counter > limit.get()) { - throw new KsqlException("LIMIT reached for the partition."); - } - } - String keyString; - if (key instanceof Windowed) { - Windowed windowedKey = (Windowed) key; - keyString = String.format("%s : %s", windowedKey.key(), windowedKey.window()); - } else { - keyString = Objects.toString(key); - } - queue.put(new KeyValue<>(keyString, row)); - } catch (InterruptedException exception) { - log.error(" Exception while enqueuing the row: " + key + " : " + row); - log.error(" Exception: " + exception.getMessage()); - } - } - } - public String getExecutionPlan(String indent) { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(indent + " > [ " + type + " ] Schema: " + SchemaUtil diff --git a/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 2d539da256e6..c62d77c1698d 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -111,9 +111,7 @@ public KeyValue apply(String key, GenericRow row) { @Override public QueuedSchemaKStream toQueue(Optional limit) { - SynchronousQueue> rowQueue = new SynchronousQueue<>(); - ktable.toStream().foreach(new QueuePopulator(rowQueue, limit)); - return new QueuedSchemaKStream(this, rowQueue, Type.SINK); + return new QueuedSchemaKStream(this, limit); } @Override @@ -125,8 +123,7 @@ public SchemaKTable filter(final Expression filterExpression) throws Exception { } @Override - public SchemaKTable select(final List> expressionPairList) - throws Exception { + public SchemaKTable select(final List> expressionPairList) throws Exception { CodeGenRunner codeGenRunner = new CodeGenRunner(); // TODO: Optimize to remove the code gen for constants and single // TODO: columns references and use them directly.