Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.1.x [340] started integration test refactoring #341

Merged
merged 3 commits into from
Oct 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/ksqlserver.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#bootstrap.servers=localhost:1119092
bootstrap.servers=localhost:9092
application.id=ksql_server_quickstart
ksql.command.topic.suffix=commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +43,7 @@ public class StandaloneExecutor {

public StandaloneExecutor(Map streamProperties) {
KsqlConfig ksqlConfig = new KsqlConfig(streamProperties);
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig));
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig, AdminClient.create(ksqlConfig.getKsqlConfigProps())));
}

public void executeStatements(String queries) throws Exception {
Expand All @@ -64,5 +65,4 @@ public void executeStatements(String queries) throws Exception {
}
}
}

}
2 changes: 1 addition & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public static void setUp() throws Exception {
testListOrShowCommands();

orderDataProvider = new OrderDataProvider();
restServer.getKsqlEngine().getKafkaTopicClient().createTopic(orderDataProvider.topicName(), 1, (short)1);
restServer.getKsqlEngine().getTopicClient().createTopic(orderDataProvider.topicName(), 1, (short)1);
produceInputStream(orderDataProvider);
}

Expand Down
24 changes: 21 additions & 3 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package io.confluent.ksql;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.util.KafkaTopicClientImpl;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,6 +40,8 @@ public class KsqlContext {
final KsqlEngine ksqlEngine;
private static final String APPLICATION_ID_OPTION_DEFAULT = "ksql_standalone_cli";
private static final String KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT = "localhost:9092";
private final AdminClient adminClient;
private final KafkaTopicClientImpl topicClient;

public KsqlContext() {
this(null);
Expand All @@ -59,7 +64,15 @@ public KsqlContext(Map<String, Object> streamsProperties) {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT);
}
KsqlConfig ksqlConfig = new KsqlConfig(streamsProperties);
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig));

adminClient = AdminClient.create(ksqlConfig.getKsqlConfigProps());
topicClient = new KafkaTopicClientImpl(ksqlConfig, adminClient);
ksqlEngine = new KsqlEngine(ksqlConfig, topicClient);
}


public MetaStore getMetaStore() {
return ksqlEngine.getMetaStore();
}

