
Commit

Merge pull request #375 from bluemonk3y/4.0.x-TEST-stream-table-join-test

TEST: adding stream-table join
logscape authored Oct 17, 2017
2 parents 7055aca + 87671d0 commit 36829c3
Showing 8 changed files with 178 additions and 23 deletions.
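
The heart of the change is a new stream-table join integration test: an orders stream is joined to an items table so that each order picks up the matching item's DESCRIPTION. A condensed sketch of the statements the test issues (everything below is taken from the new JoinIntTest further down; nothing new is assumed):

// Sources backed by the two test topics (see createStreams() in JoinIntTest):
ksqlContext.sql("CREATE STREAM orders (ORDERTIME bigint, ORDERID varchar, ITEMID varchar, "
    + "ORDERUNITS double, PRICEARRAY array<double>, KEYVALUEMAP map<varchar, double>) "
    + "WITH (kafka_topic='OrderTopic', value_format='JSON');");
ksqlContext.sql("CREATE TABLE items (ID varchar, DESCRIPTION varchar) "
    + "WITH (kafka_topic='ItemTopic', value_format='JSON');");

// The join under test: enrich each order with the item's description, filtered to one item.
ksqlContext.sql("CREATE STREAM ORDEREDWITHDESCRIPTION AS "
    + "SELECT ORDERID, ITEMID, ORDERUNITS, DESCRIPTION "
    + "FROM orders LEFT JOIN items ON orders.ITEMID = items.ID "
    + "WHERE orders.ITEMID = 'ITEM_1';");
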
1 change: 0 additions & 1 deletion config/ksqlserver.properties
@@ -1,6 +1,5 @@
- #bootstrap.servers=localhost:1119092
bootstrap.servers=localhost:9092
application.id=ksql_server_quickstart
ksql.command.topic.suffix=commands

listeners=http://localhost:8080
5 changes: 2 additions & 3 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
@@ -19,6 +19,8 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.util.KafkaTopicClientImpl;
import io.confluent.ksql.util.KsqlConfig;
+ import io.confluent.ksql.util.PersistentQueryMetadata;
+ import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,9 +31,6 @@
import java.util.List;
import java.util.Map;

- import io.confluent.ksql.util.PersistentQueryMetadata;
- import io.confluent.ksql.util.QueryMetadata;

public class KsqlContext {

private static final Logger log = LoggerFactory.getLogger(KsqlContext.class);
15 changes: 9 additions & 6 deletions IntegrationTestHarness.java
@@ -60,12 +60,13 @@ public void createTopic(String topicName, int numPartitions, short replicatonFac
* @param topicName topic to produce to (the topic is created first)
* @param recordsToPublish rows to publish, keyed by the intended message key
* @param schema value schema of the rows
+ * @param timestamp record timestamp applied to every record, or null to let the producer assign one
* @return metadata for each acknowledged record, keyed by message key
* @throws InterruptedException
* @throws TimeoutException
* @throws ExecutionException
*/
- public Map<String, RecordMetadata> produceData(String topicName, Map<String, GenericRow> recordsToPublish, Schema schema)
+ public Map<String, RecordMetadata> produceData(String topicName, Map<String, GenericRow> recordsToPublish, Schema schema, Long timestamp)
throws InterruptedException, TimeoutException, ExecutionException {

createTopic(topicName);
@@ -77,16 +78,18 @@ public Map<String, RecordMetadata> produceData(String topicName, Map<String, Gen
Map<String, RecordMetadata> result = new HashMap<>();
for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
String key = recordEntry.getKey();
- ProducerRecord<String, GenericRow>
- producerRecord = new ProducerRecord<>(topicName, key, recordEntry.getValue());
- Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
+ Future<RecordMetadata> recordMetadataFuture = producer.send(buildRecord(topicName, timestamp, recordEntry, key));
result.put(key, recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
producer.close();

return result;
}

+ private ProducerRecord<String, GenericRow> buildRecord(String topicName, Long timestamp, Map.Entry<String, GenericRow> recordEntry, String key) {
+ return new ProducerRecord<>(topicName, null, timestamp, key, recordEntry.getValue());
+ }

private Properties properties() {
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ksqlConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -173,8 +176,8 @@ public void stop() {
this.embeddedKafkaCluster.stop();
}

- public Map<String, RecordMetadata> publishTestData(String topicName, TestDataProvider dataProvider) throws InterruptedException, ExecutionException, TimeoutException {
+ public Map<String, RecordMetadata> publishTestData(String topicName, TestDataProvider dataProvider, Long timestamp) throws InterruptedException, ExecutionException, TimeoutException {
createTopic(topicName);
- return produceData(topicName, dataProvider.data(), dataProvider.schema());
+ return produceData(topicName, dataProvider.data(), dataProvider.schema(), timestamp);
}
}
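
The harness change threads an optional timestamp through to the ProducerRecord, so a test can pin the event time of everything it publishes; passing null keeps the previous behaviour and lets the producer assign the current time. A sketch of the two call styles (the harness and data provider are the classes in this commit; the topic names are only illustrative):

// Explicit event time: every record in this batch is stamped with ts.
long ts = System.currentTimeMillis();
Map<String, RecordMetadata> pinned =
    testHarness.publishTestData("OrderTopic", new OrderDataProvider(), ts);

// No timestamp: the producer stamps each record itself, as it did before this change.
Map<String, RecordMetadata> defaulted =
    testHarness.publishTestData("TestTopic", new OrderDataProvider(), null);
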
96 changes: 96 additions & 0 deletions JoinIntTest.java
@@ -0,0 +1,96 @@
package io.confluent.ksql.integration;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlContext;
import io.confluent.ksql.util.ItemDataProvider;
import io.confluent.ksql.util.OrderDataProvider;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Category({IntegrationTest.class})
public class JoinIntTest {

private IntegrationTestHarness testHarness;
private KsqlContext ksqlContext;


private String orderStreamTopic = "OrderTopic";
private OrderDataProvider orderDataProvider;
private Map<String, RecordMetadata> orderRecordMetadataMap;

private String itemTableTopic = "ItemTopic";
private ItemDataProvider itemDataProvider;
private Map<String, RecordMetadata> itemRecordMetadataMap;

@Before
public void before() throws Exception {
testHarness = new IntegrationTestHarness();
testHarness.start();
Map<String, Object> ksqlStreamConfigProps = testHarness.ksqlConfig.getKsqlStreamConfigProps();
// turn caching off to improve join consistency
ksqlStreamConfigProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
ksqlContext = new KsqlContext(ksqlStreamConfigProps);

/**
* Setup test data
*/
testHarness.createTopic(itemTableTopic);
itemDataProvider = new ItemDataProvider();

long now = System.currentTimeMillis();
itemRecordMetadataMap = testHarness.publishTestData(itemTableTopic, itemDataProvider, now-500);


testHarness.createTopic(orderStreamTopic);
orderDataProvider = new OrderDataProvider();
orderRecordMetadataMap = testHarness.publishTestData(orderStreamTopic, orderDataProvider, now );
createStreams();
}

@After
public void after() throws Exception {
ksqlContext.close();
testHarness.stop();
}

@Test
public void shouldLeftJoinOrderAndItems() throws Exception {
final String testStreamName = "OrderedWithDescription".toUpperCase();

final String queryString = String.format(
"CREATE STREAM %s AS SELECT ORDERID, ITEMID, ORDERUNITS, DESCRIPTION FROM orders LEFT JOIN items " +
" on orders.ITEMID = item.ITEMID WHERE orders.ITEMID = 'ITEM_1' ;",
testStreamName
);

ksqlContext.sql(queryString);

Schema resultSchema = ksqlContext.getMetaStore().getSource(testStreamName).getSchema();

Map<String, GenericRow> expectedResults = Collections.singletonMap("ITEM_1", new GenericRow(Arrays.asList(null, null, "ORDER_1", "ITEM_1", 10.0, "home cinema")));

Map<String, GenericRow> results = testHarness.consumeData(testStreamName, resultSchema, 1, new StringDeserializer());

assertThat(results, equalTo(expectedResults));
}

private void createStreams() throws Exception {
ksqlContext.sql("CREATE STREAM orders (ORDERTIME bigint, ORDERID varchar, ITEMID varchar, ORDERUNITS double, PRICEARRAY array<double>, KEYVALUEMAP map<varchar, double>) WITH (kafka_topic='" + orderStreamTopic + "', value_format='JSON');");
ksqlContext.sql("CREATE TABLE items (ID varchar, DESCRIPTION varchar) WITH (kafka_topic='" + itemTableTopic + "', value_format='JSON');");
}

}
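
Two details appear to make the join deterministic: record caching is switched off (CACHE_MAX_BYTES_BUFFERING_CONFIG = 0) so table updates are not buffered, and the item rows are published with a timestamp 500 ms older than the orders, so that in event time the items table is already populated when each order arrives and the left join finds its match. The relevant lines from before(), annotated (the rationale is an interpretation, not something the test states):

long now = System.currentTimeMillis();

// Table side first in event time: item rows are stamped half a second in the past ...
itemRecordMetadataMap = testHarness.publishTestData(itemTableTopic, itemDataProvider, now - 500);

// ... stream side second: each order's timestamp is newer than the item it joins to.
orderRecordMetadataMap = testHarness.publishTestData(orderStreamTopic, orderDataProvider, now);
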
3 changes: 1 addition & 2 deletions
@@ -2,7 +2,6 @@

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlContext;
- import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.util.OrderDataProvider;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -43,7 +42,7 @@ public void before() throws Exception {
* Setup test data
*/
dataProvider = new OrderDataProvider();
- recordMetadataMap = testHarness.publishTestData(topicName, dataProvider);
+ recordMetadataMap = testHarness.publishTestData(topicName, dataProvider, null);
createOrdersStream();
}

2 changes: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ public void before() throws Exception {
* Setup test data
*/
dataProvider = new OrderDataProvider();
- recordMetadataMap = testHarness.publishTestData(topicName, dataProvider);
+ recordMetadataMap = testHarness.publishTestData(topicName, dataProvider, null);
createOrdersStream();
}

17 changes: 7 additions & 10 deletions WindowingIntTest.java
@@ -35,6 +35,7 @@ public class WindowingIntTest {
private Map<String, RecordMetadata> datasetOneMetaData;
private final String topicName = "TestTopic";
private OrderDataProvider dataProvider;
+ private long now;

@Before
public void before() throws Exception {
@@ -48,9 +49,11 @@ public void before() throws Exception {
*/
alignTimeToWindowSize(WINDOW_SIZE_SEC);

+ now = System.currentTimeMillis()+500;

testHarness.createTopic("ORDERS");
dataProvider = new OrderDataProvider();
- datasetOneMetaData = testHarness.publishTestData(topicName, dataProvider);
+ datasetOneMetaData = testHarness.publishTestData(topicName, dataProvider, now - 500);
createOrdersStream();
}

@@ -63,9 +66,7 @@ public void after() throws Exception {
@Test
public void shouldAggregateTumblingWindow() throws Exception {

- // not really required - but lets mess with ms
- Thread.sleep(100);
- testHarness.publishTestData(topicName, dataProvider);
+ testHarness.publishTestData(topicName, dataProvider, now);


final String streamName = "TUMBLING_AGGTEST";
@@ -95,9 +96,7 @@ public void shouldAggregateTumblingWindow() throws Exception {
@Test
public void shouldAggregateHoppingWindow() throws Exception {

- // not really required - but lets mess with ms
- Thread.sleep(100);
- testHarness.publishTestData(topicName, dataProvider);
+ testHarness.publishTestData(topicName, dataProvider, now);


final String streamName = "HOPPING_AGGTEST";
@@ -127,9 +126,7 @@ public void shouldAggregateHoppingWindow() throws Exception {
@Test
public void shouldAggregateSessionWindow() throws Exception {

- // not really required - but lets mess with ms
- Thread.sleep(100);
- testHarness.publishTestData(topicName, dataProvider);
+ testHarness.publishTestData(topicName, dataProvider, now);


final String streamName = "SESSION_AGGTEST";
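
In the windowing tests, the second publication of the dataset now reuses the explicit now timestamp instead of sleeping for 100 ms, so both batches carry identical event times and land in the same window deterministically. The aggregation statements themselves are collapsed in this view; for context, a tumbling-window aggregate in the KSQL of this era looked roughly like the sketch below (the source stream, projection and window size are assumptions, not the test's actual statement):

// Hypothetical aggregate; the name TUMBLING_AGGTEST appears in the test above,
// everything else in the statement is illustrative.
ksqlContext.sql("CREATE TABLE TUMBLING_AGGTEST AS "
    + "SELECT ITEMID, COUNT(*) "
    + "FROM orders WINDOW TUMBLING (SIZE 10 SECONDS) "
    + "WHERE ITEMID = 'ITEM_1' "
    + "GROUP BY ITEMID;");
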
62 changes: 62 additions & 0 deletions ItemDataProvider.java
@@ -0,0 +1,62 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.ksql.util;

import io.confluent.ksql.GenericRow;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ItemDataProvider extends TestDataProvider {

private static final String namePrefix =
"ITEM";

private static final String ksqlSchemaString =
"(ID varchar, DESCRIPTION varchar)";

private static final String key = "ID";

private static final Schema schema = SchemaBuilder.struct()
.field("ID", SchemaBuilder.STRING_SCHEMA)
.field("DESCRIPTION", SchemaBuilder.STRING_SCHEMA).build();

private static final Map<String, GenericRow> data = new ItemDataProvider().buildData();

public ItemDataProvider() {
super(namePrefix, ksqlSchemaString, key, schema, data);
}

private Map<String, GenericRow> buildData() {

Map<String, GenericRow> dataMap = new HashMap<>();
dataMap.put("ITEM_1", new GenericRow(Arrays.asList("ITEM_1", "home cinema")));
dataMap.put("ITEM_2", new GenericRow(Arrays.asList("ITEM_2", "clock radio")));
dataMap.put("ITEM_3", new GenericRow(Arrays.asList("ITEM_3", "road bike")));
dataMap.put("ITEM_4", new GenericRow(Arrays.asList("ITEM_4", "mountain bike")));
dataMap.put("ITEM_5", new GenericRow(Arrays.asList("ITEM_5", "snowboard")));
dataMap.put("ITEM_6", new GenericRow(Arrays.asList("ITEM_6", "iphone 10")));
dataMap.put("ITEM_7", new GenericRow(Arrays.asList("ITEM_7", "gopro")));
dataMap.put("ITEM_8", new GenericRow(Arrays.asList("ITEM_8", "cat")));

return dataMap;
}

}
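
Because produceData uses each map key as the Kafka message key, every item above lands on its topic keyed by the item id, which is how the items table ends up keyed for the join. A minimal look at the fixture (only classes from this commit; data() is the accessor TestDataProvider already exposes to publishTestData):

ItemDataProvider items = new ItemDataProvider();

// "ITEM_1" -> ("ITEM_1", "home cinema"): the row whose DESCRIPTION JoinIntTest
// expects to see attached to ORDER_1 after the join.
GenericRow item1 = items.data().get("ITEM_1");
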
