diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java
index 6f65a963bd45..12e251a09635 100644
--- a/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java
+++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java
@@ -41,7 +41,7 @@ public class StandaloneExecutor {
 
   private static final Logger log = LoggerFactory.getLogger(StandaloneExecutor.class);
 
-  KsqlEngine ksqlEngine;
+  private final KsqlEngine ksqlEngine;
 
   public StandaloneExecutor(Map streamProperties) throws ExecutionException, InterruptedException {
     KsqlConfig ksqlConfig = new KsqlConfig(streamProperties);
diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
index df7d402c69df..a86c56b715df 100644
--- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
+++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
@@ -423,7 +423,7 @@ private void printAsTable(KsqlEntity ksqlEntity) {
       columnHeaders = Arrays.asList("Query ID", "Kafka Topic", "Query String");
       rowValues = runningQueries.stream()
           .map(runningQuery -> Arrays.asList(
-              Long.toString(runningQuery.getId()),
+              runningQuery.getId().toString(),
              runningQuery.getKafkaTopic(),
              runningQuery.getQueryString()
           )).collect(Collectors.toList());
diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
index 03dc314666b2..6db9393770f5 100644
--- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
+++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
@@ -51,7 +51,6 @@ public class CliTest extends TestRunner {
   @ClassRule
   public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
 
-  private static final String COMMANDS_KSQL_TOPIC_NAME = KsqlRestApplication.COMMANDS_KSQL_TOPIC_NAME;
   private static final int PORT = 9098;
   private static final String LOCAL_REST_SERVER_ADDR = "http://localhost:" + PORT;
 
@@ -183,23 +182,16 @@ private static void testCreateStreamAsSelect(String selectQuery, Schema resultSc
       selectQuery += ";";
     }
     String resultKStreamName = "RESULT";
-    String resultTopicName = resultKStreamName;
     final String queryString = "CREATE STREAM " + resultKStreamName + " AS " + selectQuery;
 
     /* Start Stream Query */
     test(queryString, build("Stream created and running"));
 
     /* Assert Results */
-    Map results = topicConsumer.readResults(resultTopicName, resultSchema, expectedResults.size(), new StringDeserializer());
-
-    assertThat(results, equalTo(expectedResults));
+    Map results = topicConsumer.readResults(resultKStreamName, resultSchema, expectedResults.size(), new StringDeserializer());
 
-    /* Get first column of the first row in the result set to obtain the queryID */
-    String queryID = (String) ((List) run("list queries").data.toArray()[0]).get(0);
-
-    /* Clean Up */
-    run("terminate query " + queryID);
     dropStream(resultKStreamName);
+    assertThat(results, equalTo(expectedResults));
   }
 
   private static void dropStream(String name) {
diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
index 8bb9d9513e4f..4bc7779a6980 100644
--- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
+++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java
@@ -18,6 +18,7 @@
 
 import io.confluent.ksql.FakeException;
 import io.confluent.ksql.TestTerminal;
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.serde.DataSource;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.rest.client.KsqlRestClient;
@@ -95,11 +96,11 @@ public void testPrintKSqlEntityList() throws IOException {
     properties.put("k3", true);
 
     List queries = new ArrayList<>();
-    queries.add(new Queries.RunningQuery("select * from t1", "TestTopic", 1));
+    queries.add(new Queries.RunningQuery("select * from t1", "TestTopic", new QueryId("0")));
 
     for (int i = 0; i < 5; i++) {
       KsqlEntityList entityList = new KsqlEntityList(Arrays.asList(
-          new CommandStatusEntity("e", "topic/1", "SUCCESS", "Success Message"),
+          new CommandStatusEntity("e", "topic/1/create", "SUCCESS", "Success Message"),
           new ErrorMessageEntity("e", new FakeException()),
           new PropertiesList("e", properties),
           new Queries("e", queries),
diff --git a/ksql-common/src/main/java/io/confluent/ksql/query/QueryId.java b/ksql-common/src/main/java/io/confluent/ksql/query/QueryId.java
new file mode 100644
index 000000000000..95ae0c03c081
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/query/QueryId.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package io.confluent.ksql.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+
+import java.util.Objects;
+
+@JsonSubTypes({})
+public class QueryId {
+  private final String id;
+
+  @JsonCreator
+  public QueryId(@JsonProperty("id") final String id) {
+    Objects.requireNonNull(id, "id can't be null");
+    this.id = id;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String toString() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QueryId queryId1 = (QueryId) o;
+    return Objects.equals(id, queryId1.id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id);
+  }
+}
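
Note: QueryId above is a plain value object, so it can be used directly as a map key and serialized by Jackson. A minimal sketch of both properties; the class and its "id" field come from the patch, while the ObjectMapper usage is illustrative rather than part of it:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.confluent.ksql.query.QueryId;

    import java.util.HashMap;
    import java.util.Map;

    public class QueryIdExample {
      public static void main(String[] args) throws Exception {
        // Value semantics: equal ids are interchangeable as map keys.
        Map<QueryId, String> queries = new HashMap<>();
        queries.put(new QueryId("CSAS_BIGORDERS"), "CREATE STREAM BIGORDERS AS ...");
        System.out.println(queries.containsKey(new QueryId("CSAS_BIGORDERS"))); // true

        // JSON round-trip via the @JsonCreator/@JsonProperty annotations.
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new QueryId("CTAS_ORDERSUMS")); // {"id":"CTAS_ORDERSUMS"}
        QueryId parsed = mapper.readValue(json, QueryId.class);
        System.out.println(parsed.equals(new QueryId("CTAS_ORDERSUMS")));       // true
      }
    }
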
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
index ccd33818ddf6..4ccf9fc8f392 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
@@ -48,6 +48,7 @@
 import io.confluent.ksql.parser.tree.Statement;
 import io.confluent.ksql.parser.tree.Table;
 import io.confluent.ksql.planner.plan.PlanNode;
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.serde.DataSource;
 import io.confluent.ksql.util.DataSourceExtractor;
 import io.confluent.ksql.util.KafkaTopicClient;
@@ -75,7 +76,7 @@
 import java.util.Optional;
 import java.util.Set;
 
-public class KsqlEngine implements Closeable {
+public class KsqlEngine implements Closeable, QueryTerminator {
 
   private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);
 
@@ -91,7 +92,7 @@ public class KsqlEngine implements Closeable {
   private final DDLCommandExec ddlCommandExec;
   private final QueryEngine queryEngine;
 
-  private final Map<Long, PersistentQueryMetadata> persistentQueries;
+  private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
   private final Set liveQueries;
 
   public final FunctionRegistry functionRegistry;
@@ -104,8 +105,7 @@ public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClien
     this.metaStore = new MetaStoreImpl();
     this.topicClient = topicClient;
     this.ddlCommandExec = new DDLCommandExec(metaStore);
-    this.queryEngine = new QueryEngine(this, new CommandFactories(topicClient));
-
+    this.queryEngine = new QueryEngine(this, new CommandFactories(topicClient, this));
     this.persistentQueries = new HashMap<>();
     this.liveQueries = new HashSet<>();
     this.functionRegistry = new FunctionRegistry();
@@ -120,10 +120,9 @@ public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClien
    * @throws Exception Any exception thrown here!
    */
   public List buildMultipleQueries(
-    final boolean createNewAppId,
-    final String queriesString,
-    final Map overriddenProperties
-  ) throws Exception {
+      final boolean createNewAppId,
+      final String queriesString,
+      final Map overriddenProperties) throws Exception {
     for (String property : overriddenProperties.keySet()) {
       if (IMMUTABLE_PROPERTIES.contains(property)) {
         throw new IllegalArgumentException(
@@ -174,15 +173,16 @@ public List planQueries(final boolean createNewAppId,
 
   public QueryMetadata getQueryExecutionPlan(final Query query) throws Exception {
     // Logical plan creation from the ASTs
-    List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(metaStore, Arrays.asList(new Pair<>("", query)));
+    List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(metaStore,
+        Collections.singletonList(new Pair<>("", query)));
 
     // Physical plan creation from logical plans.
     List runningQueries = queryEngine.buildPhysicalPlans(
-            false,
-            logicalPlans,
-            Arrays.asList(new Pair<>("", query)),
-            Collections.emptyMap(),
-            false
+        false,
+        logicalPlans,
+        Collections.singletonList(new Pair<>("", query)),
+        Collections.emptyMap(),
+        false
     );
     return runningQueries.get(0);
   }
@@ -231,7 +231,7 @@ private Pair buildSingleQueryAst(final Statement statement,
     log.info("Building AST for {}.", statementString);
 
     if (statement instanceof Query) {
-      return new Pair<>(statementString, (Query) statement);
+      return new Pair<>(statementString, statement);
     } else if (statement instanceof CreateStreamAsSelect) {
       CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement;
       QuerySpecification querySpecification = (QuerySpecification) createStreamAsSelect.getQuery().getQueryBody();
@@ -298,13 +298,13 @@ private Pair buildSingleQueryAst(final Statement statement,
                                tempMetaStore);
       return new Pair<>(statementString, statement);
     } else if (statement instanceof DropStream) {
-      ddlCommandExec.tryExecute(new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM), tempMetaStore);
-      ddlCommandExec.tryExecute(new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM),
+      ddlCommandExec.tryExecute(new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM, this), tempMetaStore);
+      ddlCommandExec.tryExecute(new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM, this),
                                 tempMetaStoreForParser);
       return new Pair<>(statementString, statement);
     } else if (statement instanceof DropTable) {
-      ddlCommandExec.tryExecute(new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE), tempMetaStore);
-      ddlCommandExec.tryExecute(new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE),
+      ddlCommandExec.tryExecute(new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE, this), tempMetaStore);
+      ddlCommandExec.tryExecute(new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE, this),
                                 tempMetaStoreForParser);
       return new Pair<>(statementString, statement);
     } else if (statement instanceof DropTopic) {
@@ -380,7 +380,8 @@ public DDLCommandExec getDDLCommandExec() {
     return ddlCommandExec;
   }
 
-  public boolean terminateQuery(final long queryId, final boolean closeStreams) {
+  @Override
+  public boolean terminateQuery(final QueryId queryId, final boolean closeStreams) {
     QueryMetadata queryMetadata = persistentQueries.remove(queryId);
     if (queryMetadata == null) {
       return false;
@@ -392,7 +393,23 @@ public boolean terminateQuery(final long queryId, final boolean closeStreams) {
     return true;
   }
 
-  public Map<Long, PersistentQueryMetadata> getPersistentQueries() {
+  @Override
+  public void terminateQueryForEntity(final String entity) {
+    final Optional<PersistentQueryMetadata> query = persistentQueries.values()
+        .stream()
+        .filter(persistentQueryMetadata -> persistentQueryMetadata.getEntity().equalsIgnoreCase(entity))
+        .findFirst();
+
+    if (query.isPresent()) {
+      final PersistentQueryMetadata metadata = query.get();
+      log.info("Terminating persistent query {}", metadata.getId());
+      metadata.close();
+      persistentQueries.remove(metadata.getId());
+      liveQueries.remove(metadata);
+    }
+  }
+
+  public Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
     return new HashMap<>(persistentQueries);
   }
 
@@ -420,6 +437,7 @@ public void close() throws IOException {
 
   }
 
+  @Override
   public boolean terminateAllQueries() {
     try {
       for (QueryMetadata queryMetadata : liveQueries) {
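
Note: the engine changes above track persistent queries by QueryId instead of long, and add a second lookup path: a query can be found (and terminated) via the entity it writes to, which is what DROP STREAM/TABLE uses. A simplified sketch of that lookup, with PersistentQueryMetadata reduced to just its entity name (everything here is illustrative):

    import io.confluent.ksql.query.QueryId;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    class EntityTerminationSketch {
      // Stand-in for KsqlEngine.persistentQueries; values are just the entity names.
      private final Map<QueryId, String> persistentQueries = new HashMap<>();

      // Mirrors terminateQueryForEntity: first query writing to the entity, case-insensitively.
      Optional<QueryId> findQueryForEntity(String entity) {
        return persistentQueries.entrySet().stream()
            .filter(e -> e.getValue().equalsIgnoreCase(entity))
            .map(Map.Entry::getKey)
            .findFirst();
      }
    }
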
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
index 0c361bcf1bdf..94094c5f1aa6 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -50,19 +50,16 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 class QueryEngine {
 
   private static final Logger log = LoggerFactory.getLogger(QueryEngine.class);
-  private final AtomicLong queryIdCounter;
   private final KsqlEngine ksqlEngine;
   private final DDLCommandFactory ddlCommandFactory;
 
   QueryEngine(final KsqlEngine ksqlEngine, final DDLCommandFactory ddlCommandFactory) {
     this.ddlCommandFactory = ddlCommandFactory;
-    this.queryIdCounter = new AtomicLong(1);
     this.ksqlEngine = ksqlEngine;
   }
 
@@ -120,8 +117,7 @@ List buildPhysicalPlans(
       final List<Pair<String, PlanNode>> logicalPlans,
       final List<Pair<String, Statement>> statementList,
       final Map overriddenStreamsProperties,
-      final boolean updateMetastore
-  ) throws Exception {
+      final boolean updateMetastore) throws Exception {
 
     List physicalPlans = new ArrayList<>();
 
@@ -161,8 +157,8 @@ private void buildQueryPhysicalPlan(final List physicalPlans,
         addUniqueTimeSuffix,
         overriddenStreamsProperties,
         updateMetastore,
-        ksqlEngine.getMetaStore(),
-        queryIdCounter);
+        ksqlEngine.getMetaStore()
+    );
 
     physicalPlans.add(physicalPlanBuilder.buildPhysicalPlan(statementPlanPair));
   }
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/QueryTerminator.java b/ksql-engine/src/main/java/io/confluent/ksql/QueryTerminator.java
new file mode 100644
index 000000000000..12d8ae69c216
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/QueryTerminator.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package io.confluent.ksql;
+
+import io.confluent.ksql.query.QueryId;
+
+public interface QueryTerminator {
+
+  boolean terminateQuery(QueryId queryId, boolean closeStreams);
+
+  void terminateQueryForEntity(String entity);
+
+  boolean terminateAllQueries();
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
index 504a0a91a96d..baa5d5af442c 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java
@@ -18,6 +18,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import io.confluent.ksql.QueryTerminator;
 import io.confluent.ksql.parser.tree.CreateStream;
 import io.confluent.ksql.parser.tree.CreateTable;
 import io.confluent.ksql.parser.tree.DDLStatement;
@@ -33,14 +34,14 @@ public class CommandFactories implements DDLCommandFactory {
 
   private final Map<Class<? extends DDLStatement>, DDLCommandFactory> factories = new HashMap<>();
 
-  public CommandFactories(final KafkaTopicClient topicClient) {
+  public CommandFactories(final KafkaTopicClient topicClient, final QueryTerminator queryTerminator) {
     factories.put(RegisterTopic.class, (ddlStatement, properties) -> new RegisterTopicCommand((RegisterTopic) ddlStatement, properties));
     factories.put(CreateStream.class, (ddlStatement, properties) -> new CreateStreamCommand((CreateStream) ddlStatement, properties, topicClient));
     factories.put(CreateTable.class, (ddlStatement, properties) -> new CreateTableCommand((CreateTable) ddlStatement, properties, topicClient));
     factories.put(DropStream.class, (ddlStatement, properties) -> new DropSourceCommand(
-        (DropStream) ddlStatement, DataSource.DataSourceType.KSTREAM));
+        (DropStream) ddlStatement, DataSource.DataSourceType.KSTREAM, queryTerminator));
     factories.put(DropTable.class, (ddlStatement, properties) -> new DropSourceCommand(
-        (DropTable) ddlStatement, DataSource.DataSourceType.KTABLE));
+        (DropTable) ddlStatement, DataSource.DataSourceType.KTABLE, queryTerminator));
     factories.put(DropTopic.class, (ddlStatement, properties) -> new DropTopicCommand(((DropTopic) ddlStatement)));
     factories.put(SetProperty.class, (ddlStatement, properties) -> new SetPropertyCommand((SetProperty) ddlStatement, properties));
   }
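
Note: QueryTerminator is the seam that lets the DDL command factories above terminate queries without depending on KsqlEngine directly (KsqlEngine is its only production implementation in this patch). For tests that don't exercise termination, a no-op implementation is enough; a hypothetical example (the patch's own tests use an EasyMock mock instead):

    import io.confluent.ksql.QueryTerminator;
    import io.confluent.ksql.query.QueryId;

    // e.g. new CommandFactories(topicClient, new NoOpQueryTerminator())
    class NoOpQueryTerminator implements QueryTerminator {
      @Override
      public boolean terminateQuery(QueryId queryId, boolean closeStreams) {
        return false; // report that nothing was running
      }

      @Override
      public void terminateQueryForEntity(String entity) {
        // nothing to terminate
      }

      @Override
      public boolean terminateAllQueries() {
        return true;
      }
    }
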
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
index 3b6288928d4b..ea68c5f4fcff 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DropSourceCommand.java
@@ -16,6 +16,7 @@
 
 package io.confluent.ksql.ddl.commands;
 
+import io.confluent.ksql.QueryTerminator;
 import io.confluent.ksql.metastore.MetaStore;
 import io.confluent.ksql.metastore.StructuredDataSource;
 import io.confluent.ksql.parser.tree.AbstractStreamDropStatement;
@@ -27,10 +28,14 @@ public class DropSourceCommand implements DDLCommand {
 
   private final String sourceName;
   private final DataSource.DataSourceType dataSourceType;
+  private final QueryTerminator queryTerminator;
 
-  public DropSourceCommand(AbstractStreamDropStatement statement, DataSource.DataSourceType dataSourceType) {
+  public DropSourceCommand(final AbstractStreamDropStatement statement,
+                           final DataSource.DataSourceType dataSourceType,
+                           final QueryTerminator queryTerminator) {
     this.sourceName = statement.getName().getSuffix();
     this.dataSourceType = dataSourceType;
+    this.queryTerminator = queryTerminator;
   }
 
   @Override
@@ -50,6 +55,7 @@ public DDLCommandResult run(MetaStore metaStore) {
         dataSource.getKsqlTopic().getTopicName());
     dropTopicCommand.run(metaStore);
     metaStore.deleteSource(sourceName);
+    queryTerminator.terminateQueryForEntity(sourceName);
     return new DDLCommandResult(true, "Source " + sourceName + " was dropped");
   }
 }
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
index ad6f15d2a814..7243b17a0d2b 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
@@ -22,6 +22,7 @@
 import io.confluent.ksql.metastore.MetaStore;
 import io.confluent.ksql.metastore.MetastoreUtil;
 import io.confluent.ksql.metastore.StructuredDataSource;
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.serde.DataSource;
 import io.confluent.ksql.structured.QueuedSchemaKStream;
 import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
@@ -47,7 +48,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class PhysicalPlanBuilder {
 
@@ -60,7 +60,6 @@ public class PhysicalPlanBuilder {
   private final Map overriddenStreamsProperties;
   private final MetaStore metaStore;
   private final boolean updateMetastore;
-  private final AtomicLong queryIdCounter;
 
   public PhysicalPlanBuilder(final StreamsBuilder builder,
                              final KsqlConfig ksqlConfig,
@@ -70,8 +69,7 @@ public PhysicalPlanBuilder(final StreamsBuilder builder,
                              final boolean addUniqueTimeSuffix,
                              final Map overriddenStreamsProperties,
                              final boolean updateMetastore,
-                             final MetaStore metaStore,
-                             final AtomicLong queryIdCounter) {
+                             final MetaStore metaStore) {
     this.builder = builder;
     this.ksqlConfig = ksqlConfig;
     this.kafkaTopicClient = kafkaTopicClient;
@@ -81,7 +79,6 @@ public PhysicalPlanBuilder(final StreamsBuilder builder,
     this.overriddenStreamsProperties = overriddenStreamsProperties;
     this.metaStore = metaStore;
     this.updateMetastore = updateMetastore;
-    this.queryIdCounter = queryIdCounter;
   }
 
   public QueryMetadata buildPhysicalPlan(final Pair statementPlanPair) throws Exception {
@@ -160,12 +157,6 @@ private QueryMetadata buildPlanForStructuredOutputNode(final SchemaKStream schem
                                                          final String persistanceQueryPrefix,
                                                          final String statement) {
 
-    long queryId = queryIdCounter.getAndIncrement();
-    String applicationId = serviceId + persistanceQueryPrefix + queryId;
-    if (addUniqueTimeSuffix) {
-      applicationId = addTimeSuffix(applicationId);
-    }
-
     if (metaStore.getTopic(outputNode.getKafkaTopicName()) == null) {
       metaStore.putTopic(outputNode.getKsqlTopic());
     }
@@ -188,11 +179,16 @@ private QueryMetadata buildPlanForStructuredOutputNode(final SchemaKStream schem
           schemaKStream.getKeyField(),
           outputNode.getTimestampField(),
           outputNode.getKsqlTopic());
+    }
 
     if (updateMetastore) {
       metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns());
     }
+
+    final QueryId queryId = sinkDataSource.getPersistentQueryId();
+    final String applicationId = addTimeSuffix(serviceId + persistanceQueryPrefix + queryId);
+
     KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties);
 
     return new PersistentQueryMetadata(statement,
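
Note: with the AtomicLong counter gone, the Streams application id above is derived from the sink's deterministic QueryId. Roughly how the pieces combine; all values are examples, and the exact suffix produced by addTimeSuffix(...) is not shown in this diff, so the trailing part below is an assumption:

    public class ApplicationIdSketch {
      public static void main(String[] args) {
        String serviceId = "ksql_";                 // from KsqlConfig
        String persistanceQueryPrefix = "query_";   // from KsqlConfig
        String queryId = "CSAS_BIGORDERS";          // sinkDataSource.getPersistentQueryId()
        String applicationId = serviceId + persistanceQueryPrefix + queryId
            + "_" + System.currentTimeMillis();     // stands in for addTimeSuffix(...)
        System.out.println(applicationId);          // e.g. ksql_query_CSAS_BIGORDERS_1510000000000
      }
    }
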
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaTopicClient.java b/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaTopicClient.java
index fec1fc1f5c37..caf8b468b1b9 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaTopicClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/util/KafkaTopicClient.java
@@ -39,6 +39,18 @@ public interface KafkaTopicClient extends Closeable {
    */
   void createTopic(String topic, int numPartitions, short replicatonFactor);
 
+  /**
+   * Create a new topic with the specified name, number of partitions and replication factor.
+   * [warn] synchronous call to get the response
+   * @param topic name of the topic to create
+   * @param numPartitions the number of partitions for the topic
+   * @param replicatonFactor the replication factor for the topic
+   * @param configs any additional topic configs to use
+   * @throws KafkaTopicException
+   * @throws KafkaResponseGetFailedException
+   */
+  void createTopic(String topic, int numPartitions, short replicatonFactor, Map configs);
+
   /**
    * [warn] synchronous call to get the response
    * @param topic name of the topic
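
Note: a sketch of how a caller might use the new overload, for example to create a log-compacted topic. The helper below is hypothetical; whether the command topic itself is created this way is not shown in this diff ("cleanup.policy"/"compact" are standard Kafka topic configs):

    import io.confluent.ksql.util.KafkaTopicClient;

    import java.util.Collections;
    import java.util.Map;

    class CompactedTopicExample {
      static void createCompactedTopic(KafkaTopicClient client, String name) {
        Map<String, String> configs =
            Collections.singletonMap("cleanup.policy", "compact");
        client.createTopic(name, 1, (short) 1, configs);
      }
    }
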
log.debug("Did not create topic {} with {} partitions and replication-factor {} since it already exists", topic, - numPartitions, replicatonFactor); + numPartitions, replicatonFactor); return; } NewTopic newTopic = new NewTopic(topic, numPartitions, replicatonFactor); + newTopic.configs(configs); try { log.info("Creating topic '{}'", topic); adminClient.createTopics(Collections.singleton(newTopic)).all().get(); } catch (InterruptedException | ExecutionException e) { throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + - topic, e); + topic, e); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index be0349561b24..77fda320eeb0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.serde.DataSource; import io.confluent.ksql.planner.plan.OutputNode; @@ -25,14 +26,14 @@ public class PersistentQueryMetadata extends QueryMetadata { - private final long id; + private final QueryId id; public PersistentQueryMetadata(final String statementString, final KafkaStreams kafkaStreams, final OutputNode outputNode, final String executionPlan, - final long id, + final QueryId id, final DataSource.DataSourceType dataSourceType, final String queryApplicationId, final KafkaTopicClient kafkaTopicClient, @@ -43,10 +44,14 @@ public PersistentQueryMetadata(final String statementString, } - public long getId() { + public QueryId getId() { return id; } + public String getEntity() { + return getOutputNode().getId().toString(); + } + @Override public boolean equals(Object o) { if (!(o instanceof PersistentQueryMetadata)) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java index 999ed3c6e4c4..96bc1ad8bb19 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.serde.DataSource; import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KafkaTopicClientImpl; @@ -42,8 +43,6 @@ public class KsqlContextTest { + "WITH (kafka_topic='ordertopic', value_format='JSON' , " + "key='orderid');\n"; private final String statement2 = "CREATE STREAM BIGORDERS AS SELECT * FROM orders WHERE ORDERUNITS > 5;"; - private final String statement3 = "CREATE TABLE ORDERSUMS AS select itemid, sum(orderunits) from orders window " - + "TUMBLING ( size 30 second) group by itemid;"; @Test public void shouldRunSimpleStatements() throws Exception { @@ -51,7 +50,7 @@ public void shouldRunSimpleStatements() throws Exception { KafkaTopicClient kafkaTopicClient = mock(KafkaTopicClientImpl.class); KsqlEngine ksqlEngine = mock(KsqlEngine.class); - Map liveQueryMap = new HashMap<>(); + Map liveQueryMap = new HashMap<>(); KsqlContext ksqlContext = new KsqlContext(adminClient, kafkaTopicClient, ksqlEngine); @@ -59,7 +58,7 @@ public void shouldRunSimpleStatements() throws Exception { .andReturn (Collections.emptyList()); expect(ksqlEngine.buildMultipleQueries(false, statement2, Collections.emptyMap())) - .andReturn(getQueryMetadata(1, 
     expect(ksqlEngine.buildMultipleQueries(false, statement2, Collections.emptyMap()))
-        .andReturn(getQueryMetadata(1, DataSource.DataSourceType.KSTREAM));
+        .andReturn(getQueryMetadata(new QueryId("CSAS_BIGORDERS"), DataSource.DataSourceType.KSTREAM));
     expect(ksqlEngine.getPersistentQueries()).andReturn(liveQueryMap);
     replay(ksqlEngine);
     ksqlContext.sql(statement1);
@@ -68,12 +67,11 @@ public void shouldRunSimpleStatements() throws Exception {
     verify(ksqlEngine);
   }
 
-  private List getQueryMetadata(long queryid, DataSource.DataSourceType type) {
+  private List getQueryMetadata(QueryId queryid, DataSource.DataSourceType type) {
     KafkaStreams queryStreams = mock(KafkaStreams.class);
     queryStreams.start();
     expectLastCall();
 
-    PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata(String.valueOf
-        (queryid),
+    PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata(queryid.toString(),
         queryStreams,
         null,
         "",
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
index be3fee6edd78..31501168eb2b 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Optional;
 
+import io.confluent.ksql.QueryTerminator;
 import io.confluent.ksql.ddl.DdlConfig;
 import io.confluent.ksql.parser.tree.CreateStream;
 import io.confluent.ksql.parser.tree.CreateTable;
@@ -46,7 +47,7 @@ public class CommandFactoriesTest {
 
   private final KafkaTopicClient topicClient = EasyMock.createNiceMock(KafkaTopicClient.class);
-  private final CommandFactories commandFactories = new CommandFactories(topicClient);
+  private final CommandFactories commandFactories = new CommandFactories(topicClient, EasyMock.createMock(QueryTerminator.class));
   private final HashMap properties = new HashMap<>();
 
   @Before
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
index b3d3173805df..4c9b00855b57 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
@@ -16,9 +16,7 @@
 package io.confluent.ksql.integration;
 
 import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.KsqlContext;
 import io.confluent.ksql.KsqlEngine;
-import io.confluent.ksql.parser.tree.Except;
 import io.confluent.ksql.serde.DataSource;
 import io.confluent.ksql.util.KafkaTopicClient;
 import io.confluent.ksql.util.KafkaTopicClientImpl;
@@ -30,12 +28,9 @@
 import io.confluent.ksql.util.UserDataProvider;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
index 3e9485861754..b223f385d6b7 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
@@ -63,6 +63,7 @@ public class JsonFormatTest {
   private static final Logger log = LoggerFactory.getLogger(JsonFormatTest.class);
 
   private AdminClient adminClient;
+  private long queryId = 0;
 
   @Before
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/StreamsSelectAndProjectIntTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/StreamsSelectAndProjectIntTest.java
index d8ff6a41385f..5e96baa3d09c 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/StreamsSelectAndProjectIntTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/StreamsSelectAndProjectIntTest.java
@@ -8,6 +8,7 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.test.IntegrationTest;
+import org.hamcrest.core.IsCollectionContaining;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -19,6 +20,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
index bf5b5474c7fe..483b5c19705c 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java
@@ -36,7 +36,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -66,7 +65,9 @@ public void before() {
         false,
         Collections.emptyMap(),
         false,
-        metaStore, new AtomicLong(1) );
+        metaStore
+    );
+
     planBuilder = new LogicalPlanBuilder(metaStore);
   }
 
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/FakeKafkaTopicClient.java b/ksql-engine/src/test/java/io/confluent/ksql/util/FakeKafkaTopicClient.java
index 8643ddc29df3..c8b9b23b6f91 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/util/FakeKafkaTopicClient.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/util/FakeKafkaTopicClient.java
@@ -33,6 +33,11 @@ public class FakeKafkaTopicClient implements KafkaTopicClient {
   public void createTopic(String topic, int numPartitions, short replicatonFactor) {
   }
 
+  @Override
+  public void createTopic(String topic, int numPartitions, short replicatonFactor, Map configs) {
+
+  }
+
   @Override
   public boolean isTopicExists(String topic) {
     return false;
diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStdOut.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStdOut.java
index badd956a8da9..c1c50f7deba9 100644
--- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStdOut.java
+++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStdOut.java
@@ -19,6 +19,8 @@
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 
+import io.confluent.ksql.query.QueryId;
+
 public class KsqlStdOut extends StructuredDataSource {
 
   public static final String KSQL_STDOUT_NAME = "KSQL_STDOUT_NAME";
@@ -57,4 +59,9 @@ public StructuredDataSource cloneWithTimeKeyColumns() {
   public StructuredDataSource cloneWithTimeField(String timestampfieldName) {
     return this;
   }
+
+  @Override
+  public QueryId getPersistentQueryId() {
+    throw new UnsupportedOperationException("KsqlStdOut doesn't support persistent queries");
+  }
 }
diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStream.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStream.java
index 647aa38ccc4b..d9afa82cc5be 100644
--- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStream.java
+++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlStream.java
@@ -16,6 +16,7 @@
 
 package io.confluent.ksql.metastore;
 
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.SchemaUtil;
 import org.apache.kafka.connect.data.Field;
@@ -47,6 +48,11 @@ public StructuredDataSource cloneWithTimeField(String timestampfieldName) {
     return new KsqlStream(dataSourceName, schema, keyField, newTimestampField.get(), ksqlTopic);
   }
 
+  @Override
+  public QueryId getPersistentQueryId() {
+    return new QueryId("CSAS_" + dataSourceName);
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + " name:" + getName();
diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlTable.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlTable.java
index 79b4563ed31c..d71f8cd2a0c0 100644
--- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlTable.java
+++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/KsqlTable.java
@@ -16,6 +16,7 @@
 
 package io.confluent.ksql.metastore;
 
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.SchemaUtil;
 import org.apache.kafka.connect.data.Field;
@@ -62,6 +63,11 @@ public StructuredDataSource cloneWithTimeField(String timestampfieldName) {
                          stateStoreName, isWindowed);
   }
 
+  @Override
+  public QueryId getPersistentQueryId() {
+    return new QueryId("CTAS_" + dataSourceName);
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + " name:" + getName();
diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/StructuredDataSource.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/StructuredDataSource.java
index 6e0de656b1b6..7f5d99334304 100644
--- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/StructuredDataSource.java
+++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/StructuredDataSource.java
@@ -16,6 +16,7 @@
 
 package io.confluent.ksql.metastore;
 
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.serde.DataSource;
 import io.confluent.ksql.util.KsqlException;
 import org.apache.kafka.connect.data.Field;
@@ -88,4 +89,6 @@ public Field getTimestampField() {
   public String getTopicName() {
     return ksqlTopic.getTopicName();
   }
+
+  public abstract QueryId getPersistentQueryId();
 }
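
Note: taken together, the three implementations above make the id of a persistent query a deterministic function of its sink, rather than the value of a counter. For illustration (the source names are hypothetical):

    // KsqlStream named "BIGORDERS" -> getPersistentQueryId() == new QueryId("CSAS_BIGORDERS")
    // KsqlTable  named "ORDERSUMS" -> getPersistentQueryId() == new QueryId("CTAS_ORDERSUMS")
    // KsqlStdOut (no sink topic)   -> getPersistentQueryId() throws UnsupportedOperationException
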
diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4
index 1e55df8db2fb..6ce51dd22e60 100644
--- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4
+++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4
@@ -44,7 +44,7 @@ statement
     | DESCRIBE (qualifiedName | TOPIC qualifiedName)                      #showColumns
     | PRINT qualifiedName (FROM BEGINNING)? ((INTERVAL | SAMPLE) number)? #printTopic
     | (LIST | SHOW) QUERIES                                               #listQueries
-    | TERMINATE QUERY? INTEGER_VALUE                                      #terminateQuery
+    | TERMINATE QUERY? STRING                                             #terminateQuery
     | SET STRING EQ STRING                                                #setProperty
     | UNSET STRING                                                        #unsetProperty
     | LOAD expression                                                     #loadProperties
diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
index f2a4a55331cf..2073931deb98 100644
--- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
+++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
@@ -552,7 +552,7 @@ public Node visitListQueries(SqlBaseParser.ListQueriesContext context) {
   @Override
   public Node visitTerminateQuery(SqlBaseParser.TerminateQueryContext context) {
     return new TerminateQuery(getLocation(context),
-        Long.parseLong(context.INTEGER_VALUE().getText()));
+        unquote(context.STRING().getText(), "'"));
   }
 
   @Override
diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java
index 0fe3a4dd4f46..0bc9f1427ea4 100644
--- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java
+++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java
@@ -19,27 +19,28 @@
 import java.util.Objects;
 import java.util.Optional;
 
+import io.confluent.ksql.query.QueryId;
+
 import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
 
 public class TerminateQuery extends Statement {
 
-  private final long queryId;
+  private final QueryId queryId;
 
-  public TerminateQuery(long queryId) {
+  public TerminateQuery(String queryId) {
     this(Optional.empty(), queryId);
   }
 
-  public TerminateQuery(NodeLocation location, long queryId) {
+  public TerminateQuery(NodeLocation location, String queryId) {
     this(Optional.of(location), queryId);
   }
 
-  private TerminateQuery(Optional location, long queryId) {
+  private TerminateQuery(Optional location, String queryId) {
     super(location);
-    this.queryId = requireNonNull(queryId, "table is null");
+    this.queryId = new QueryId(queryId);
   }
 
-  public long getQueryId() {
+  public QueryId getQueryId() {
     return queryId;
   }
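
Note: the grammar and AST changes above mean TERMINATE now takes a single-quoted string (the STRING token, unquoted by the builder) instead of an integer. For example, with an illustrative query id:

    -- before this change (counter-based numeric ids):
    TERMINATE QUERY 2;
    -- after this change (ids derived from the sink name):
    TERMINATE QUERY 'CSAS_BIGORDERS';
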
diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/Pair.java b/ksql-parser/src/main/java/io/confluent/ksql/util/Pair.java
index 0f6f737b581c..d0160910557d 100644
--- a/ksql-parser/src/main/java/io/confluent/ksql/util/Pair.java
+++ b/ksql-parser/src/main/java/io/confluent/ksql/util/Pair.java
@@ -49,4 +49,12 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Objects.hash(left, right);
   }
+
+  @Override
+  public String toString() {
+    return "Pair{" +
+        "left=" + left +
+        ", right=" + right +
+        '}';
+  }
 }
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
index 58236fceb79a..ce3dbb25cb5d 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java
@@ -19,7 +19,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
-
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -64,4 +63,5 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Objects.hash(getKsql(), getStreamsProperties());
   }
+
 }
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/Queries.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/Queries.java
index 048087ebc789..87802cc7fd09 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/Queries.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/Queries.java
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.Objects;
 
+import io.confluent.ksql.query.QueryId;
+
 @JsonTypeName("queries")
 @JsonSubTypes({})
 public class Queries extends KsqlEntity {
@@ -46,13 +48,13 @@ public List getQueries() {
   public static class RunningQuery {
     private final String queryString;
     private final String kafkaTopic;
-    private final long id;
+    private final QueryId id;
 
     @JsonCreator
     public RunningQuery(
         @JsonProperty("queryString") String queryString,
         @JsonProperty("kafkaTopic") String kafkaTopic,
-        @JsonProperty("id") long id
+        @JsonProperty("id") QueryId id
     ) {
       this.queryString = queryString;
       this.kafkaTopic = kafkaTopic;
@@ -67,7 +69,7 @@ public String getKafkaTopic() {
       return kafkaTopic;
     }
 
-    public long getId() {
+    public QueryId getId() {
       return id;
     }
 
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 18406dc2d7ce..440d8a1cdc82 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -16,6 +16,7 @@
 
 package io.confluent.ksql.rest.server;
 
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
 import io.confluent.kafka.serializers.KafkaJsonDeserializer;
@@ -305,8 +306,7 @@ public static KsqlRestApplication buildApplication(
     StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
         ksqlEngine,
         statementParser,
-        restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)
-    );
+        restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG));
     KsqlResource ksqlResource = new KsqlResource(
         ksqlEngine,
         commandStore,
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java
index 14a2241034f7..ddaabca569cf 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java
@@ -20,8 +20,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 @JsonSubTypes({})
@@ -46,6 +46,7 @@ public Map getStreamsProperties() {
     return new HashMap<>(streamsProperties);
   }
 
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -61,6 +62,14 @@ public boolean equals(Object o) {
 
   @Override
   public int hashCode() {
-    return Objects.hash(getStatement(), getStreamsProperties());
+    return Objects.hash(statement, streamsProperties);
+  }
+
+  @Override
+  public String toString() {
+    return "Command{" +
+        "statement='" + statement + '\'' +
+        ", streamsProperties=" + streamsProperties +
+        '}';
   }
 }
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandId.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandId.java
index 10baf66ca39e..9b670bbed377 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandId.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandId.java
@@ -26,6 +26,7 @@ public class CommandId {
   private final Type type;
   private final String entity;
+  private final Action action;
 
   public enum Type {
     TOPIC,
@@ -34,22 +35,33 @@ public enum Type {
     TERMINATE
   }
 
-  public CommandId(Type type, String entity) {
+  public enum Action {
+    CREATE,
+    DROP,
+    EXECUTE
+  }
+
+  public CommandId(final Type type,
+                   final String entity,
+                   final Action action) {
     this.type = type;
     this.entity = entity;
+    this.action = action;
   }
 
-  public CommandId(String type, String entity) {
-    this(Type.valueOf(type.toUpperCase()), entity);
+  public CommandId(final String type,
+                   final String entity,
+                   final String action) {
+    this(Type.valueOf(type.toUpperCase()), entity, Action.valueOf(action.toUpperCase()));
   }
 
   @JsonCreator
   public static CommandId fromString(String fromString) {
-    String[] splitOnSlash = fromString.split("/", 2);
-    if (splitOnSlash.length != 2) {
-      throw new IllegalArgumentException("Expected a string of the form <type>/<entity>");
+    String[] splitOnSlash = fromString.split("/", 3);
+    if (splitOnSlash.length != 3) {
+      throw new IllegalArgumentException("Expected a string of the form <type>/<entity>/<action>");
     }
-    return new CommandId(splitOnSlash[0], splitOnSlash[1]);
+    return new CommandId(splitOnSlash[0], splitOnSlash[1], splitOnSlash[2]);
   }
 
   public Type getType() {
@@ -60,27 +72,28 @@ public String getEntity() {
     return entity;
   }
 
+  public Action getAction() {
+    return action;
+  }
+
   @Override
   @JsonValue
   public String toString() {
-    return String.format("%s/%s", type.toString().toLowerCase(), entity);
+    return String.format("%s/%s/%s", type.toString().toLowerCase(), entity, action.toString().toLowerCase());
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof CommandId)) {
-      return false;
-    }
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
     CommandId commandId = (CommandId) o;
-    return getType() == commandId.getType()
-        && Objects.equals(getEntity(), commandId.getEntity());
+    return type == commandId.type &&
+        Objects.equals(entity, commandId.entity) &&
+        action == commandId.action;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(getType(), getEntity());
+    return Objects.hash(type, entity, action);
   }
 }
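
Note: toString() and fromString(...) above are inverses, so a CommandId survives a round-trip through its string form, which is what the REST status path and the @JsonValue/@JsonCreator JSON representation rely on. A small sketch:

    import io.confluent.ksql.rest.server.computation.CommandId;

    public class CommandIdExample {
      public static void main(String[] args) {
        CommandId id = new CommandId(CommandId.Type.STREAM, "BIGORDERS", CommandId.Action.CREATE);
        System.out.println(id);                        // stream/BIGORDERS/create
        CommandId parsed = CommandId.fromString("stream/BIGORDERS/create");
        System.out.println(id.equals(parsed));         // true
      }
    }
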
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java
index 6befd945fcca..a8971be3c5d4 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java
@@ -57,7 +57,7 @@ public CommandId getCommandId(Statement command) {
     } else if (command instanceof DropTable) {
       return getDropTableCommandId((DropTable) command);
     } else if (command instanceof RunScript) {
-      return new CommandId(CommandId.Type.STREAM, "RunScript");
+      return new CommandId(CommandId.Type.STREAM, "RunScript", CommandId.Action.EXECUTE);
     } else {
       throw new RuntimeException(String.format(
           "Cannot assign command ID to statement of type %s",
@@ -71,42 +71,42 @@ public CommandId getTopicCommandId(RegisterTopic registerTopic) {
     if (metaStore.getAllTopicNames().contains(topicName)) {
       throw new RuntimeException(String.format("Topic %s already exists", topicName));
     }
-    return new CommandId(CommandId.Type.TOPIC, topicName);
+    return new CommandId(CommandId.Type.TOPIC, topicName, CommandId.Action.CREATE);
   }
 
-  public CommandId getTopicStreamCommandId(CreateStream createStream) {
+  private CommandId getTopicStreamCommandId(CreateStream createStream) {
    return getStreamCommandId(createStream.getName().toString());
   }
 
-  public CommandId getSelectStreamCommandId(CreateStreamAsSelect createStreamAsSelect) {
+  private CommandId getSelectStreamCommandId(CreateStreamAsSelect createStreamAsSelect) {
     return getStreamCommandId(createStreamAsSelect.getName().toString());
   }
 
-  public CommandId getTopicTableCommandId(CreateTable createTable) {
+  private CommandId getTopicTableCommandId(CreateTable createTable) {
     return getTableCommandId(createTable.getName().toString());
   }
 
-  public CommandId getSelectTableCommandId(CreateTableAsSelect createTableAsSelect) {
+  private CommandId getSelectTableCommandId(CreateTableAsSelect createTableAsSelect) {
     return getTableCommandId(createTableAsSelect.getName().toString());
   }
 
   public CommandId getTerminateCommandId(TerminateQuery terminateQuery) {
-    return new CommandId(CommandId.Type.TERMINATE, Long.toString(terminateQuery.getQueryId()));
+    return new CommandId(CommandId.Type.TERMINATE, terminateQuery.getQueryId().toString(), CommandId.Action.EXECUTE);
   }
 
   public CommandId getDropTopicCommandId(DropTopic dropTopicQuery) {
     return new CommandId(CommandId.Type.TOPIC,
-        dropTopicQuery.getTopicName().getSuffix() + "_DROP");
+        dropTopicQuery.getTopicName().getSuffix(), CommandId.Action.DROP);
   }
 
-  public CommandId getDropStreamCommandId(DropStream dropStreamQuery) {
+  private CommandId getDropStreamCommandId(DropStream dropStreamQuery) {
     return new CommandId(CommandId.Type.STREAM,
-        dropStreamQuery.getName().getSuffix() + "_DROP");
+        dropStreamQuery.getName().getSuffix(), CommandId.Action.DROP);
   }
 
-  public CommandId getDropTableCommandId(DropTable dropTableQuery) {
+  private CommandId getDropTableCommandId(DropTable dropTableQuery) {
     return new CommandId(CommandId.Type.TABLE,
-        dropTableQuery.getName().getSuffix() + "_DROP");
+        dropTableQuery.getName().getSuffix(), CommandId.Action.DROP);
   }
 
   private CommandId getStreamCommandId(String streamName) {
@@ -121,6 +121,6 @@ private CommandId getSourceCommandId(CommandId.Type type, String sourceName) {
     if (metaStore.getAllStructuredDataSourceNames().contains(sourceName)) {
       throw new RuntimeException(String.format("Source %s already exists", sourceName));
     }
-    return new CommandId(type, sourceName);
+    return new CommandId(type, sourceName, CommandId.Action.CREATE);
   }
 }
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
index 695abbc8b454..9b8a46850d56 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
@@ -87,7 +87,9 @@ void fetchAndRunCommands() {
     for (ConsumerRecord record : records) {
       CommandId commandId = record.key();
       Command command = record.value();
-      executeStatement(command, commandId);
+      if (command != null) {
+        executeStatement(command, commandId);
+      }
     }
   }
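
Note: the null check above matters once commands can be deleted from the command topic: on a log-compacted topic a deletion surfaces to consumers as a tombstone, i.e. a record with a key but a null value. (That the command topic is compacted is an assumption consistent with the new createTopic-with-configs overload; the topic's configs are not shown in this diff.) The guarded loop is equivalent to:

    // on a compacted topic, a tombstone record has a key but a null value
    for (ConsumerRecord<CommandId, Command> record : records) {
      if (record.value() == null) {
        continue; // deleted command; nothing to execute
      }
      executeStatement(record.value(), record.key());
    }
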
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java
index 99f652343d72..347f5858a8a5 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java
@@ -31,13 +31,14 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * Wrapper class for the command topic. Used for reading from the topic (either all messages from
@@ -56,13 +57,11 @@ public class CommandStore implements Closeable {
   private final AtomicBoolean closed;
 
   public CommandStore(
-    String commandTopic,
-    Consumer commandConsumer,
-    Producer commandProducer,
-    CommandIdAssigner commandIdAssigner
-  ) {
+      String commandTopic,
+      Consumer commandConsumer,
+      Producer commandProducer,
+      CommandIdAssigner commandIdAssigner) {
     this.commandTopic = commandTopic;
-    // TODO: Remove commandConsumer/commandProducer as parameters if not needed in testing
     this.commandConsumer = commandConsumer;
     this.commandProducer = commandProducer;
     this.commandIdAssigner = commandIdAssigner;
@@ -89,15 +88,15 @@ public void close() {
    * @param statement The statement to be distributed
    * @param streamsProperties Any command-specific Streams properties to use.
    * @return The ID assigned to the statement
-   * @throws Exception TODO: Refine this
    */
   public CommandId distributeStatement(
       String statementString,
       Statement statement,
       Map streamsProperties
   ) throws KsqlException {
-    CommandId commandId = commandIdAssigner.getCommandId(statement);
-    Command command = new Command(statementString, streamsProperties);
+    final CommandId commandId = commandIdAssigner.getCommandId(statement);
+    final Command command = new Command(statementString,
+        streamsProperties);
     try {
       commandProducer.send(new ProducerRecord<>(commandTopic, commandId, command)).get();
     } catch (Exception e) {
@@ -122,40 +121,39 @@ public ConsumerRecords getNewCommands() {
    * @return The commands that have been read from the command topic
    */
   public List<Pair<CommandId, Command>> getPriorCommands() {
-    List<Pair<CommandId, Command>> result = new ArrayList<>();
-    for (ConsumerRecord commandRecord : getAllPriorCommandRecords()) {
-      CommandId commandId = commandRecord.key();
-      Command command = commandRecord.value();
-      if (command != null) {
-        result.add(new Pair<>(commandId, command));
-      }
-    }
-
-    return result;
+    return getAllPriorCommandRecords()
+        .stream()
+        .map(record -> new Pair<>(record.key(), record.value())).collect(Collectors.toList());
   }
 
-  private List<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords() {
+  private Collection<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords() {
     Collection commandTopicPartitions = getTopicPartitionsForTopic(commandTopic);
 
-    // Have to poll to make sure subscription has taken effect (subscribe() is lazy)
-    commandConsumer.poll(0);
     commandConsumer.seekToBeginning(commandTopicPartitions);
 
-// TODO: correctly handle a sequence of related records
     log.debug("Reading prior command records");
 
-    List<ConsumerRecord<CommandId, Command>> result = new ArrayList<>();
+    final Map<CommandId, ConsumerRecord<CommandId, Command>> commands = new LinkedHashMap<>();
     ConsumerRecords records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
     while (!records.isEmpty()) {
-      log.debug("Received {} records from poll", records.count());
       for (ConsumerRecord record : records) {
-        result.add(record);
+        final CommandId key = record.key();
+        if (key.getAction() != CommandId.Action.DROP && !commands.containsKey(key)) {
+          commands.put(key, record);
+        } else if (key.getAction() == CommandId.Action.DROP) {
+          if (commands.remove(new CommandId(key.getType(),
+                                            key.getEntity(),
+                                            CommandId.Action.CREATE)) == null) {
+            log.warn("drop command {} found without a corresponding create command for"
+                + " {} {}", key, key.getType(), key.getAction());
+          }
+        }
       }
       records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
     }
-    log.debug("Retrieved records:" + result.size());
-    return result;
+    log.debug("Retrieved records:" + commands.size());
+    return commands.values();
   }
 
   private Collection getTopicPartitionsForTopic(String topic) {
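
Note: getPriorCommands() now folds the command history during replay: the first CREATE per id wins, and a later DROP for the same type/entity cancels the earlier CREATE, so dropped sources are never rebuilt on server restart. A self-contained model of that fold, with string ids standing in for CommandId:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class ReplayFoldSketch {
      // log entries are [id, statement] pairs, id = "type/entity/action"
      static Map<String, String> fold(String[][] log) {
        Map<String, String> commands = new LinkedHashMap<>();
        for (String[] entry : log) {
          String id = entry[0];
          if (id.endsWith("/drop")) {
            commands.remove(id.replace("/drop", "/create")); // cancel the matching create
          } else if (!commands.containsKey(id)) {
            commands.put(id, entry[1]);                      // first create per id wins
          }
        }
        return commands;
      }
    }
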
"Table created and running" : "Stream created and running"; @@ -276,8 +277,8 @@ private boolean startQuery( String queryString, Query query, CommandId commandId, - Map terminatedQueries, - Map queryConfigProperties + Map terminatedQueries, + Command command ) throws Exception { if (query.getQueryBody() instanceof QuerySpecification) { QuerySpecification querySpecification = (QuerySpecification) query.getQueryBody(); @@ -294,11 +295,14 @@ private boolean startQuery( } QueryMetadata queryMetadata = ksqlEngine.buildMultipleQueries( - false, queryString, queryConfigProperties).get(0); + false, + queryString, + command.getStreamsProperties() + ).get(0); if (queryMetadata instanceof PersistentQueryMetadata) { PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata; - long queryId = persistentQueryMetadata.getId(); + final QueryId queryId = persistentQueryMetadata.getId(); if (terminatedQueries != null && terminatedQueries.containsKey(queryId)) { CommandId terminateId = terminatedQueries.get(queryId); @@ -327,10 +331,10 @@ private boolean startQuery( } private void terminateQuery(TerminateQuery terminateQuery) throws Exception { - long queryId = terminateQuery.getQueryId(); - QueryMetadata queryMetadata = ksqlEngine.getPersistentQueries().get(queryId); + final QueryId queryId = terminateQuery.getQueryId(); + final QueryMetadata queryMetadata = ksqlEngine.getPersistentQueries().get(queryId); if (!ksqlEngine.terminateQuery(queryId, true)) { - throw new Exception(String.format("No running query with id %d was found", queryId)); + throw new Exception(String.format("No running query with id %s was found", queryId)); } CommandId.Type commandType; @@ -351,7 +355,7 @@ private void terminateQuery(TerminateQuery terminateQuery) throws Exception { String queryEntity = ((KsqlStructuredDataOutputNode) queryMetadata.getOutputNode()).getKsqlTopic().getName(); - CommandId queryStatementId = new CommandId(commandType, queryEntity); + CommandId queryStatementId = new CommandId(commandType, queryEntity, CommandId.Action.CREATE); statusStore.put( queryStatementId, new CommandStatus(CommandStatus.Status.TERMINATED, "Query terminated") diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 72b863eced48..610076b9cc0c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -414,13 +414,13 @@ private void registerDdlCommandTasks() { }); ddlCommandTasks.put(DropStream.class, (statement, statementText, properties) -> { - DropSourceCommand dropSourceCommand = new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM); + DropSourceCommand dropSourceCommand = new DropSourceCommand((DropStream) statement, DataSource.DataSourceType.KSTREAM, ksqlEngine); executeDDLCommand(dropSourceCommand); return statement.toString(); }); ddlCommandTasks.put(DropTable.class, (statement, statementText, properties) -> { - DropSourceCommand dropSourceCommand = new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE); + DropSourceCommand dropSourceCommand = new DropSourceCommand((DropTable) statement, DataSource.DataSourceType.KTABLE, ksqlEngine); executeDDLCommand(dropSourceCommand); return statement.toString(); }); diff --git 
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/StatusResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/StatusResource.java
index a432be9814ac..21ed266ba7ca 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/StatusResource.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/StatusResource.java
@@ -45,10 +45,10 @@ public Response getAllStatuses() {
   }

   @GET
-  @Path("/{type}/{entity}")
-  public Response getStatus(@PathParam("type") String type, @PathParam("entity") String entity)
+  @Path("/{type}/{entity}/{action}")
+  public Response getStatus(@PathParam("type") String type, @PathParam("entity") String entity, @PathParam("action") String action)
       throws Exception {
-    CommandId commandId = new CommandId(type, entity);
+    CommandId commandId = new CommandId(type, entity, action);

     Optional<CommandStatus> commandStatus = statementExecutor.getStatus(commandId);
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
index 2729f6536c26..5fcf50b21264 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
@@ -50,8 +50,7 @@ class QueryStreamWriter implements StreamingOutput {
       KsqlEngine ksqlEngine,
       long disconnectCheckInterval,
       String queryString,
-      Map<String, Object> overriddenProperties
-  )
+      Map<String, Object> overriddenProperties)
       throws Exception {
     QueryMetadata queryMetadata =
         ksqlEngine.buildMultipleQueries(true, queryString, overriddenProperties).get(0);
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
index 78544d39238b..2ed33e8867a3 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
@@ -50,8 +50,7 @@ public class StreamedQueryResource {
   public StreamedQueryResource(
       KsqlEngine ksqlEngine,
       StatementParser statementParser,
-      long disconnectCheckInterval
-  ) {
+      long disconnectCheckInterval) {
     this.ksqlEngine = ksqlEngine;
     this.statementParser = statementParser;
     this.disconnectCheckInterval = disconnectCheckInterval;
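Given the StatusResource change above, command statuses are now addressed by type, entity, and action rather than type and entity alone. A hypothetical client-side lookup using the standard JAX-RS 2.0 client; the server address, port, and command segments are illustrative:

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;

public class StatusLookupSketch {
  public static void main(String[] args) {
    Client client = ClientBuilder.newClient();
    // The TOPIC/c1/CREATE segments mirror CommandId's type, entity, and action.
    Response response = client
        .target("http://localhost:8088")   // placeholder server address
        .path("status/TOPIC/c1/CREATE")
        .request()
        .get();
    System.out.println(response.readEntity(String.class));
    client.close();
  }
}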
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/client/KsqlRestClientTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/client/KsqlRestClientTest.java
index 871d91536df8..f712ecc6feb7 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/client/KsqlRestClientTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/client/KsqlRestClientTest.java
@@ -90,8 +90,8 @@ public void testStatus() {
     Assert.assertTrue(commandStatusesRestResponse.isSuccessful());
     CommandStatuses commandStatuses = commandStatusesRestResponse.getResponse();
     Assert.assertTrue(commandStatuses.size() == 2);
-    Assert.assertTrue(commandStatuses.get(new CommandId(CommandId.Type.TOPIC, "c1")) == CommandStatus.Status.SUCCESS);
-    Assert.assertTrue(commandStatuses.get(new CommandId(CommandId.Type.TOPIC, "c2")) ==
+    Assert.assertTrue(commandStatuses.get(new CommandId(CommandId.Type.TOPIC, "c1", CommandId.Action.CREATE)) ==
+        CommandStatus.Status.SUCCESS);
+    Assert.assertTrue(commandStatuses.get(new CommandId(CommandId.Type.TOPIC, "c2", CommandId.Action.CREATE)) ==
         CommandStatus.Status.ERROR);
   }
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java
new file mode 100644
index 000000000000..f054544c7a5b
--- /dev/null
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java
@@ -0,0 +1,124 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.rest.server.computation;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import io.confluent.ksql.metastore.MetaStoreImpl;
+import io.confluent.ksql.util.Pair;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+
+@RunWith(EasyMockRunner.class)
+public class CommandStoreTest {
+
+  private static final String COMMAND_TOPIC = "command";
+  @Mock(type = MockType.NICE)
+  private Consumer<CommandId, Command> commandConsumer;
+  @Mock(type = MockType.NICE)
+  private Producer<CommandId, Command> commandProducer;
+
+
+  @Test
+  public void shouldUseFirstCommandForSameIdIfNoDropBetweenThem() {
+    final CommandId commandId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
+    final Command originalCommand = new Command("some statement", Collections.emptyMap());
+    final Command latestCommand = new Command("a new statement", Collections.emptyMap());
+    final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList(
+        new ConsumerRecord<>("topic", 0, 0, commandId,
+            originalCommand),
+        new ConsumerRecord<>("topic", 0, 0, commandId,
+            latestCommand))
+    ));
+
+    EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());
+
+    EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
+        .andReturn(new ConsumerRecords<>(Collections.emptyMap()));
+    EasyMock.replay(commandConsumer);
+
+    final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
+    final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
+    assertThat(priorCommands, equalTo(Collections.singletonList(new Pair<>(commandId, originalCommand))));
+  }
+
+  @Test
+  public void shouldReplaceCommandWithNewCommandAfterDrop() {
+    final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
+    final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP);
+    final Command originalCommand = new Command("some statement", Collections.emptyMap());
+    final Command dropCommand = new Command("drop", Collections.emptyMap());
+    final Command latestCommand = new Command("a new statement", Collections.emptyMap());
+
+    final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList(
+        new ConsumerRecord<>("topic", 0, 0, createId, originalCommand),
+        new ConsumerRecord<>("topic", 0, 0, dropId, dropCommand),
+        new ConsumerRecord<>("topic", 0, 0, createId, latestCommand))
+    ));
+
+    EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());
+
+    EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
+        .andReturn(new ConsumerRecords<>(Collections.emptyMap()));
+    EasyMock.replay(commandConsumer);
+
+    final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
+    final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
+    assertThat(priorCommands, equalTo(Collections.singletonList(new Pair<>(createId, latestCommand))));
+  }
+
+  @Test
+  public void shouldRemoveCreateCommandIfItHasBeenDropped() {
+    final CommandId createId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.CREATE);
+    final CommandId dropId = new CommandId(CommandId.Type.TABLE, "one", CommandId.Action.DROP);
+    final Command originalCommand = new Command("some statement", Collections.emptyMap());
+    final Command dropCommand = new Command("drop", Collections.emptyMap());
+
+    final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("topic", 0), Arrays.asList(
+        new ConsumerRecord<>("topic", 0, 0, createId, originalCommand),
+        new ConsumerRecord<>("topic", 0, 0, dropId, dropCommand)
+    )));
+
+    EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());
+
+    EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
+        .andReturn(new ConsumerRecords<>(Collections.emptyMap()));
+    EasyMock.replay(commandConsumer);
+
+    final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
+    final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
+    assertThat(priorCommands, equalTo(Collections.emptyList()));
+  }
+
+}
\ No newline at end of file
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java
index 0bed3d56d9d3..a272aeca8e90 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java
@@ -58,8 +58,8 @@ private StatementExecutor getStatementExecutor() {
   public void shouldHandleCorrectDDLStatement() throws Exception {
     StatementExecutor statementExecutor = getStatementExecutor();
     Command command = new Command("REGISTER TOPIC users_topic WITH (value_format = 'json', "
-        + "kafka_topic='user_topic_json');", new HashMap<>());
-    CommandId commandId = new CommandId(CommandId.Type.TOPIC, "_CorrectTopicGen");
+        + "kafka_topic='user_topic_json');", new HashMap<>());
+    CommandId commandId = new CommandId(CommandId.Type.TOPIC, "_CorrectTopicGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(command, commandId);
     Map<CommandId, CommandStatus> statusStore = statementExecutor.getStatuses();
     Assert.assertNotNull(statusStore);
@@ -72,8 +72,8 @@ public void shouldHandleCorrectDDLStatement() throws Exception {
   public void shouldHandleIncorrectDDLStatement() throws Exception {
     StatementExecutor statementExecutor = getStatementExecutor();
     Command command = new Command("REGIST ER TOPIC users_topic WITH (value_format = 'json', "
-        + "kafka_topic='user_topic_json');", new HashMap<>());
-    CommandId commandId = new CommandId(CommandId.Type.TOPIC, "_IncorrectTopicGen");
+        + "kafka_topic='user_topic_json');", new HashMap<>());
+    CommandId commandId = new CommandId(CommandId.Type.TOPIC, "_IncorrectTopicGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(command, commandId);
     Map<CommandId, CommandStatus> statusStore = statementExecutor.getStatuses();
     Assert.assertNotNull(statusStore);
@@ -99,39 +99,39 @@ public void shouldHandleCSAS_CTASStatement() throws Exception {
     StatementExecutor statementExecutor = getStatementExecutor();

     Command topicCommand = new Command("REGISTER TOPIC pageview_topic WITH "
-        + "(value_format = 'json', "
-        + "kafka_topic='pageview_topic_json');", new HashMap<>());
-    CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen");
+        + "(value_format = 'json', "
+        + "kafka_topic='pageview_topic_json');", new HashMap<>());
+    CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(topicCommand, topicCommandId);

     Command csCommand = new Command("CREATE STREAM pageview "
-        + "(viewtime bigint, pageid varchar, userid varchar) "
-        + "WITH (registered_topic = 'pageview_topic');",
-        new HashMap<>());
-    CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen");
+        + "(viewtime bigint, pageid varchar, userid varchar) "
+        + "WITH (registered_topic = 'pageview_topic');",
+        new HashMap<>());
+    CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(csCommand, csCommandId);

     Command csasCommand = new Command("CREATE STREAM user1pv "
-        + " AS select * from pageview WHERE userid = 'user1';",
-        new HashMap<>());
+        + " AS select * from pageview WHERE userid = 'user1';",
+        new HashMap<>());

-    CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen");
+    CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(csasCommand, csasCommandId);

     Command ctasCommand = new Command("CREATE TABLE user1pvtb "
-        + " AS select * from pageview window tumbling(size 5 "
-        + "second) WHERE userid = "
-        + "'user1' group by pageid;",
-        new HashMap<>());
+        + " AS select * from pageview window tumbling(size 5 "
+        + "second) WHERE userid = "
+        + "'user1' group by pageid;",
+        new HashMap<>());

-    CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen");
+    CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(ctasCommand, ctasCommandId);

-    Command terminateCommand = new Command("TERMINATE 1;",
+    Command terminateCommand = new Command("TERMINATE 'CSAS_USER1PV';",
         new HashMap<>());

-    CommandId terminateCommandId = new CommandId(CommandId.Type.TABLE, "_TerminateGen");
+    CommandId terminateCommandId = new CommandId(CommandId.Type.TABLE, "_TerminateGen", CommandId.Action.CREATE);
     statementExecutor.handleStatement(terminateCommand, terminateCommandId);

     Map<CommandId, CommandStatus> statusStore = statementExecutor.getStatuses();
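The terminate command in the test above switches from a numeric counter (TERMINATE 1;) to the engine-generated query id. A sketch of the corresponding command pair, reusing the PR's Command class; the CSAS_USER1PV id follows the naming visible in this test, so treat the exact id scheme as an assumption:

import java.util.HashMap;

import io.confluent.ksql.rest.server.computation.Command;

// Sketch only: shows the shape of the new TERMINATE statement, which names the
// generated query id (here CSAS_ + sink name) instead of a numeric counter.
public class TerminateByQueryIdSketch {
  public static void main(String[] args) {
    Command create = new Command(
        "CREATE STREAM user1pv AS select * from pageview WHERE userid = 'user1';",
        new HashMap<>());
    // The persistent query started for user1pv is assumed to get the id
    // CSAS_USER1PV, so the matching terminate statement reads:
    Command terminate = new Command("TERMINATE 'CSAS_USER1PV';", new HashMap<>());
    System.out.println(create.getStatement());
    System.out.println(terminate.getStatement());
  }
}

Because the id is derived from the sink name rather than assigned sequentially, it stays stable across server restarts and command-topic replays, which is what makes the terminated-queries bookkeeping in StatementExecutor reliable.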
"_TerminateGen", CommandId.Action.CREATE); statementExecutor.handleStatement(terminateCommand, terminateCommandId); Map statusStore = statementExecutor.getStatuses(); @@ -151,10 +151,10 @@ public void shouldHandlePriorStatement() throws Exception { TestUtils testUtils = new TestUtils(); List> priorCommands = testUtils.getAllPriorCommandRecords(); - CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen"); - CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen"); - CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen"); - CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen"); + CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen", CommandId.Action.CREATE); + CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen", CommandId.Action.CREATE); + CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen", CommandId.Action.CREATE); + CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen", CommandId.Action.CREATE); statementExecutor.handleStatements(priorCommands); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockCommandStore.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockCommandStore.java deleted file mode 100644 index c8a47a8e1851..000000000000 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockCommandStore.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright 2017 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/
-
-package io.confluent.ksql.rest.server.mock;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.confluent.ksql.metastore.MetaStoreImpl;
-import io.confluent.ksql.parser.tree.Statement;
-import io.confluent.ksql.rest.server.computation.Command;
-import io.confluent.ksql.rest.server.computation.CommandId;
-import io.confluent.ksql.rest.server.computation.CommandIdAssigner;
-import io.confluent.ksql.rest.server.computation.CommandStore;
-import io.confluent.ksql.rest.server.utils.TestUtils;
-import io.confluent.ksql.util.KsqlException;
-import io.confluent.ksql.util.Pair;
-
-public class MockCommandStore extends CommandStore {
-
-  CommandIdAssigner commandIdAssigner;
-
-  private final AtomicBoolean closed;
-  private boolean isFirstCall = true;
-
-  public MockCommandStore(String commandTopic,
-                          Consumer<CommandId, Command> commandConsumer,
-                          Producer<CommandId, Command> commandProducer,
-                          CommandIdAssigner commandIdAssigner) {
-    super(commandTopic, commandConsumer, commandProducer,
-        new CommandIdAssigner(new MetaStoreImpl()));
-
-    commandIdAssigner = new CommandIdAssigner(new MetaStoreImpl());
-    closed = new AtomicBoolean(false);
-  }
-
-  @Override
-  public void close() {
-    closed.set(true);
-  }
-
-  @Override
-  public ConsumerRecords<CommandId, Command> getNewCommands() {
-    List<ConsumerRecord<CommandId, Command>> records = new ArrayList<>();
-    Map<TopicPartition, List<ConsumerRecord<CommandId, Command>>> recordsMap = new HashMap<>();
-    if (isFirstCall) {
-      List<Pair<CommandId, Command>> commands = new TestUtils().getAllPriorCommandRecords();
-      for (Pair<CommandId, Command> commandIdCommandPair: commands) {
-        records.add(new ConsumerRecord<>(
-            "T1", 10, 100,
-            commandIdCommandPair.getLeft(), commandIdCommandPair.getRight()));
-      }
-
-      recordsMap.put(new TopicPartition("T1", 1), records);
-      isFirstCall = false;
-    } else {
-      close();
-    }
-    return new ConsumerRecords<>(recordsMap);
-  }
-
-  @Override
-  public CommandId distributeStatement(
-      String statementString,
-      Statement statement,
-      Map<String, Object> streamsProperties
-  ) throws KsqlException {
-    CommandId commandId = commandIdAssigner.getCommandId(statement);
-    return commandId;
-  }
-
-  @Override
-  public List<Pair<CommandId, Command>> getPriorCommands() {
-    return new TestUtils().getAllPriorCommandRecords();
-  }
-
-}
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
index 3bf52120fc3c..ae5f91442fc9 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
@@ -34,6 +34,11 @@ public class MockKafkaTopicClient implements KafkaTopicClient {
   public void createTopic(String topic, int numPartitions, short replicatonFactor) {
   }

+  @Override
+  public void createTopic(String topic, int numPartitions, short replicatonFactor, Map configs) {
+
+  }
+
   @Override
   public boolean isTopicExists(String topic) {
     return false;
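The new createTopic overload lets callers pass per-topic configs at creation time. A usage sketch against the mock above; the config keys shown are standard Kafka topic configs, and whether the production KafkaTopicClient forwards them verbatim is an assumption (the configs parameter type is left as in the diff):

import java.util.HashMap;
import java.util.Map;

import io.confluent.ksql.rest.server.mock.MockKafkaTopicClient;

public class CreateTopicSketch {
  public static void main(String[] args) {
    MockKafkaTopicClient client = new MockKafkaTopicClient();

    // Standard Kafka per-topic configs, passed through at creation time.
    Map<String, String> configs = new HashMap<>();
    configs.put("cleanup.policy", "compact");
    configs.put("retention.ms", "604800000"); // 7 days

    client.createTopic("ksql_command_topic", 1, (short) 1, configs);
  }
}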
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockStatusResource.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockStatusResource.java
index f4ca6ebe39e6..227258ffa662 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockStatusResource.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockStatusResource.java
@@ -38,15 +38,17 @@ public class MockStatusResource {
   public Response getAllStatuses() {
     Map<CommandId, CommandStatus.Status> statuses = new HashMap<>();
-    statuses.put(new CommandId(CommandId.Type.TOPIC, "c1"), CommandStatus.Status.SUCCESS);
-    statuses.put(new CommandId(CommandId.Type.TOPIC, "c2"), CommandStatus.Status.ERROR);
+    statuses.put(new CommandId(CommandId.Type.TOPIC, "c1", CommandId.Action.CREATE), CommandStatus.Status.SUCCESS);
+    statuses.put(new CommandId(CommandId.Type.TOPIC, "c2", CommandId.Action.CREATE), CommandStatus.Status.ERROR);
     CommandStatuses commandStatuses = new CommandStatuses(statuses);
     return Response.ok(commandStatuses).build();
   }

   @GET
-  @Path("/{type}/{entity}")
-  public Response getStatus(@PathParam("type") String type, @PathParam("entity") String entity)
+  @Path("/{type}/{entity}/{action}")
+  public Response getStatus(@PathParam("type") String type,
+                            @PathParam("entity") String entity,
+                            @PathParam("action") String action)
       throws Exception {
     return Response.ok("status").build();
   }
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
index 4123add6d0e9..31079a79f1f5 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
@@ -232,7 +232,7 @@ public void testInstantRegisterTopic() throws Exception {
         createTopicProperties
     );

-    final CommandId commandId = new CommandId(CommandId.Type.TOPIC, ksqlTopic);
+    final CommandId commandId = new CommandId(CommandId.Type.TOPIC, ksqlTopic, CommandId.Action.CREATE);
     final CommandStatus commandStatus = new CommandStatus(
         CommandStatus.Status.QUEUED,
         "Statement written to command topic"
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StatusResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StatusResourceTest.java
index c90e2d347177..e58c2dfd0cca 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StatusResourceTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StatusResourceTest.java
@@ -42,17 +42,17 @@ public class StatusResourceTest {
     mockCommandStatuses = new HashMap<>();

     mockCommandStatuses.put(
-        new CommandId(CommandId.Type.TOPIC, "test_topic"),
+        new CommandId(CommandId.Type.TOPIC, "test_topic", CommandId.Action.CREATE),
         new CommandStatus(CommandStatus.Status.SUCCESS, "Topic created successfully")
     );

     mockCommandStatuses.put(
-        new CommandId(CommandId.Type.STREAM, "test_stream"),
+        new CommandId(CommandId.Type.STREAM, "test_stream", CommandId.Action.CREATE),
         new CommandStatus(CommandStatus.Status.ERROR, "Hi Ewen!")
     );

     mockCommandStatuses.put(
-        new CommandId(CommandId.Type.TERMINATE, "5"),
+        new CommandId(CommandId.Type.TERMINATE, "5", CommandId.Action.CREATE),
         new CommandStatus(CommandStatus.Status.QUEUED, "Command written to command topic")
     );
   }
@@ -95,7 +95,7 @@ public void testGetStatus() throws Exception {
       CommandId commandId = commandEntry.getKey();
       CommandStatus expectedCommandStatus = commandEntry.getValue();

-      Object statusEntity = testResource.getStatus(commandId.getType().name(), commandId.getEntity()).getEntity();
+      Object statusEntity =
+          testResource.getStatus(commandId.getType().name(), commandId.getEntity(), commandId.getAction().name()).getEntity();
       assertThat(statusEntity, instanceOf(CommandStatus.class));
       CommandStatus testCommandStatus = (CommandStatus) statusEntity;
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java
index 0008ce2f1639..59b483793909 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java
@@ -33,8 +33,6 @@
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
 import org.junit.Test;

 import javax.ws.rs.core.Response;
@@ -48,11 +46,9 @@
 import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

 import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/utils/TestUtils.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/utils/TestUtils.java
index bf67e6c137f6..47bd3fcb0d57 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/utils/TestUtils.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/utils/TestUtils.java
@@ -35,7 +35,7 @@ public List<Pair<CommandId, Command>> getAllPriorCommandRecords() {
     Command topicCommand = new Command("REGISTER TOPIC pageview_topic WITH "
         + "(value_format = 'json', "
         + "kafka_topic='pageview_topic_json');", new HashMap<>());
-    CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen");
+    CommandId topicCommandId = new CommandId(CommandId.Type.TOPIC, "_CSASTopicGen", CommandId.Action.CREATE);

     priorCommands.add(new Pair<>(topicCommandId, topicCommand));

@@ -43,14 +43,14 @@ public List<Pair<CommandId, Command>> getAllPriorCommandRecords() {
     Command csCommand = new Command("CREATE STREAM pageview "
         + "(viewtime bigint, pageid varchar, userid varchar) "
         + "WITH (registered_topic = 'pageview_topic');",
         new HashMap<>());
-    CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen");
+    CommandId csCommandId = new CommandId(CommandId.Type.STREAM, "_CSASStreamGen", CommandId.Action.CREATE);

     priorCommands.add(new Pair<>(csCommandId, csCommand));

     Command csasCommand = new Command("CREATE STREAM user1pv "
         + " AS select * from pageview WHERE userid = 'user1';",
         new HashMap<>());
-    CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen");
+    CommandId csasCommandId = new CommandId(CommandId.Type.STREAM, "_CSASGen", CommandId.Action.CREATE);

     priorCommands.add(new Pair<>(csasCommandId, csasCommand));

@@ -60,7 +60,7 @@ public List<Pair<CommandId, Command>> getAllPriorCommandRecords() {
     Command ctasCommand = new Command("CREATE TABLE user1pvtb "
         + " AS select * from pageview window tumbling(size 5 "
         + "second) WHERE userid = "
         + "'user1' group by pageid;",
         new HashMap<>());

-    CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen");
+    CommandId ctasCommandId = new CommandId(CommandId.Type.TABLE, "_CTASGen", CommandId.Action.CREATE);

     priorCommands.add(new Pair<>(ctasCommandId, ctasCommand));

     return priorCommands;