Remove SourceNode as it is redundant (#368)
* remove SourceNode as it is redundant

* Add test for KsqlTable datasource in LogicalPlanner
dguy authored Oct 16, 2017
1 parent 32c20ae commit d18a471
Showing 12 changed files with 259 additions and 314 deletions.
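The heart of the change shows up in the constructor calls: StructuredDataSourceNode used to be fed every attribute of its source one argument at a time, with the SourceNode base class holding two more (the timestamp field and the data-source type). Now the node wraps the StructuredDataSource itself and derives those attributes on demand. A condensed before/after, taken from the Analyzer.java hunk further down:

```java
// Before: every derived attribute is passed explicitly, duplicating state
// that leftDataSource already carries.
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"), leftDataSource.getSchema(),
    leftDataSource.getKeyField(),
    leftDataSource.getTimestampField(),
    leftDataSource.getKsqlTopic().getTopicName(),
    leftAlias, leftDataSource.getDataSourceType(),
    leftDataSource);

// After: the node keeps the source and a schema; key field, timestamp
// field, topic name, and data-source type are read from the source when
// asked for.
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"),
    leftDataSource,
    leftDataSource.getSchema());
```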
4 changes: 2 additions & 2 deletions docs/quickstart/quickstart-non-docker.md
@@ -77,14 +77,14 @@ Minimally, to use the [quick start exercises](/docs/quickstart#quick-start), you
1. Produce Kafka data to the `pageviews` topic using the data generator. The following example continuously generates data with a value in DELIMITED format.

```bash
-$ java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar \
+$ java -jar ksql-examples/target/ksql-examples-4.0.0-SNAPSHOT-standalone.jar \
quickstart=pageviews format=delimited topic=pageviews maxInterval=10000
```

1. Produce Kafka data to the `users` topic using the data generator. The following example continuously generates data with a value in JSON format.

```bash
-$ java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar \
+$ java -jar ksql-examples/target/ksql-examples-4.0.0-SNAPSHOT-standalone.jar \
quickstart=users format=json topic=users maxInterval=10000
```

4 changes: 4 additions & 0 deletions ksql-engine/pom.xml
@@ -108,6 +108,10 @@
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+    </dependency>
  </dependencies>

<build>
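EasyMock comes in for the test named in the commit message ("Add test for KsqlTable datasource in LogicalPlanner"). A minimal sketch of the kind of test this dependency enables — the class name, test method, and package locations below are illustrative assumptions, not the actual test added in this commit:

```java
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.StructuredDataSource;
import org.junit.Test;

public class LogicalPlannerMockingSketch {

  @Test
  public void shouldPlanAgainstMockedKsqlTable() {
    // Stub out the table so the planner can be exercised without a real
    // topic or metastore behind it.
    KsqlTable table = createNiceMock(KsqlTable.class);
    expect(table.getDataSourceType())
        .andReturn(StructuredDataSource.DataSourceType.KTABLE)
        .anyTimes();
    replay(table);

    // ... hand `table` to the analysis/planner under test and assert that
    // buildPlan() produces a StructuredDataSourceNode for it ...
  }
}
```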
@@ -138,5 +138,9 @@ public Optional<Integer> getLimitClause() {
public void setLimitClause(Optional<Integer> limitClause) {
this.limitClause = limitClause;
}

+  public Pair<StructuredDataSource, String> getFromDataSource(int index) {
+    return fromDataSources.get(index);
+  }
}

17 changes: 5 additions & 12 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -202,21 +202,14 @@ protected Node visitJoin(final Join node, final AnalysisContext context) {
String rightAlias = right.getAlias();
    StructuredDataSourceNode
        leftSourceKafkaTopicNode =
-        new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"), leftDataSource.getSchema(),
-            leftDataSource.getKeyField(),
-            leftDataSource.getTimestampField(),
-            leftDataSource.getKsqlTopic().getTopicName(),
-            leftAlias, leftDataSource.getDataSourceType(),
-            leftDataSource);
+        new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"),
+            leftDataSource,
+            leftDataSource.getSchema());
    StructuredDataSourceNode
        rightSourceKafkaTopicNode =
        new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Right"),
-            rightDataSource.getSchema(),
-            rightDataSource.getKeyField(),
-            rightDataSource.getTimestampField(),
-            rightDataSource.getKsqlTopic().getTopicName(),
-            rightAlias, rightDataSource.getDataSourceType(),
-            rightDataSource);
+            rightDataSource,
+            rightDataSource.getSchema());

JoinNode.Type joinType;
switch (node.getType()) {
@@ -37,7 +37,6 @@
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.ProjectNode;
-import io.confluent.ksql.planner.plan.SourceNode;
import io.confluent.ksql.planner.plan.StructuredDataSourceNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
@@ -124,8 +123,8 @@ private SchemaKStream kafkaStreamsDsl(final PlanNode planNode) throws Exception

private SchemaKStream kafkaStreamsDsl(final PlanNode planNode, Map<String, Object> propsMap) throws
Exception {
-    if (planNode instanceof SourceNode) {
-      return buildSource((SourceNode) planNode, propsMap);
+    if (planNode instanceof StructuredDataSourceNode) {
+      return buildSource((StructuredDataSourceNode) planNode, propsMap);
} else if (planNode instanceof JoinNode) {
return buildJoin((JoinNode) planNode, propsMap);
} else if (planNode instanceof AggregateNode) {
@@ -388,53 +387,47 @@ private SchemaKStream buildFilter(final FilterNode filterNode)
}


-  private SchemaKStream buildSource(final SourceNode sourceNode, Map<String, Object> props) {
-
-    if (sourceNode instanceof StructuredDataSourceNode) {
-      StructuredDataSourceNode structuredDataSourceNode = (StructuredDataSourceNode) sourceNode;
-
-      if (structuredDataSourceNode.getTimestampField() != null) {
-        int timestampColumnIndex = getTimeStampColumnIndex(structuredDataSourceNode.getSchema(),
-            structuredDataSourceNode.getTimestampField());
-        ksqlConfig.put(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX, timestampColumnIndex);
-      }
-
-      Serde<GenericRow> genericRowSerde =
-          SerDeUtil.getRowSerDe(structuredDataSourceNode.getStructuredDataSource()
-                  .getKsqlTopic().getKsqlTopicSerDe(),
-              SchemaUtil.removeImplicitRowTimeRowKeyFromSchema(
-                  structuredDataSourceNode.getSchema()));
-
-      if (structuredDataSourceNode.getDataSourceType()
-          == StructuredDataSource.DataSourceType.KTABLE) {
-        final KsqlTable table = (KsqlTable) structuredDataSourceNode.getStructuredDataSource();
-
-        final KTable kTable = createKTable(
-            getAutoOffsetReset(props),
-            table,
-            genericRowSerde,
-            SerDeUtil.getRowSerDe(table.getKsqlTopic().getKsqlTopicSerDe(),
-                structuredDataSourceNode.getSchema())
-        );
-        return new SchemaKTable(sourceNode.getSchema(), kTable,
-            sourceNode.getKeyField(), new ArrayList<>(),
-            table.isWindowed(),
-            SchemaKStream.Type.SOURCE);
-      }
-
-      return new SchemaKStream(sourceNode.getSchema(),
-          builder
-              .stream(structuredDataSourceNode.getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
-                  Consumed.with(Serdes.String(), genericRowSerde))
-              .map(nonWindowedMapper)
-              .transformValues(new AddTimestampColumnValueTransformerSupplier()),
-          sourceNode.getKeyField(), new ArrayList<>(),
-          SchemaKStream.Type.SOURCE);
-    }
-    throw new KsqlException("Unsupported source logical node: " + sourceNode.getClass().getName());
-  }
+  private SchemaKStream buildSource(final StructuredDataSourceNode sourceNode, Map<String, Object> props) {
+    if (sourceNode.getTimestampField() != null) {
+      int timestampColumnIndex = getTimeStampColumnIndex(sourceNode.getSchema(),
+          sourceNode.getTimestampField());
+      ksqlConfig.put(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX, timestampColumnIndex);
+    }
+
+    Serde<GenericRow> genericRowSerde =
+        SerDeUtil.getRowSerDe(sourceNode.getStructuredDataSource()
+                .getKsqlTopic().getKsqlTopicSerDe(),
+            SchemaUtil.removeImplicitRowTimeRowKeyFromSchema(
+                sourceNode.getSchema()));
+
+    if (sourceNode.getDataSourceType()
+        == StructuredDataSource.DataSourceType.KTABLE) {
+      final KsqlTable table = (KsqlTable) sourceNode.getStructuredDataSource();
+
+      final KTable kTable = createKTable(
+          getAutoOffsetReset(props),
+          table,
+          genericRowSerde,
+          SerDeUtil.getRowSerDe(table.getKsqlTopic().getKsqlTopicSerDe(),
+              sourceNode.getSchema())
+      );
+      return new SchemaKTable(sourceNode.getSchema(), kTable,
+          sourceNode.getKeyField(), new ArrayList<>(),
+          table.isWindowed(),
+          SchemaKStream.Type.SOURCE);
+    }
+
+    return new SchemaKStream(sourceNode.getSchema(),
+        builder
+            .stream(sourceNode.getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
+                Consumed.with(Serdes.String(), genericRowSerde))
+            .map(nonWindowedMapper)
+            .transformValues(new AddTimestampColumnValueTransformerSupplier()),
+        sourceNode.getKeyField(), new ArrayList<>(),
+        SchemaKStream.Type.SOURCE);
+  }

private <K> KTable table(final KStream<K, GenericRow> stream, final Serde<K> keySerde, final Serde<GenericRow> valueSerde) {
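Read side by side, the buildSource rewrite is the payoff of deleting SourceNode: with the parameter typed as StructuredDataSourceNode, the runtime type check moves into the method signature, so both the instanceof test and the trailing fallback throw disappear. A condensed sketch of the two shapes from the hunk above (bodies elided):

```java
// Old shape: accept the abstract SourceNode, downcast at runtime, and keep
// a fallback throw for node types the planner never actually produced.
private SchemaKStream buildSource(final SourceNode sourceNode, Map<String, Object> props) {
  if (sourceNode instanceof StructuredDataSourceNode) {
    // ... build the SchemaKTable or SchemaKStream ...
  }
  throw new KsqlException("Unsupported source logical node: " + sourceNode.getClass().getName());
}

// New shape: the signature itself guarantees the node type, so the body
// goes straight to building the stream or table.
private SchemaKStream buildSource(final StructuredDataSourceNode sourceNode, Map<String, Object> props) {
  // ... build the SchemaKTable or SchemaKStream ...
}
```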
@@ -31,18 +31,15 @@
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.planner.plan.ProjectNode;
-import io.confluent.ksql.planner.plan.SourceNode;
import io.confluent.ksql.planner.plan.StructuredDataSourceNode;
import io.confluent.ksql.util.ExpressionTypeManager;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

-import java.util.ArrayList;
-import java.util.List;

public class LogicalPlanner {

private Analysis analysis;
@@ -61,7 +58,7 @@ public PlanNode buildPlan() {
currentNode = buildSourceNode();
}
if (analysis.getWhereExpression() != null) {
-      currentNode = buildFilterNode(currentNode.getSchema(), currentNode);
+      currentNode = buildFilterNode(currentNode);
}
if ((analysis.getGroupByExpressions() != null) && (!analysis.getGroupByExpressions()
.isEmpty())) {
@@ -129,9 +126,6 @@ private AggregateNode buildAggregateNode(final Schema inputSchema,
}

private ProjectNode buildProjectNode(final Schema inputSchema, final PlanNode sourcePlanNode) {
-    List<Field> projectionFields = new ArrayList<>();
-    List<String> fieldNames = new ArrayList<>();
-
SchemaBuilder projectionSchema = SchemaBuilder.struct();
ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(inputSchema);
for (int i = 0; i < analysis.getSelectExpressions().size(); i++) {
@@ -148,36 +142,21 @@ private ProjectNode buildProjectNode(final Schema inputSchema, final PlanNode sourcePlanNode) {
analysis.getSelectExpressions());
}

-  private FilterNode buildFilterNode(final Schema inputSchema, final PlanNode sourcePlanNode) {
+  private FilterNode buildFilterNode(final PlanNode sourcePlanNode) {

Expression filterExpression = analysis.getWhereExpression();
return new FilterNode(new PlanNodeId("Filter"), sourcePlanNode, filterExpression);
}

-  private SourceNode buildSourceNode() {
-
-    StructuredDataSource fromDataSource = analysis.getFromDataSources().get(0).getLeft();
-    String alias = analysis.getFromDataSources().get(0).getRight();
-    Schema fromSchema = SchemaUtil.buildSchemaWithAlias(fromDataSource.getSchema(), alias);
-
-    if (fromDataSource instanceof KsqlStream) {
-      KsqlStream fromStream = (KsqlStream) fromDataSource;
-      return new StructuredDataSourceNode(new PlanNodeId("KsqlTopic"), fromSchema,
-          fromDataSource.getKeyField(),
-          fromDataSource.getTimestampField(),
-          fromStream.getKsqlTopic().getTopicName(),
-          alias, fromStream.getDataSourceType(),
-          fromStream);
-    } else if (fromDataSource instanceof KsqlTable) {
-      KsqlTable fromTable = (KsqlTable) fromDataSource;
-      return new StructuredDataSourceNode(new PlanNodeId("KsqlTopic"), fromSchema,
-          fromDataSource.getKeyField(),
-          fromDataSource.getTimestampField(),
-          fromTable.getKsqlTopic().getTopicName(),
-          alias, fromTable.getDataSourceType(),
-          fromTable);
-    }
+  private StructuredDataSourceNode buildSourceNode() {
+
+    Pair<StructuredDataSource, String> dataSource = analysis.getFromDataSource(0);
+    Schema fromSchema = SchemaUtil.buildSchemaWithAlias(dataSource.left.getSchema(), dataSource.right);
+
+    if (dataSource.left instanceof KsqlStream
+        || dataSource.left instanceof KsqlTable) {
+      return new StructuredDataSourceNode(new PlanNodeId("KsqlTopic"), dataSource.left, fromSchema);
+    }
    throw new RuntimeException("Data source is not supported yet.");
  }


ksql-engine/src/main/java/io/confluent/ksql/planner/plan/SourceNode.java
This file was deleted.

@@ -25,46 +25,30 @@

import javax.annotation.concurrent.Immutable;
import java.util.List;

-import static java.util.Objects.requireNonNull;
+import java.util.Objects;

@Immutable
public class StructuredDataSourceNode
-    extends SourceNode {
+    extends PlanNode {

+  private final StructuredDataSource structuredDataSource;
   private final Schema schema;
-  private final String topicName;
-  private final Field keyField;
-  private final String alias;
-  StructuredDataSource structuredDataSource;

  // TODO: pass in the "assignments" and the "outputs" separately
  // TODO: (i.e., get rid if the symbol := symbol idiom)
  @JsonCreator
  public StructuredDataSourceNode(@JsonProperty("id") final PlanNodeId id,
-                                 @JsonProperty("schema") final Schema schema,
-                                 @JsonProperty("keyField") final Field keyField,
-                                 @JsonProperty("timestampField") final Field timestampField,
-                                 @JsonProperty("topicName") final String topicName,
-                                 @JsonProperty("alias") final String alias,
-                                 @JsonProperty("dataSourceType")
-                                 final StructuredDataSource.DataSourceType dataSourceType,
-                                 @JsonProperty("structuredDataSource")
-                                 final StructuredDataSource structuredDataSource) {
-    super(id, timestampField, dataSourceType);
-
+                                 @JsonProperty("structuredDataSource") final StructuredDataSource structuredDataSource,
+                                 @JsonProperty("schema") Schema schema) {
+    super(id);
+    Objects.requireNonNull(structuredDataSource, "structuredDataSource can't be null");
+    Objects.requireNonNull(schema, "schema can't be null");
    this.schema = schema;
-    requireNonNull(topicName, "topicName is null");
-
-    this.topicName = topicName;
-    this.keyField = keyField;
-    this.alias = alias;
    this.structuredDataSource = structuredDataSource;
  }

public String getTopicName() {
-    return topicName;
+    return structuredDataSource.getTopicName();
}

@Override
@@ -74,11 +58,7 @@ public Schema getSchema() {

@Override
public Field getKeyField() {
-    return keyField;
-  }
-
-  public String getAlias() {
-    return alias;
+    return structuredDataSource.getKeyField();
}

public StructuredDataSource getStructuredDataSource() {
@@ -94,4 +74,12 @@ public List<PlanNode> getSources() {
public <C, R> R accept(PlanVisitor<C, R> visitor, C context) {
return visitor.visitStructuredDataSourceNode(this, context);
}

+  public StructuredDataSource.DataSourceType getDataSourceType() {
+    return structuredDataSource.getDataSourceType();
+  }
+
+  public Field getTimestampField() {
+    return structuredDataSource.getTimestampField();
+  }
}
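The net effect on callers: a StructuredDataSourceNode now stores only the wrapped source and a schema, and every other attribute is delegation. A small usage sketch of the accessors shown in the diff above; `dataSource` and `aliasedSchema` stand in for whatever the caller holds:

```java
StructuredDataSourceNode node = new StructuredDataSourceNode(
    new PlanNodeId("KsqlTopic"), dataSource, aliasedSchema);

// Each call forwards to the wrapped StructuredDataSource instead of
// reading a duplicated field, so node and source can no longer disagree.
node.getTopicName();       // == dataSource.getTopicName()
node.getKeyField();        // == dataSource.getKeyField()
node.getTimestampField();  // == dataSource.getTimestampField()
node.getDataSourceType();  // == dataSource.getDataSourceType()
```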