diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java index 12e251a09635..cb12a949d6c6 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/StandaloneExecutor.java @@ -54,7 +54,7 @@ public void executeStatements(String queries) throws Exception { Collections.emptyMap(), tempMetaStore); List queryMetadataList = ksqlEngine.planQueries( - false, queryList, new HashMap<>(), tempMetaStore); + queryList, new HashMap<>(), tempMetaStore); for (QueryMetadata queryMetadata: queryMetadataList) { if (queryMetadata instanceof PersistentQueryMetadata) { PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata; diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java index 6db9393770f5..077ec09608da 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java @@ -68,6 +68,7 @@ public class CliTest extends TestRunner { private static TopicConsumer topicConsumer; private static OrderDataProvider orderDataProvider; + private static int result_stream_no = 0; @BeforeClass public static void setUp() throws Exception { @@ -181,7 +182,7 @@ private static void testCreateStreamAsSelect(String selectQuery, Schema resultSc if (!selectQuery.endsWith(";")) { selectQuery += ";"; } - String resultKStreamName = "RESULT"; + String resultKStreamName = "RESULT_" + result_stream_no++; final String queryString = "CREATE STREAM " + resultKStreamName + " AS " + selectQuery; /* Start Stream Query */ diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java index eff339bf1740..49be50623a50 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java @@ -90,7 +90,7 @@ public MetaStore getMetaStore() { * @throws Exception */ public void sql(String sql) throws Exception { - List queryMetadataList = ksqlEngine.buildMultipleQueries(false, sql, Collections + List queryMetadataList = ksqlEngine.buildMultipleQueries(sql, Collections .emptyMap()); for (QueryMetadata queryMetadata: queryMetadataList) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java index 4ccf9fc8f392..7f5b78e3d592 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java @@ -114,13 +114,11 @@ public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClien /** * Runs the set of queries in the given query string. * - * @param createNewAppId If a new application id should be generated. * @param queriesString The ksql query string. * @return List of query metadata. * @throws Exception Any exception thrown here! */ public List buildMultipleQueries( - final boolean createNewAppId, final String queriesString, final Map overriddenProperties) throws Exception { for (String property : overriddenProperties.keySet()) { @@ -139,12 +137,11 @@ public List buildMultipleQueries( // Build query AST from the query string List> queries = parseQueries(queriesString, overriddenProperties, tempMetaStore); - return planQueries(createNewAppId, queries, overriddenProperties, tempMetaStore); + return planQueries(queries, overriddenProperties, tempMetaStore); } - public List planQueries(final boolean createNewAppId, - final List> statementList, + public List planQueries(final List> statementList, final Map overriddenProperties, final MetaStore tempMetaStore) throws Exception { // Logical plan creation from the ASTs @@ -152,8 +149,7 @@ public List planQueries(final boolean createNewAppId, // Physical plan creation from logical plans. List runningQueries = queryEngine.buildPhysicalPlans( - createNewAppId, - logicalPlans, + logicalPlans, statementList, overriddenProperties, true @@ -178,7 +174,6 @@ public QueryMetadata getQueryExecutionPlan(final Query query) throws Exception { // Physical plan creation from logical plans. List runningQueries = queryEngine.buildPhysicalPlans( - false, logicalPlans, Collections.singletonList(new Pair<>("", query)), Collections.emptyMap(), diff --git a/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java index 94094c5f1aa6..93f7dbe0d8a6 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java @@ -113,7 +113,6 @@ private PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMe } List buildPhysicalPlans( - final boolean addUniqueTimeSuffix, final List> logicalPlans, final List> statementList, final Map overriddenStreamsProperties, @@ -131,7 +130,7 @@ List buildPhysicalPlans( } handleDdlStatement((DDLStatement)statement, overriddenStreamsProperties); } else { - buildQueryPhysicalPlan(physicalPlans, addUniqueTimeSuffix, statementPlanPair, + buildQueryPhysicalPlan(physicalPlans, statementPlanPair, overriddenStreamsProperties, updateMetastore); } @@ -140,7 +139,6 @@ List buildPhysicalPlans( } private void buildQueryPhysicalPlan(final List physicalPlans, - final boolean addUniqueTimeSuffix, final Pair statementPlanPair, final Map overriddenStreamsProperties, final boolean updateMetastore) throws Exception { @@ -154,7 +152,6 @@ private void buildQueryPhysicalPlan(final List physicalPlans, ksqlEngine.getTopicClient(), new MetastoreUtil(), ksqlEngine.getFunctionRegistry(), - addUniqueTimeSuffix, overriddenStreamsProperties, updateMetastore, ksqlEngine.getMetaStore() diff --git a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java index 7f64d649f526..32edca345b4c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java @@ -59,7 +59,6 @@ public class PhysicalPlanBuilder { private final KafkaTopicClient kafkaTopicClient; private final MetastoreUtil metastoreUtil; private final FunctionRegistry functionRegistry; - private final boolean addUniqueTimeSuffix; private final Map overriddenStreamsProperties; private final MetaStore metaStore; private final boolean updateMetastore; @@ -69,7 +68,6 @@ public PhysicalPlanBuilder(final StreamsBuilder builder, final KafkaTopicClient kafkaTopicClient, final MetastoreUtil metastoreUtil, final FunctionRegistry functionRegistry, - final boolean addUniqueTimeSuffix, final Map overriddenStreamsProperties, final boolean updateMetastore, final MetaStore metaStore) { @@ -78,7 +76,6 @@ public PhysicalPlanBuilder(final StreamsBuilder builder, this.kafkaTopicClient = kafkaTopicClient; this.metastoreUtil = metastoreUtil; this.functionRegistry = functionRegistry; - this.addUniqueTimeSuffix = addUniqueTimeSuffix; this.overriddenStreamsProperties = overriddenStreamsProperties; this.metaStore = metaStore; this.updateMetastore = updateMetastore; @@ -130,10 +127,7 @@ private QueryMetadata buildPlanForBareQuery(final QueuedSchemaKStream schemaKStr final String transientQueryPrefix, final String statement) { - String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix); - if (addUniqueTimeSuffix) { - applicationId = addTimeSuffix(applicationId); - } + final String applicationId = addTimeSuffix(getBareQueryApplicationId(serviceId, transientQueryPrefix)); KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties); @@ -190,7 +184,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(final SchemaKStream schem } final QueryId queryId = sinkDataSource.getPersistentQueryId(); - final String applicationId = addTimeSuffix(serviceId + persistanceQueryPrefix + queryId); + final String applicationId = serviceId + persistanceQueryPrefix + queryId; KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java index 96bc1ad8bb19..c5bfa331d685 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java @@ -54,10 +54,10 @@ public void shouldRunSimpleStatements() throws Exception { KsqlContext ksqlContext = new KsqlContext(adminClient, kafkaTopicClient, ksqlEngine); - expect(ksqlEngine.buildMultipleQueries(false, statement1, Collections.emptyMap())) + expect(ksqlEngine.buildMultipleQueries(statement1, Collections.emptyMap())) .andReturn (Collections.emptyList()); - expect(ksqlEngine.buildMultipleQueries(false, statement2, Collections.emptyMap())) + expect(ksqlEngine.buildMultipleQueries(statement2, Collections.emptyMap())) .andReturn(getQueryMetadata(new QueryId("CSAS_BIGORDERS"), DataSource.DataSourceType.KSTREAM)); expect(ksqlEngine.getPersistentQueries()).andReturn(liveQueryMap); replay(ksqlEngine); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java index 4c9b00855b57..3adcfec46c3a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java @@ -94,9 +94,9 @@ public void before() throws Exception { testHarness.publishTestData(usersTopic, userDataProvider, System.currentTimeMillis() - 10000); testHarness.publishTestData(pageViewTopic, pageViewDataProvider, System.currentTimeMillis()); - ksqlEngine.buildMultipleQueries(false, String.format("CREATE TABLE %s (registertime bigint, gender varchar, regionid varchar, " + + ksqlEngine.buildMultipleQueries(String.format("CREATE TABLE %s (registertime bigint, gender varchar, regionid varchar, " + "userid varchar) WITH (kafka_topic='%s', value_format='JSON');", userTable, usersTopic), Collections.emptyMap()); - ksqlEngine.buildMultipleQueries(false, String.format("CREATE STREAM %s (viewtime bigint, userid varchar, pageid varchar) " + + ksqlEngine.buildMultipleQueries(String.format("CREATE STREAM %s (viewtime bigint, userid varchar, pageid varchar) " + "WITH (kafka_topic='%s', value_format='JSON');", pageViewStream, pageViewTopic), Collections.emptyMap()); } @@ -118,7 +118,7 @@ private void validateSelectAllFromUsers() throws Exception { String query = String.format("SELECT * from %s;", userTable); log.debug("Sending query: {}", query); - List queries = ksqlEngine.buildMultipleQueries(false, query, Collections.emptyMap()); + List queries = ksqlEngine.buildMultipleQueries(query, Collections.emptyMap()); assertEquals(1, queries.size()); assertTrue(queries.get(0) instanceof QueuedQueryMetadata); @@ -146,7 +146,7 @@ private void validateSelectFromPageViewsWithSpecificColumn() throws Exception { String query = String.format("SELECT pageid from %s;", pageViewStream); log.debug("Sending query: {}", query); - List queries = ksqlEngine.buildMultipleQueries(false, query, Collections.emptyMap()); + List queries = ksqlEngine.buildMultipleQueries(query, Collections.emptyMap()); assertEquals(1, queries.size()); assertTrue(queries.get(0) instanceof QueuedQueryMetadata); @@ -179,7 +179,7 @@ private String validateSelectAllFromDerivedStream() throws Exception { String selectQuery = String.format("SELECT * from %s;", derivedStream); - List queries = ksqlEngine.buildMultipleQueries(false, selectQuery, Collections.emptyMap()); + List queries = ksqlEngine.buildMultipleQueries(selectQuery, Collections.emptyMap()); assertEquals(1, queries.size()); assertTrue(queries.get(0) instanceof QueuedQueryMetadata); @@ -238,7 +238,7 @@ private String validateSelectAllFromDerivedStream() throws Exception { private void validateCreateStreamUsingLikeClause(String inputStream) throws Exception { String outputStream = createStreamUsingLikeClause(inputStream); String selectPageViewsFromRegion = String.format("SELECT userid, pageid from %s;", outputStream); - List queries = ksqlEngine.buildMultipleQueries(false, selectPageViewsFromRegion, + List queries = ksqlEngine.buildMultipleQueries(selectPageViewsFromRegion, Collections.emptyMap()); assertEquals(1, queries.size()); @@ -277,7 +277,7 @@ private String createStreamUsingLikeClause(String inputStream) throws Exception String createStatement = String.format("CREATE STREAM %s WITH (kafka_topic='pageviews_enriched_r0', " + "value_format='DELIMITED') AS SELECT * FROM %s WHERE regionid LIKE '%%_0';", outputStream, inputStream); - List queryMetadata = ksqlEngine.buildMultipleQueries(false, createStatement, Collections.emptyMap()); + List queryMetadata = ksqlEngine.buildMultipleQueries(createStatement, Collections.emptyMap()); assertEquals(1, queryMetadata.size()); queryMetadata.get(0).getKafkaStreams().start(); return outputStream; @@ -291,7 +291,7 @@ private PersistentQueryMetadata createPageViewsFemaleStream(String streamName) t log.debug("Creating {} using: {}", streamName, createStreamStatement); - List queries = ksqlEngine.buildMultipleQueries(false, createStreamStatement, + List queries = ksqlEngine.buildMultipleQueries(createStreamStatement, Collections.emptyMap()); assertEquals(1, queries.size()); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index b223f385d6b7..b2d8d5c0dd50 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -121,8 +121,8 @@ private void execInitCreateStreamQueries() throws Exception { String messageStreamStr = String.format("CREATE STREAM %s (message varchar) WITH (value_format = 'json', " + "kafka_topic='%s');", messageLogStream, messageLogTopic); - ksqlEngine.buildMultipleQueries(false, ordersStreamStr, Collections.emptyMap()); - ksqlEngine.buildMultipleQueries(false, messageStreamStr, Collections.emptyMap()); + ksqlEngine.buildMultipleQueries(ordersStreamStr, Collections.emptyMap()); + ksqlEngine.buildMultipleQueries(messageStreamStr, Collections.emptyMap()); } @After @@ -154,7 +154,7 @@ public void testSelectDateTimeUDFs() throws Exception { ); PersistentQueryMetadata queryMetadata = - (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(true, queryString, Collections.emptyMap()).get(0); + (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0); queryMetadata.getKafkaStreams().start(); Schema resultSchema = SchemaUtil @@ -181,7 +181,7 @@ public void testSinkProperties() throws Exception { streamName, resultPartitionCount, inputStream); PersistentQueryMetadata queryMetadata = - (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(true, queryString, Collections.emptyMap()).get(0); + (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0); queryMetadata.getKafkaStreams().start(); KafkaTopicClient kafkaTopicClient = ksqlEngine.getTopicClient(); @@ -208,7 +208,7 @@ public void testJsonStreamExtractor() throws Exception { streamName, messageLogStream); PersistentQueryMetadata queryMetadata = - (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(true, queryString, Collections.emptyMap()).get(0); + (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0); queryMetadata.getKafkaStreams().start(); Schema resultSchema = SchemaUtil diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index 483b5c19705c..6f9f1ba32481 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -62,7 +62,6 @@ public void before() { new FakeKafkaTopicClient(), new MetastoreUtil(), functionRegistry, - false, Collections.emptyMap(), false, metaStore diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index c7d27051159c..f84668676190 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -237,7 +237,7 @@ private void handleRunScript(Command command) throws Exception { if (command.getStreamsProperties().containsKey(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY)) { String queries = (String) command.getStreamsProperties().get(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY); - List queryMetadataList = ksqlEngine.buildMultipleQueries(false, queries, + List queryMetadataList = ksqlEngine.buildMultipleQueries(queries, command.getStreamsProperties()); for (QueryMetadata queryMetadata : queryMetadataList) { if (queryMetadata instanceof PersistentQueryMetadata) { @@ -295,7 +295,6 @@ private boolean startQuery( } QueryMetadata queryMetadata = ksqlEngine.buildMultipleQueries( - false, queryString, command.getStreamsProperties() ).get(0); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java index 5fcf50b21264..e5603e297844 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java @@ -53,7 +53,7 @@ class QueryStreamWriter implements StreamingOutput { Map overriddenProperties) throws Exception { QueryMetadata queryMetadata = - ksqlEngine.buildMultipleQueries(true, queryString, overriddenProperties).get(0); + ksqlEngine.buildMultipleQueries(queryString, overriddenProperties).get(0); this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); if (!(queryMetadata instanceof QueuedQueryMetadata)) { throw new Exception(String.format( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java index 59b483793909..7700af34bbe8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java @@ -117,7 +117,7 @@ public void run() { mockKafkaTopicClient, new KsqlConfig(Collections.EMPTY_MAP) ); - expect(mockKsqlEngine.buildMultipleQueries(true, queryString, requestStreamsProperties)) + expect(mockKsqlEngine.buildMultipleQueries(queryString, requestStreamsProperties)) .andReturn(Collections.singletonList(queuedQueryMetadata)); StatementParser mockStatementParser = mock(StatementParser.class);