/**
Expand All @@ -69,8 +82,7 @@ public KsqlContext(Map<String, Object> streamsProperties) {
* @throws Exception
*/
public void sql(String sql) throws Exception {
List<QueryMetadata> queryMetadataList = ksqlEngine.buildMultipleQueries(
false, sql, Collections.emptyMap());
List<QueryMetadata> queryMetadataList = ksqlEngine.buildMultipleQueries(true, sql, Collections.emptyMap());
for (QueryMetadata queryMetadata: queryMetadataList) {
if (queryMetadata instanceof PersistentQueryMetadata) {
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
Expand All @@ -86,6 +98,12 @@ public void sql(String sql) throws Exception {
}
}

public void close() throws IOException {
ksqlEngine.close();
topicClient.close();
adminClient.close();
}

/**
* Terminate a query with the given id.
*
Expand Down
20 changes: 10 additions & 10 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ public class KsqlEngine implements Closeable {
private KsqlConfig ksqlConfig;

private final MetaStore metaStore;
private final KafkaTopicClient kafkaTopicClient;
private final KafkaTopicClient topicClient;
private final DDLCommandExec ddlCommandExec;
private final QueryEngine queryEngine;

private final Map<Long, PersistentQueryMetadata> persistentQueries;
private final Set<QueryMetadata> liveQueries;

public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient) {
public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClient) {
Objects.requireNonNull(ksqlConfig, "Streams properties map cannot be null as it may be mutated later on");

this.ksqlConfig = ksqlConfig;

this.metaStore = new MetaStoreImpl();
this.kafkaTopicClient = kafkaTopicClient;
this.topicClient = topicClient;
this.ddlCommandExec = new DDLCommandExec(metaStore);
this.queryEngine = new QueryEngine(this);

Expand Down Expand Up @@ -263,21 +263,21 @@ private Pair<String, Statement> buildSingleQueryAst(final Statement statement,
} else if (statement instanceof CreateStream) {
ddlCommandExec.tryExecute(
new CreateStreamCommand(
(CreateStream) statement, overriddenProperties, kafkaTopicClient),
(CreateStream) statement, overriddenProperties, topicClient),
tempMetaStoreForParser);
ddlCommandExec.tryExecute(
new CreateStreamCommand(
(CreateStream) statement, overriddenProperties, kafkaTopicClient),
(CreateStream) statement, overriddenProperties, topicClient),
tempMetaStore);
return new Pair<>(statementString, statement);
} else if (statement instanceof CreateTable) {
ddlCommandExec.tryExecute(
new CreateTableCommand(
(CreateTable) statement, overriddenProperties, kafkaTopicClient),
(CreateTable) statement, overriddenProperties, topicClient),
tempMetaStoreForParser);
ddlCommandExec.tryExecute(
new CreateTableCommand(
(CreateTable) statement, overriddenProperties, kafkaTopicClient),
(CreateTable) statement, overriddenProperties, topicClient),
tempMetaStore);
return new Pair<>(statementString, statement);
} else if (statement instanceof DropStream) {
Expand Down Expand Up @@ -345,8 +345,8 @@ public MetaStore getMetaStore() {
return metaStore;
}

public KafkaTopicClient getKafkaTopicClient() {
return kafkaTopicClient;
public KafkaTopicClient getTopicClient() {
return topicClient;
}

public DDLCommandExec getDDLCommandExec() {
Expand Down Expand Up @@ -392,7 +392,7 @@ public void close() throws IOException {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
}
kafkaTopicClient.close();
topicClient.close();
}

public QueryEngine getQueryEngine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

// Build a physical plan, in this case a Kafka Streams DSL
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getKafkaTopicClient());
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getTopicClient());
SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan);

OutputNode outputNode = physicalPlanBuilder.getPlanSink();
Expand Down Expand Up @@ -375,10 +375,10 @@ private DDLCommand generateDDLCommand(
return new RegisterTopicCommand((RegisterTopic) statement, overriddenProperties);
} else if (statement instanceof CreateStream) {
return new CreateStreamCommand((CreateStream) statement, overriddenProperties,
ksqlEngine.getKafkaTopicClient());
ksqlEngine.getTopicClient());
} else if (statement instanceof CreateTable) {
return new CreateTableCommand((CreateTable) statement, overriddenProperties,
ksqlEngine.getKafkaTopicClient());
ksqlEngine.getTopicClient());
} else if (statement instanceof DropStream) {
return new DropSourceCommand((DropStream) statement);
} else if (statement instanceof DropTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@

public class KafkaTopicClientImpl implements KafkaTopicClient {
private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class);
private final KsqlConfig ksqlConfig;
private final AdminClient adminClient;

public KafkaTopicClientImpl(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig.clone();
public KafkaTopicClientImpl(final KsqlConfig ksqlConfig, AdminClient adminClient) {
this.adminClient = adminClient;
}

public void createTopic(String topic, int numPartitions, short replicatonFactor) {
Expand All @@ -57,8 +57,7 @@ public void createTopic(String topic, int numPartitions, short replicatonFactor)
}
NewTopic newTopic = new NewTopic(topic, numPartitions, replicatonFactor);
try {
AdminClient.create(ksqlConfig.getKsqlConfigProps())
.createTopics(Collections.singleton(newTopic)).all().get();
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " +
topic, e);
Expand All @@ -72,17 +71,15 @@ public boolean isTopicExists(String topic) {

public Set<String> listTopicNames() {
try {
return AdminClient.create(ksqlConfig.getKsqlConfigProps())
.listTopics().names().get();
return adminClient.listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to retrieve kafka topic names", e);
}
}

public Map<String, TopicDescription> describeTopics(Collection<String> topicNames) {
try {
return AdminClient.create(ksqlConfig.getKsqlConfigProps())
.describeTopics(topicNames).all().get();
return adminClient.describeTopics(topicNames).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to describe kafka topics", e);
}
Expand Down
2 changes: 1 addition & 1 deletion ksql-engine/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
log4j.rootLogger=WARN,stdout
log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
Expand Down
Loading