
Commit a02f0ae

Fixed the regression for stream-table join. Table should always read from the beginning of the topic. (#495)
* Fixed the regression for stream-table join. Table should always read from the beginning of the topic.

* Fixed the join test.

* Added unit test.

* Changed EARLIEST to earliest to follow the lowercase format in Kafka.
hjafarpour authored Dec 1, 2017
1 parent bd9bd58 commit a02f0ae
Showing 3 changed files with 82 additions and 6 deletions.
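
The regression in context: the table side of a stream-table join was built with the query's own consumer settings, so under Kafka's default auto.offset.reset of "latest" the table skipped every record already in its topic and join lookups came up empty. The standalone consumer sketch below shows the offset-reset semantics the fix relies on; the broker address, group id, and topic name are illustrative, not part of this commit.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OffsetResetDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-demo");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // "earliest" replays the topic from the beginning; "latest" (the default)
    // skips everything produced before this group first polled.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("item-table-topic"));
      consumer.poll(Duration.ofSeconds(5))
          .forEach(r -> System.out.println(r.key() + " -> " + r.value()));
    }
  }
}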
JoinNode.java

@@ -17,12 +17,15 @@
package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonProperty;

+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsBuilder;

import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -177,15 +180,20 @@ public SchemaKStream buildStream(final StreamsBuilder builder,

  }

-  private SchemaKTable tableForJoin(
+  // package private for test
+  SchemaKTable tableForJoin(
       final StreamsBuilder builder,
       final KsqlConfig ksqlConfig,
       final KafkaTopicClient kafkaTopicClient,
       final MetastoreUtil metastoreUtil,
       final FunctionRegistry functionRegistry,
       final Map<String, Object> props) {

-    final SchemaKStream schemaKStream = right.buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props);
+    Map<String, Object> joinTableProps = new HashMap<>();
+    joinTableProps.putAll(props);
+    joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    final SchemaKStream schemaKStream = right.buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, joinTableProps);
     if (!(schemaKStream instanceof SchemaKTable)) {
       throw new KsqlException("Unsupported Join. Only stream-table joins are supported, but was "
           + getLeft() + "-" + getRight());
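
The override above is applied to a copy of the props map rather than to the map itself, so the stream side of the join keeps whatever reset policy the query was started with. A minimal sketch of that copy-then-override pattern (the class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;

public class PropsCopyDemo {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // Copy first, then override: the caller's map is untouched, so only the
    // table-side consumer gets pinned to "earliest".
    Map<String, Object> joinTableProps = new HashMap<>(props);
    joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    System.out.println(props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));          // latest
    System.out.println(joinTableProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // earliest
  }
}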
JoinIntTest.java

@@ -5,6 +5,8 @@
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.ItemDataProvider;
import io.confluent.ksql.util.OrderDataProvider;

+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
@@ -42,9 +44,11 @@ public class JoinIntTest {
  public void before() throws Exception {
    testHarness = new IntegrationTestHarness(DataSource.DataSourceSerDe.JSON.name());
    testHarness.start();
-   Map<String, Object> ksqlStreamConfigProps = testHarness.ksqlConfig.getKsqlStreamConfigProps();
+   Map<String, Object> ksqlStreamConfigProps = new HashMap<>();
+   ksqlStreamConfigProps.putAll(testHarness.ksqlConfig.getKsqlStreamConfigProps());
    // turn caching off to improve join consistency
    ksqlStreamConfigProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+   ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    ksqlContext = KsqlContext.create(ksqlStreamConfigProps);
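
Setting the default reset policy to "latest" here is deliberate: with it, the join can only produce complete results if tableForJoin overrides the table side to "earliest". For reference, a minimal Kafka Streams sketch of the stream-table join shape this test exercises; the class and topic names are illustrative, not taken from the test:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinShape {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> orders = builder.stream("orders");
    KTable<String, String> items = builder.table("items");

    // Each order is enriched with the matching item row. Keys that are not yet
    // materialized in the table produce no output, which is why the table side
    // must be read from the earliest offset.
    orders.join(items, (order, item) -> order + "/" + item)
          .to("enriched-orders");
  }
}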
JoinNodeTest.java

@@ -16,32 +16,35 @@

package io.confluent.ksql.planner.plan;

+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlStream;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.serde.WindowedSerde;
import io.confluent.ksql.structured.LogicalPlanBuilder;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;

import static io.confluent.ksql.planner.plan.PlanTestUtil.MAP_NODE;
@@ -50,6 +53,9 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

+import static org.easymock.EasyMock.mock;

public class JoinNodeTest {
  private final KafkaTopicClient topicClient = EasyMock.createNiceMock(KafkaTopicClient.class);

@@ -81,6 +87,64 @@ public void shouldBuildSourceNode() throws Exception {
    assertThat(node.topics(), equalTo("[test2]"));
  }

  @Test
  public void shouldBuildTableNodeWithCorrectAutoCommitOffsetPolicy() throws Exception {

    StreamsBuilder streamsBuilder = mock(StreamsBuilder.class);
    KsqlConfig ksqlConfig = mock(KsqlConfig.class);
    KafkaTopicClient kafkaTopicClient = mock(KafkaTopicClient.class);
    MetastoreUtil metastoreUtil = mock(MetastoreUtil.class);
    FunctionRegistry functionRegistry = mock(FunctionRegistry.class);

    // A fake right-hand source: it only returns a table if the props handed to
    // it request the "earliest" offset reset policy, and throws otherwise.
    class RightTable extends PlanNode {
      final Schema schema;

      public RightTable(final PlanNodeId id, Schema schema) {
        super(id);
        this.schema = schema;
      }

      @Override
      public Schema getSchema() {
        return schema;
      }

      @Override
      public Field getKeyField() {
        return null;
      }

      @Override
      public List<PlanNode> getSources() {
        return null;
      }

      @Override
      public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig,
                                       KafkaTopicClient kafkaTopicClient,
                                       MetastoreUtil metastoreUtil,
                                       FunctionRegistry functionRegistry,
                                       Map<String, Object> props) {
        if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
            && props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
                .toString().equalsIgnoreCase("EARLIEST")) {
          return mock(SchemaKTable.class);
        } else {
          throw new KsqlException("auto.offset.reset should be set to EARLIEST.");
        }
      }
    }

    RightTable rightTable = new RightTable(new PlanNodeId("1"), joinNode.getRight().getSchema());

    JoinNode testJoinNode = new JoinNode(
        joinNode.getId(), joinNode.getType(), joinNode.getLeft(), rightTable,
        joinNode.getLeftKeyFieldName(), joinNode.getRightKeyFieldName(),
        joinNode.getLeftAlias(), joinNode.getRightAlias());

    // tableForJoin must pin the table side to "earliest"; the fake above
    // throws, failing the test, if it is called with anything else.
    testJoinNode.tableForJoin(builder, ksqlConfig, kafkaTopicClient, metastoreUtil,
        functionRegistry, new HashMap<>());
  }
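
The fake signals failure by throwing, so the test needs no explicit assertion. An equivalent capture-based formulation (a sketch, not part of this commit; the recording holder is hypothetical) would assert on the policy value directly:

// requires: import java.util.concurrent.atomic.AtomicReference;
final AtomicReference<Map<String, Object>> captured = new AtomicReference<>();
// ... and inside the fake's buildStream(...), record instead of throwing:
//     captured.set(props);
//     return mock(SchemaKTable.class);

testJoinNode.tableForJoin(builder, ksqlConfig, kafkaTopicClient, metastoreUtil,
    functionRegistry, new HashMap<>());
assertThat((String) captured.get().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
    equalTo("earliest"));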

  @Test
  public void shouldHaveLeftJoin() {
    final Topology topology = builder.build();
