feat: New API - integrate push queries into backend #4495
LGTM - since these PRs are adding pretty hefty chunks of code that will be critical in the future, I think it'll be good to get a second pair of eyes
public String getName() {
  return "tim";
}
can we avoid "hacks" like this? I've seen these things eventually end up leaking to the user and causing some level of confusion (e.g. an error message saying "The supplied principal "tim" does not have permissions to access FOO")
Well, it has to return something, would you prefer "almog" ? ;)
while I'd love to have it be "almog" 😉 if it has to be set, can it be "NO_PRINCIPAL"?
ack
ksql-api/src/main/java/io/confluent/ksql/api/plugin/KsqlServerEndpoints.java
@@ -55,6 +55,11 @@
public static final String CERT_PATH_DOC =
    "Path to cert file";

public static final String WORKER_POOL_SIZE = propertyName("worker-pool-size");
nit: all of our other configs have words separated by "."s, can we follow that here too? (and for all the above configs)
I'm pretty confused by this convention.
It seems that dots are
a) used to create scoping for properties - this seems a very normal use of dots in properties to me. E.g. ksql.api.* contains all stuff related to the api. ksql.engine.* - contains all stuff related to engine. Seems very reasonable and expected.
b) also used to separate words! Seems really odd, and breaks the scoping rules in a. Why do we do this? Seems broken to me.
@purplefox I think a main reason for "."s is to follow the environment variable naming pattern, which translates e.g. an env var named KSQL_WORKER_POOL_SIZE into worker.pool.size in server properties.
To get a hyphenated property name, the env var would have to be e.g. KSQL_WORKER___POOL___SIZE 🙃
the converter is called env_to_props:
https://github.com/confluentinc/confluent-docker-utils/blob/3427c198e83b5d65d91b580a6df589f5d4799c14/confluent/docker_utils/dub.py#L50
here's how it applies for docker builds:
https://github.com/confluentinc/ksql/blob/master/ksql-docker/src/include/docker/run#L32
{% set kr_props = env_to_props('KSQL_', '') -%}
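The mapping described in this comment can be sketched roughly as follows. This is a Java illustration only, not the real env_to_props (which is the Python function linked above); the class and method names are hypothetical, and the rule that a double underscore becomes a literal underscore is an assumption that this thread does not state.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of ksql or confluent-docker-utils.
class EnvToPropsSketch {

  // Temporary marker used so double underscores survive the single-underscore pass.
  private static final String PLACEHOLDER = "\0";

  // Converts e.g. KSQL_WORKER_POOL_SIZE (prefix "KSQL_") into worker.pool.size.
  static String toPropertyName(String envVar, String envPrefix, String propPrefix) {
    if (!envVar.startsWith(envPrefix)) {
      throw new IllegalArgumentException("Unexpected prefix: " + envVar);
    }
    String raw = envVar.substring(envPrefix.length()).toLowerCase(Locale.ROOT);
    String name = raw
        .replace("___", "-")          // triple underscore -> hyphen
        .replace("__", PLACEHOLDER)   // assumption: double underscore -> underscore
        .replace("_", ".")            // single underscore -> dot
        .replace(PLACEHOLDER, "_");
    return propPrefix + name;
  }

  public static void main(String[] args) {
    System.out.println(toPropertyName("KSQL_WORKER_POOL_SIZE", "KSQL_", ""));
    System.out.println(toPropertyName("KSQL_WORKER___POOL___SIZE", "KSQL_", ""));
  }
}
```

With this mapping a dotted property name needs only single underscores in the env var, which is the ergonomic argument made in this thread for keeping dots between words.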
^ The existing convention for env var names is well-established and shared across Confluent components and their documentation. From the perspective of code it is indeed awkward that it breaks expectations of namespacing. However, considering end-user ergonomics, I believe we should keep the pattern in place and use dots for the prop names.
This also might be just a byproduct of a bygone age... I know that LinkedIn had the same convention
it's weird... but ack
String[] getColumnNames();

String[] getColumnTypes();
can we use List<String> in this API? Arrays are rather brittle and there's not much of a benefit of using them if it's non-primitive anyway
I will change this.
    log.error("Failed to close query", ar.cause());
  }
});
super.close();
is it safe to do this before we've stopped the query handler? (i.e. should we move this into executeBlocking?)
I don't see it as an issue.
if (buildResult instanceof KStreamHolder<?>) {
  kstream = ((KStreamHolder<?>) buildResult).getStream();
} else if (buildResult instanceof KTableHolder<?>) {
  final KTable<?, GenericRow> ktable = ((KTableHolder<?>) buildResult).getTable();
  kstream = ktable.toStream();
} else {
  throw new IllegalStateException("Unexpected type built from execution plan");
}

kstream.foreach((k, row) -> {
  if (row == null) {
    return;
  }
  rowConsumer.accept(row);
});
this seems a little too hacked together for my liking - it's like making a modification to the physical/logical plan "outside" of the physical and logical planners. Not sure I have suggestions at the moment, but give it a thought
It's pretty much an exact copy and paste of the existing code for transient queries
the main difference is that this adds a forEach, which actually affects the execution plan/topology
The foreach is from here https://github.com/confluentinc/ksql/blob/master/ksql-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java#L59
I've basically just simplified the pre-existing code into a single place. There's no new logic though.
import io.confluent.ksql.GenericRow;

public interface RowConsumer {
can we just have Consumer<GenericRow>?
ack
ack
if (apiServer != null) {
  apiServer.stop();
  apiServer = null;
}
if (vertx != null) {
  vertx.close();
  vertx = null;
}
should we try/catch these as well?
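One way to read this suggestion: wrap each shutdown step so that a failure in one does not prevent the next from running. The sketch below is a generic illustration under that assumption; ShutdownSketch and closeAll are hypothetical names, and the resources are plain AutoCloseable stand-ins rather than the actual apiServer and vertx objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not taken from the PR.
class ShutdownSketch {

  // Closes every resource in order, collecting failures instead of
  // aborting at the first one, so later resources still get closed.
  static List<Exception> closeAll(List<AutoCloseable> resources) {
    List<Exception> failures = new ArrayList<>();
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        failures.add(e);
      }
    }
    return failures;
  }
}
```

The caller can then log the collected failures or rethrow the first one once every resource has been given a chance to close.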
import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
public class NewApiTest {
same comment as above, can we avoid the word New in the code? Especially after we're done with the migration this won't make much sense!
It's a temporary name and will be renamed before KLIP-15 is complete. The only reason new is there is to distinguish it from the current rest api test.
  QueryResponse queryResponse = new QueryResponse(buff.toString());
  return queryResponse.rows.size();
} catch (Throwable t) {
  return Integer.MAX_VALUE;
is there any way to communicate this error so that if the test fails here we can figure out why?
Thanks @purplefox ! LGTM with a bunch of questions inline, mostly just to improve my understanding.
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
  Objects.requireNonNull(subscriber);
  if (Vertx.currentContext() == ctx) {
What's the purpose of this if-else block? Specifically:
- when/why would this method be called from a different context than the one passed when creating the publisher?
- why is having the if-else preferable to simply always making the call async (i.e., the contents of the "else" part)?
- The class doesn't control who is calling subscribe so we can't assume it's always called from the same context.
- If already on same context then it's safe to call directly which will be faster than doing it asynchronously.
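The "run directly if already on the context, otherwise hand off" pattern from this answer can be illustrated without Vert.x. The sketch below is an analogy only: SingleThreadContext is a hypothetical stand-in that models a context as one owning thread, and it is not how Vert.x implements contexts.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event-loop context: one thread owns all state.
class SingleThreadContext {
  private volatile Thread owner;
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "context-thread");
    owner = t;
    return t;
  });

  // Runs the task inline when the caller is already on the owning thread
  // (cheaper), otherwise schedules it on that thread (always safe).
  void runOnContext(Runnable task) {
    if (Thread.currentThread() == owner) {
      task.run();
    } else {
      executor.execute(task);
    }
  }

  // Shows that a nested call made on the context thread runs synchronously.
  static List<String> demo() {
    SingleThreadContext ctx = new SingleThreadContext();
    List<String> order = new CopyOnWriteArrayList<>();
    ctx.runOnContext(() -> {
      ctx.runOnContext(() -> order.add("inner")); // already on context: inline
      order.add("outer");
    });
    ctx.executor.shutdown();
    try {
      ctx.executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return order;
  }
}
```

Always taking the asynchronous branch would also be correct; the inline branch only saves a scheduling hop when the caller is already on the right thread.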
  this.closeHandler = Objects.requireNonNull(closeHandler);
  this.id = new PushQueryId(UUID.randomUUID().toString());
  server.registerQuery(this);
}

public void close() {
  server.removeQuery(id);
  querySubscriber.close();
How come we're not closing the subscriber anymore?
The subscriber doesn't have a close method any more, it's not needed. Closing the publisher will cause an onComplete to be sent to the subscriber which will result in the response being ended.
Do we still need to pass the subscriber into PushQueryHolder in that case? Looks like it's unused.
}

protected final void sendError(final Exception e) {
  checkContext();
What's the purpose of asserting the context here, rather than in the other methods that call sendError()? Would the context here ever be different from the ones in those methods (doSubscribe() and doRequest())?
This is a method that accesses internal state so seems like a sensible place to check the context.
public synchronized void accept(final GenericRow row) {
  Objects.requireNonNull(row);

  if (closed || complete || !checkLimit()) {
How come we don't check cancelled here?
}

@Test
public void shouldDeliverAfterSubscribe() throws Exception {
Does this test offer anything beyond PublisherTestBase#shouldDeliverAllRequestingOneByOneLoadAfterSubscribe()? AFAICT they appear to be testing the same thing.
Ack
@@ -23,11 +23,12 @@
private ErrorCodes() {
}

public static final int ERROR_CODE_MISSING_PARAM = 50001;
Why the decision to remove the 500 prefixes? These error codes are user-facing, right?
I don't think we should tie our error codes to HTTP status codes, they seem orthogonal.
E.g. If I stream some inserts to the server, then I'll get a 200 OK response and the acks for the inserts will start coming up. Maybe the 100000th insert has some malformed JSON in which case the ack stream will contain an error "malformed JSON" and the response will be ended. The error here has got nothing to do with the HTTP status code (which was 200 in this case).
These error codes are exposed to users, though, right? How will a user understand the meaning of the error codes? (Or is that not the purpose of the error codes?)
@Override
protected PushQueryHandler createQuery(final String sql, final JsonObject properties,
    final Context context, final WorkerExecutor workerExecutor, final RowConsumer rowConsumer) {
  // Must be run on worker as all this stuff is slow
Pardon the ignorance, but how do we know "this stuff is slow"?
Anything taking more than a very small number of milliseconds will be slow for an event loop. I think we should err on the side of caution.
.withoutPlainListeners()
.withSaslSslListeners()
.withAclsEnabled(SUPER_USER.username)
.withAcl(
Is this ACLs setup relevant to the tests in this file? If not, can we remove it?
Yep, I think they can be removed :)
  } catch (Throwable t) {
    return Integer.MAX_VALUE;
  }
}, is(7));
Where is this value coming from? Properties is empty and push queries default to using auto.offset.reset=latest so I'm surprised rows are being returned in this query.
I think it's set in TestKsqlRestApp. But we also need to set it in the query properties. Good catch!
private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final KsqlSecurityExtension securityExtension;
private final ServiceContextFactory theServiceContextFactory;
nit: why not simply serviceContextFactory? (I assume there's a reason for diverging from convention -- I'm just not seeing what it is.)
It's vestigial from when there was something else called serviceContextFactory
Not finished, but a few comments...
public String[] getColumnNames() {
  return colNamesFromSchema(queryMetadata.getLogicalSchema());
}
Can we use List rather than naked arrays please?
protected boolean cancelled;

public BasePublisher(final Context ctx) {
  this.ctx = ctx;
nit: validate params that will be stored in object state; ensuring object does not get into an invalid state.
/**
 * A query publisher that uses an internal blocking queue to store rows for delivery. It's currently
 * necessary to use a blocking queue as Kafka Streams delivers messages in a synchronous fashion with
 * no back pressure. If the queue was not blocking then if the subscriber was slow the messages
 * could build up on the queue eventually resulting in out of memory. The only mechanism we have to
 * slow streams down is to block the thread. Kafka Streams uses dedicated threads per topology so
 * this won't prevent the thread from doing useful work elsewhere but it does mean we can't have too
 * many push queries in the server at any one time as we can end up with a lot of threads. Ideally
 * Kafka Streams would use a non-blocking reactive model with back-pressure, e.g. using reactive
 * streams. That way a small number of threads would be required to service all topologies and we
 * wouldn't need to block.
 */
Try to avoid such 'I wish Streams did it this way' style comments in the code. Such points are for discussions, not comments in code.
+1 on this one
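For context on the mechanism the Javadoc in this thread describes: a bounded blocking queue makes a fast producer wait when a slow consumer falls behind, instead of letting rows pile up in memory. The sketch below is a minimal illustration with hypothetical names (BoundedRowQueueSketch, String rows); it is not the publisher from the PR, and it uses a timed offer rather than an indefinite block so the behaviour is easy to observe.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of back-pressure via a bounded blocking queue.
class BoundedRowQueueSketch {
  private final BlockingQueue<String> queue;

  BoundedRowQueueSketch(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // Called on the producing thread. Waits up to timeoutMs for space;
  // returns false if the queue is still full, i.e. the consumer is too slow.
  boolean accept(String row, long timeoutMs) {
    try {
      return queue.offer(row, timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Called by the consumer; returns null when nothing is queued.
  String poll() {
    return queue.poll();
  }
}
```

The cost noted in the Javadoc still applies: while the producer waits, its thread is parked, so each concurrently running query ties up a thread.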
private String[] columnNames;
private String[] columnTypes;
Can we use List, not arrays, please.
  this.columnTypes = queryHandle.getColumnTypes();
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP")
+1
Don't expose mutable object state. It breaks encapsulation. Encapsulation is a pretty standard OO thing.
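A small sketch of what this and the earlier List<String> comments are asking for: copy the incoming collection defensively and hand out a read-only view, so that neither the caller's later mutations nor users of the getter can change internal state. The class name QueryMetadataSketch is hypothetical and only illustrates the idea.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder showing encapsulated, immutable column metadata.
class QueryMetadataSketch {
  private final List<String> columnNames;

  QueryMetadataSketch(List<String> columnNames) {
    // Defensive copy, then wrap: the field can no longer be changed
    // through the caller's list or through the returned reference.
    this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
  }

  // Safe to return directly because the list is unmodifiable.
  List<String> getColumnNames() {
    return columnNames;
  }
}
```

With a String[] field the getter would need to clone the array on every call to get the same guarantee, which is presumably why the EI_EXPOSE_REP warning was being suppressed.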
  queryHandle.start();
}

private boolean checkLimit() {
+1
checkContext();

int num = 0;
while (demand > 0 && !queue.isEmpty()) {
+1
Approving on request to unblock @purplefox and avoid merge hell with other PRs - I don't see anything fundamentally wrong, but (as Tim agreed offline) I'd like the open comments to be addressed in follow-up PRs.
Please address the synchronization issue https://github.com/confluentinc/ksql/pull/4495/files/8f948742a0ce64ab808dd9e5352bba82a613252d#r377817481 before merging
Summarizing the comments that are important to me to be addressed in a follow-up (and the green tick is conditional on fixing or discussing further):
- swapping the String[] with List<String>
- addressing the protected state demand
- removing "new" from the code base and coming up with better naming
public static final String WORKER_POOL_SIZE = propertyName("worker.pool.size");
public static final String WORKER_POOL_DOC =
    "Max number of worker threads for executing blocking code";
public static final int DEFAULT_WORKER_POOL_SIZE = 100;
Why does it make sense to have so many workers relative to the number of verticles? Is the expectation that multiple blocking tasks created by the same verticle will be running simultaneously?
We choose the number of event loops to be about the same as the number of cores because event loops should always be live, therefore you shouldn't need any more to utilise all the cores.
Workers are often used to execute blocking tasks - often blocking on IO, e.g. waiting on the network or file system etc. In this case the threads can spend a lot of their time inactive so in order to utilise all cores we choose a much larger number of threads than number of cores.
In our case if we find our workers aren't blocking on IO much, just doing long lived CPU tasks then we can probably get away with fewer, although we probably still want a reasonable amount so we don't end up with very long lived tasks causing those behind them to queue too long.
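The reasoning above can be made concrete with a commonly cited rule of thumb: threads ≈ cores × target utilisation × (1 + wait time / compute time). The sketch below applies that heuristic; it is an illustration only, not how the PR chooses its values (the PR uses a fixed default of 100), and PoolSizingSketch with its method names is hypothetical.

```java
// Hypothetical sizing helper; the formula is a general heuristic, not ksql code.
class PoolSizingSketch {

  // Event loops never block, so one per core is enough to keep all cores busy.
  static int eventLoopCount(int cores) {
    return cores;
  }

  // Workers that spend most of their time waiting on IO need more threads
  // than cores; waitToComputeRatio is (time blocked) / (time on CPU).
  static int workerPoolSize(int cores, double targetUtilization, double waitToComputeRatio) {
    return (int) Math.ceil(cores * targetUtilization * (1 + waitToComputeRatio));
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("event loops: " + eventLoopCount(cores));
    System.out.println("workers (90% waiting): " + workerPoolSize(cores, 1.0, 9.0));
  }
}
```

When tasks are purely CPU-bound the ratio is 0 and the formula collapses to one thread per core, which matches the remark that fewer workers would do if they rarely block on IO.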
Description
Implements #4256
Testing done
New integration test for the new API
More api tests
More unit tests
Reviewer checklist