Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ksqldb-client-api): fix describeConnector issued query #10151

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,12 @@ public CompletableFuture<Void> createConnector(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql",
String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs))
String.format(
"CREATE %s CONNECTOR \"%s\" WITH (%s);",
type,
name,
connectorConfigs
))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
Expand Down Expand Up @@ -408,7 +413,7 @@ public CompletableFuture<Void> createConnector(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql",
String.format("CREATE %s CONNECTOR %s %s WITH (%s);",
String.format("CREATE %s CONNECTOR %s \"%s\" WITH (%s);",
type, ifNotExistsClause, name, connectorConfigs))
.put("sessionVariables", sessionVariables),
cf,
Expand All @@ -426,7 +431,7 @@ public CompletableFuture<Void> dropConnector(final String name) {
makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql", "drop connector " + name + ";")
.put("ksql", String.format("drop connector \"%s\";", name))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
Expand All @@ -439,12 +444,12 @@ public CompletableFuture<Void> dropConnector(final String name) {
@Override
public CompletableFuture<Void> dropConnector(final String name, final boolean ifExists) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String ifExistsClause = ifExists ? "if exists " : "";
final String ifExistsClause = ifExists ? "if exists" : "";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql", "drop connector " + ifExistsClause + name + ";")
.put("ksql", String.format("drop connector %s \"%s\";", ifExistsClause, name))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
Expand Down Expand Up @@ -476,7 +481,7 @@ public CompletableFuture<ConnectorDescription> describeConnector(final String na
makePostRequest(
KSQL_ENDPOINT,
new JsonObject()
.put("ksql", "describe connector " + name + ";")
.put("ksql", String.format("describe connector \"%s\";", name))
.put("sessionVariables", sessionVariables),
cf,
response -> handleSingleEntityResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,7 @@ public void shouldDescribeConnector() throws Exception {
final io.confluent.ksql.api.client.ConnectorDescription connector = javaClient.describeConnector("name").get();

// Then:
assertThat(testEndpoints.getLastSql(), is("describe connector \"name\";"));
assertThat(connector.state(), is("state"));
assertThat(connector.className(), is("connectorClass"));
assertThat(connector.type(), is(new ConnectorTypeImpl("SOURCE")));
Expand All @@ -1587,7 +1588,7 @@ public void shouldCreateConnector() throws Exception {
javaClient.createConnector("name", true, Collections.emptyMap()).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR name WITH ();"));
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR \"name\" WITH ();"));
}

@Test
Expand All @@ -1601,7 +1602,7 @@ public void shouldCreateConnectorIfNotExist() throws Exception {
javaClient.createConnector("name", true, Collections.emptyMap(), true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS name WITH ();"));
assertThat(testEndpoints.getLastSql(), is("CREATE SOURCE CONNECTOR IF NOT EXISTS \"name\" WITH ();"));
}

@Test
Expand All @@ -1614,7 +1615,7 @@ public void shouldDropConnector() throws Exception {
javaClient.dropConnector("name").get();

// Then:
assertThat(testEndpoints.getLastSql(), is("drop connector name;"));
assertThat(testEndpoints.getLastSql(), is("drop connector \"name\";"));
}

@Test
Expand All @@ -1627,7 +1628,7 @@ public void shouldDropConnectorIfExists() throws Exception {
javaClient.dropConnector("name", true).get();

// Then:
assertThat(testEndpoints.getLastSql(), is("drop connector if exists name;"));
assertThat(testEndpoints.getLastSql(), is("drop connector if exists \"name\";"));
}

@Test
Expand Down Expand Up @@ -2275,4 +2276,4 @@ public void describeTo(final Description description) {
}
};
}
}
}