Extracted Pipeline into QueuePopulatorSchema #330

Merged 1 commit on Oct 2, 2017
186 changes: 110 additions & 76 deletions ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -216,8 +216,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

// Build a physical plan, in this case a Kafka Streams DSL
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getKafkaTopicClient());
SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan);

OutputNode outputNode = physicalPlanBuilder.getPlanSink();
@@ -234,94 +233,129 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
schemaKStream.getClass().getCanonicalName()
));
}
String serviceId = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
String persistanceQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
String transientQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString();

if (isBareQuery) {
  physicalPlans.add(buildPlanForBareQuery(addUniqueTimeSuffix, statementPlanPair, overriddenStreamsProperties,
      builder, ksqlConfigClone, (QueuedSchemaKStream) schemaKStream, (KsqlBareOutputNode) outputNode,
      serviceId, transientQueryPrefix));
} else if (outputNode instanceof KsqlStructuredDataOutputNode) {
  physicalPlans.add(buildPlanForStructuredOutputNode(addUniqueTimeSuffix, statementPlanPair,
      overriddenStreamsProperties, updateMetastore, builder, ksqlConfigClone, schemaKStream,
      (KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix));
} else {
  throw new KsqlException("Sink data source is not correct.");
}

log.info("Build physical plan for {}.", statementPlanPair.getLeft());
log.info(" Execution plan: \n");
log.info(schemaKStream.getExecutionPlan(""));
}

/**
 * Builds the query metadata for a transient (bare) query whose results are
 * streamed into a queue rather than written to a sink topic.
 *
 * @param addUniqueTimeSuffix whether to append a unique time suffix to the application id
 * @param statementPlanPair the statement text paired with its logical plan
 * @param overriddenStreamsProperties overrides for the default streams properties
 * @param builder the builder holding the Kafka Streams topology
 * @param ksqlConfigClone a clone of the engine's KsqlConfig
 * @param schemaKStream the queued stream that feeds the result queue
 * @param bareOutputNode the sink node of the logical plan
 * @param serviceId the id of this KSQL service
 * @param transientQueryPrefix the application id prefix for transient queries
 * @return the metadata describing the running transient query
 */
private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
    Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
    KStreamBuilder builder, KsqlConfig ksqlConfigClone, QueuedSchemaKStream schemaKStream,
    KsqlBareOutputNode bareOutputNode, String serviceId, String transientQueryPrefix) {

  String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix);
  if (addUniqueTimeSuffix) {
    applicationId = addTimeSuffix(applicationId);
  }

  KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties);

  SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

  return new QueuedQueryMetadata(
      statementPlanPair.getLeft(),
      streams,
      bareOutputNode,
      schemaKStream.getExecutionPlan(""),
      schemaKStream.getQueue(),
      (sourceSchemaKstream instanceof SchemaKTable) ?
          DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM
  );
}

/**
 * Builds the query metadata for a persistent query that writes its results to
 * a Kafka topic and, optionally, registers the sink in the metastore.
 *
 * @param addUniqueTimeSuffix whether to append a unique time suffix to the application id
 * @param statementPlanPair the statement text paired with its logical plan
 * @param overriddenStreamsProperties overrides for the default streams properties
 * @param updateMetastore whether to register the sink data source in the metastore
 * @param builder the builder holding the Kafka Streams topology
 * @param ksqlConfigClone a clone of the engine's KsqlConfig
 * @param schemaKStream the stream or table produced by the physical plan
 * @param outputNode the structured-data sink node of the logical plan
 * @param serviceId the id of this KSQL service
 * @param persistanceQueryPrefix the application id prefix for persistent queries
 * @return the metadata describing the running persistent query
 */
private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuffix,
    Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
    boolean updateMetastore, KStreamBuilder builder, KsqlConfig ksqlConfigClone, SchemaKStream schemaKStream,
    KsqlStructuredDataOutputNode outputNode, String serviceId, String persistanceQueryPrefix) {

  long queryId = getNextQueryId();

  String applicationId = serviceId + persistanceQueryPrefix + queryId;
  if (addUniqueTimeSuffix) {
    applicationId = addTimeSuffix(applicationId);
  }

  MetaStore metaStore = ksqlEngine.getMetaStore();
  if (metaStore.getTopic(outputNode.getKafkaTopicName()) == null) {
    metaStore.putTopic(outputNode.getKsqlTopic());
  }
  StructuredDataSource sinkDataSource;
  if (schemaKStream instanceof SchemaKTable) {
    SchemaKTable schemaKTable = (SchemaKTable) schemaKStream;
    sinkDataSource = new KsqlTable(outputNode.getId().toString(),
        outputNode.getSchema(),
        schemaKStream.getKeyField(),
        outputNode.getTimestampField(),
        outputNode.getKsqlTopic(),
        outputNode.getId().toString() +
            ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG),
        schemaKTable.isWindowed());
  } else {
    sinkDataSource = new KsqlStream(outputNode.getId().toString(),
        outputNode.getSchema(),
        schemaKStream.getKeyField(),
        outputNode.getTimestampField(),
        outputNode.getKsqlTopic());
  }

  if (updateMetastore) {
    metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns());
  }
  KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties);

  return new PersistentQueryMetadata(statementPlanPair.getLeft(),
      streams, outputNode, schemaKStream.getExecutionPlan(""), queryId,
      (schemaKStream instanceof SchemaKTable) ? DataSource.DataSourceType.KTABLE
          : DataSource.DataSourceType.KSTREAM);
}


public DDLCommandResult handleDdlStatement(
    final Statement statement,
    final Map<String, Object> overriddenProperties) {
ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
@@ -230,11 +230,10 @@ private SchemaKStream buildOutput(final OutputNode outputNode, Map<String, Objec
this.planSink = ksqlStructuredDataOutputNodeWithRowkey;
return resultSchemaStream;
} else if (outputNode instanceof KsqlBareOutputNode) {
this.planSink = outputNode;
return schemaKStream.toQueue(outputNode.getLimit());
}

throw new KsqlException("Unsupported output logical node: " + outputNode.getClass().getName());
}

@@ -585,8 +584,7 @@ private SchemaKStream aggregateReKey(final AggregateNode aggregateNode,
newKeyIndexes.add(getIndexInSchema(groupByExpr.toString(), sourceSchemaKStream.getSchema()));
}

KStream rekeyedKStream = sourceSchemaKStream.getKstream().selectKey(new KeyValueMapper<String, GenericRow, String>() {

@Override
public String apply(String key, GenericRow value) {
ksql-core/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java
@@ -21,40 +21,46 @@
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;

public class QueuedSchemaKStream extends SchemaKStream {

private final SynchronousQueue<KeyValue<String, GenericRow>> rowQueue = new SynchronousQueue<>();

public QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField,
    final List<SchemaKStream> sourceSchemaKStreams,
    Type type,
    Optional<Integer> limit) {
  super(schema, kstream, keyField, sourceSchemaKStreams, type);
  kstream.foreach(new QueuedSchemaKStream.QueuePopulator(rowQueue, limit));
}

public QueuedSchemaKStream(SchemaKStream schemaKStream,
    Optional<Integer> limit) {
  this(
      schemaKStream.schema,
      schemaKStream.getKstream(),
      schemaKStream.keyField,
      schemaKStream.sourceSchemaKStreams,
      Type.SINK,
      limit
  );
}

@@ -123,4 +129,41 @@ public KStream getKstream() {
public List<SchemaKStream> getSourceSchemaKStreams() {
return super.getSourceSchemaKStreams();
}

protected static class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
  private final SynchronousQueue<KeyValue<String, GenericRow>> queue;
  private final Optional<Integer> limit;
  private int counter = 0;

  public QueuePopulator(SynchronousQueue<KeyValue<String, GenericRow>> queue,
      Optional<Integer> limit) {
    this.queue = queue;
    this.limit = limit;
  }

  @Override
  public void apply(K key, GenericRow row) {
    try {
      if (row == null) {
        return;
      }
      if (limit.isPresent()) {
        counter++;
        if (counter > limit.get()) {
          throw new KsqlException("LIMIT reached for the partition.");
        }
      }
      String keyString;
      if (key instanceof Windowed) {
        Windowed windowedKey = (Windowed) key;
        keyString = String.format("%s : %s", windowedKey.key(), windowedKey.window());
      } else {
        keyString = Objects.toString(key);
      }
      queue.put(new KeyValue<>(keyString, row));
    } catch (InterruptedException exception) {
      throw new KsqlException("InterruptedException while enqueueing: " + key);
    }
  }
}
}
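Aside on the handoff that the extracted QueuePopulator performs: a SynchronousQueue has no internal capacity, so every put() blocks until a consumer thread takes the element. That rendezvous is what paces the streams thread of a transient query to the speed of whatever is draining the queue. The sketch below is a self-contained illustration of that behavior in plain Java; the class and names are hypothetical and not part of this PR.

import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.SynchronousQueue;

public final class SynchronousQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<Entry<String, String>> queue = new SynchronousQueue<>();

    // Plays the role of the Kafka Streams thread running QueuePopulator.apply().
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 5; i++) {
          // Blocks here until the main thread below is ready to take the row.
          queue.put(new SimpleEntry<>("key" + i, "row" + i));
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Plays the role of the client draining a transient query's results.
    for (int i = 0; i < 5; i++) {
      Entry<String, String> row = queue.take();
      System.out.println(row.getKey() + " -> " + row.getValue());
    }
    producer.join();
  }
}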
ksql-core/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
@@ -78,9 +78,7 @@ public SchemaKStream(final Schema schema, final KStream kstream, final Field key
}

public QueuedSchemaKStream toQueue(Optional<Integer> limit) {
  return new QueuedSchemaKStream(this, limit);
}

public SchemaKStream into(final String kafkaTopicName, final Serde<GenericRow> topicValueSerDe,
@@ -281,44 +279,6 @@ public List<SchemaKStream> getSourceSchemaKStreams() {
return sourceSchemaKStreams;
}

public String getExecutionPlan(String indent) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(indent + " > [ " + type + " ] Schema: " + SchemaUtil
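For reference, this is roughly how the consuming side of the refactored pipeline fits together. A minimal sketch only: it assumes this diff's toQueue() and getQueue() methods, the import locations are guessed, and the polling loop and row printing are illustrative rather than taken from the PR.

import io.confluent.ksql.GenericRow;                     // assumed package
import io.confluent.ksql.structured.QueuedSchemaKStream; // assumed package
import io.confluent.ksql.structured.SchemaKStream;       // assumed package
import java.util.Optional;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;

public final class TransientQueryConsumer {

  // toQueue() registers QueuePopulator on the underlying KStream; once the
  // KafkaStreams application is started elsewhere, rows rendezvous here.
  public static void drain(SchemaKStream stream, int expectedRows) throws InterruptedException {
    QueuedSchemaKStream queued = stream.toQueue(Optional.of(expectedRows)); // acts like LIMIT n
    SynchronousQueue<KeyValue<String, GenericRow>> rowQueue = queued.getQueue();

    int received = 0;
    while (received < expectedRows) {
      // Times out and retries if the query has produced nothing yet.
      KeyValue<String, GenericRow> row = rowQueue.poll(500, TimeUnit.MILLISECONDS);
      if (row != null) {
        System.out.println(row.key + " : " + row.value);
        received++;
      }
    }
  }
}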