diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 91b4a7220195..689baaea8938 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -379,7 +379,12 @@ public CompletableFuture createConnector( KSQL_ENDPOINT, new JsonObject() .put("ksql", - String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)) + String.format( + "CREATE %s CONNECTOR \"%s\" WITH (%s);", + type, + name, + connectorConfigs + )) .put("sessionVariables", sessionVariables), cf, response -> handleSingleEntityResponse( @@ -408,7 +413,7 @@ public CompletableFuture createConnector( KSQL_ENDPOINT, new JsonObject() .put("ksql", - String.format("CREATE %s CONNECTOR %s %s WITH (%s);", + String.format("CREATE %s CONNECTOR %s \"%s\" WITH (%s);", type, ifNotExistsClause, name, connectorConfigs)) .put("sessionVariables", sessionVariables), cf, @@ -426,7 +431,7 @@ public CompletableFuture dropConnector(final String name) { makePostRequest( KSQL_ENDPOINT, new JsonObject() - .put("ksql", "drop connector " + name + ";") + .put("ksql", String.format("drop connector \"%s\";", name)) .put("sessionVariables", sessionVariables), cf, response -> handleSingleEntityResponse( @@ -439,12 +444,12 @@ public CompletableFuture dropConnector(final String name) { @Override public CompletableFuture dropConnector(final String name, final boolean ifExists) { final CompletableFuture cf = new CompletableFuture<>(); - final String ifExistsClause = ifExists ? "if exists " : ""; + final String ifExistsClause = ifExists ? "if exists" : ""; makePostRequest( KSQL_ENDPOINT, new JsonObject() - .put("ksql", "drop connector " + ifExistsClause + name + ";") + .put("ksql", String.format("drop connector %s \"%s\";", ifExistsClause, name)) .put("sessionVariables", sessionVariables), cf, response -> handleSingleEntityResponse( @@ -476,7 +481,7 @@ public CompletableFuture describeConnector(final String na makePostRequest( KSQL_ENDPOINT, new JsonObject() - .put("ksql", "describe connector " + name + ";") + .put("ksql", String.format("describe connector \"%s\";", name)) .put("sessionVariables", sessionVariables), cf, response -> handleSingleEntityResponse( diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 31ea67541620..fe5f67f85d1d 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -1568,6 +1568,7 @@ public void shouldDescribeConnector() throws Exception { final io.confluent.ksql.api.client.ConnectorDescription connector = javaClient.describeConnector("name").get(); // Then: + assertThat(testEndpoints.getLastSql(), is("describe connector \"name\";")); assertThat(connector.state(), is("state")); assertThat(connector.className(), is("connectorClass")); assertThat(connector.type(), is(new ConnectorTypeImpl("SOURCE"))); @@ -1587,7 +1588,7 @@ public void shouldCreateConnector() throws Exception { javaClient.createConnector("name", true, Collections.emptyMap()).get(); // Then: - assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();")); + assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR \"name\" WITH ();")); } @Test @@ -1601,7 +1602,7 @@ public void shouldCreateConnectorIfNotExist() throws Exception { javaClient.createConnector("name", true, Collections.emptyMap(), true).get(); // Then: - assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS name WITH ();")); + assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS \"name\" WITH ();")); } @Test @@ -1614,7 +1615,7 @@ public void shouldDropConnector() throws Exception { javaClient.dropConnector("name").get(); // Then: - assertThat(testEndpoints.getLastSql(), is("drop connector name;")); + assertThat(testEndpoints.getLastSql(), is("drop connector \"name\";")); } @Test @@ -1627,7 +1628,7 @@ public void shouldDropConnectorIfExists() throws Exception { javaClient.dropConnector("name", true).get(); // Then: - assertThat(testEndpoints.getLastSql(), is("drop connector if exists name;")); + assertThat(testEndpoints.getLastSql(), is("drop connector if exists \"name\";")); } @Test @@ -2275,4 +2276,4 @@ public void describeTo(final Description description) { } }; } -} \ No newline at end of file +}