Skip to content

Commit

Permalink
reset repartition flag (#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
dguy authored Nov 9, 2017
1 parent 5af365c commit 2961526
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;

import javax.annotation.concurrent.Immutable;

Expand Down Expand Up @@ -162,10 +163,10 @@ public SchemaKStream buildStream(final StreamsBuilder builder,
}

return new SchemaKStream(getSchema(),
builder
resetRepartitionFlag(builder
.stream(getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
Consumed.with(Serdes.String(), genericRowSerde))
.map(nonWindowedMapper)
.map(nonWindowedMapper))
.transformValues(new AddTimestampColumn()),
getKeyField(), new ArrayList<>(),
SchemaKStream.Type.SOURCE, functionRegistry);
Expand Down Expand Up @@ -223,23 +224,37 @@ private KTable createKTable(StreamsBuilder builder, final Topology.AutoOffsetRes
final Serde<GenericRow> genericRowSerde,
final Serde<GenericRow> genericRowSerdeAfterRead) {
if (ksqlTable.isWindowed()) {
return table(builder
return table(resetRepartitionFlag(builder
.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(),
Consumed.with(windowedSerde, genericRowSerde)
.withOffsetResetPolicy(autoOffsetReset))
.map(windowedMapper)
.map(windowedMapper))
.transformValues(new AddTimestampColumn()), windowedSerde, genericRowSerdeAfterRead);
} else {
return table(builder
.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(),
Consumed.with(Serdes.String(), genericRowSerde)
.withOffsetResetPolicy(autoOffsetReset))
.map(nonWindowedMapper)
return table(resetRepartitionFlag(
builder.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(),
Consumed.with(Serdes.String(), genericRowSerde)
.withOffsetResetPolicy(autoOffsetReset))
.map(nonWindowedMapper))
.transformValues(new AddTimestampColumn()),
Serdes.String(), genericRowSerdeAfterRead);
}
}

// This is a hack to reset the repartitionRequiredFlag - can be removed once KIP-159 is introduced
// in kafka 1.1
private <K> KStream<K, GenericRow> resetRepartitionFlag(final KStream<K, GenericRow> stream) {
try {
java.lang.reflect.Field repartitionField = KStreamImpl.class.getDeclaredField("repartitionRequired");
repartitionField.setAccessible(true);
repartitionField.set(stream, false);
repartitionField.setAccessible(false);
} catch (NoSuchFieldException | IllegalAccessException e) {
// ignored
}
return stream;
}

private <K> KTable table(final KStream<K, GenericRow> stream, final Serde<K> keySerde, final Serde<GenericRow> valueSerde) {
return stream.groupByKey(Serialized.with(keySerde, valueSerde))
.reduce((genericRow, newValue) -> newValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.confluent.ksql.util.ItemDataProvider;
import io.confluent.ksql.util.OrderDataProvider;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -22,7 +21,6 @@
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;

@Category({IntegrationTest.class})
public class JoinIntTest {
Expand Down Expand Up @@ -76,7 +74,7 @@ public void shouldLeftJoinOrderAndItems() throws Exception {

final String queryString = String.format(
"CREATE STREAM %s AS SELECT ORDERID, ITEMID, ORDERUNITS, DESCRIPTION FROM orders LEFT JOIN items " +
" on orders.ITEMID = item.ITEMID WHERE orders.ITEMID = 'ITEM_1' ;",
" on orders.ITEMID = item.ID WHERE orders.ITEMID = 'ITEM_1' ;",
testStreamName
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ public void shouldBuildSourceNode() throws Exception {
@Test
public void shouldHaveLeftJoin() {
final Topology topology = builder.build();
System.out.println(topology.describe());
final TopologyDescription.Processor leftJoin
= (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000016");
= (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000013");
final List<String> predecessors = leftJoin.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000015")));
assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000012")));
}

@Test
Expand Down

0 comments on commit 2961526

Please sign in to comment.