
Don't replay all previous events found on the command topic #454 #480

Merged 5 commits into confluentinc:master on Dec 1, 2017

Conversation

@dguy commented Nov 23, 2017

terminate persistent queries when a stream or table is dropped.
"compact" previous commands during restart so we don't replay dropped queries etc.
change queryId so that it is derived from the statement and will always be consistent across engines and restarts.
#454

@dguy commented Nov 23, 2017

This supersedes #456.
The command topic remains non-compacted.

@hjafarpour commented

@dguy You are still considering a 1:1 relation between query and data source/sink while we should consider an N:N relation. We also need to separate management of queries from management of data sources/sinks which will require the TERMINATE statement.
This PR still results in incorrect state for KSQL servers. Consider the following statements:

CREATE STREAM bigorders AS SELECT * FROM orders WHERE orderunit > 10;
CREATE STREAM reallybigorders AS SELECT * FROM bigorders WHERE orderunit > 100;
DROP STREAM bigorders;

If a new KSQL server starts and reads these statements from the command topic, then according to this PR it won't start the first query, and therefore the second query, which uses the result of the first, will fail even though it is still running on the other servers. This is another example where we have an N:N relation between queries and data sources/sinks: we cannot just couple a data source/sink with the query that writes into it; we should also consider the queries that read from it.
I think as you said this needs more discussion. Let's discuss this more in a zoom meeting next week so we can finalize a solution.

@dguy commented Nov 27, 2017

@hjafarpour you are right that the second query won't work, but then it shouldn't, as it is no longer correct. How can it select from a stream that doesn't exist?
So we end up in this weird situation where, if we terminate a persistent query, we have a table/stream that we can no longer use. We also don't want to have to replay the entire log every time a node starts, as that is extremely inefficient.

@hjafarpour commented

@dguy terminating a persistent query should not drop the result stream/table from the metastore. It will only terminate the query (streams app).
I have an incomplete implementation for replaying the previous statements in the command topic without starting the terminated queries. I had to leave it incomplete because we had higher priority tasks before the KSQL launch. I would suggest we complete this implementation.
Here is how it works:
When a KSQL server starts, it reads all of the previous statements from the command topic. It then builds the set of all terminated queries by finding all TERMINATE statements.
With the set of terminated queries available, the KSQL server replays the previous statements from the beginning. When it sees a query that exists in the terminated query set, it processes the query normally and updates the metastore accordingly, except that at the last step the server does not start the streams app for the query. This ensures the state is the same without the overhead of starting and terminating unnecessary streams jobs.
For more details please look at handleStatementWithTerminatedQueries method in [StatementExecutor.java](https://github.com/confluentinc/ksql/blob/4.0.x/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java)
This method gets the terminated queries set as an argument.
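The two-pass replay described above can be sketched roughly as follows. This is a simplified illustration only; the class and record names here are hypothetical and not the actual KSQL types.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hedged sketch of the two-pass replay described above; names are
// illustrative, not the actual KSQL classes.
class CommandReplaySketch {
    // A simplified command-topic record.
    record Command(String queryId, String statement, boolean isTerminate, String targetQueryId) {}

    // Pass 1: collect the ids of every query that was later terminated.
    static Set<String> terminatedQueryIds(List<Command> log) {
        Set<String> terminated = new HashSet<>();
        for (Command c : log) {
            if (c.isTerminate()) {
                terminated.add(c.targetQueryId());
            }
        }
        return terminated;
    }

    // Pass 2: replay every statement (metastore updates happen for all of
    // them), but only return the queries whose streams app should start.
    static List<String> queriesToStart(List<Command> log, Set<String> terminated) {
        List<String> toStart = new ArrayList<>();
        for (Command c : log) {
            if (!c.isTerminate() && !terminated.contains(c.queryId())) {
                toStart.add(c.queryId());
            }
        }
        return toStart;
    }
}
```

The key property is that terminated queries still update the metastore during pass 2, so downstream statements that reference their output continue to resolve.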

@dguy commented Nov 27, 2017

@hjafarpour yes, if I put TERMINATE back then we could do something as you have suggested. The reason I removed TERMINATE is that once we terminate there is no way of restarting the query unless we drop and re-create, which isn't very intuitive.
Some of this is also related to your referential integrity notes, i.e., we shouldn't be able to drop a table/query that has dependents.
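That referential-integrity idea can be sketched minimally. The names here are hypothetical, not the actual KSQL metastore API: the check rejects a DROP while other sources still read from the target.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the referential-integrity check mentioned above:
// a DROP is rejected while downstream sources still read from the target.
class DropCheckSketch {
    // readers maps a source name to the set of downstream sources
    // (query sinks) that consume it.
    static void checkDrop(String name, Map<String, Set<String>> readers) {
        Set<String> dependents = readers.getOrDefault(name, Set.of());
        if (!dependents.isEmpty()) {
            throw new IllegalStateException(
                "Cannot drop " + name + ": still read by " + dependents);
        }
    }
}
```

Under this rule, the bigorders/reallybigorders example earlier in the thread would fail at DROP time rather than leaving a dangling query.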

@hjafarpour commented

@dguy yes, you are right: at the moment, after terminating an existing query, we cannot create a new query that writes into the same stream/table the terminated query created without first dropping the stream/table. But this is only for CSAS/CTAS queries, and it will change after we add INSERT INTO, where you can write into an existing stream/table. Although we don't have this now, we should be able to support it in the future.
Also, in some use cases we may want to terminate a query but keep the result stream/table to use in other queries. As an example, consider creating a dimension table that does not change, terminating the query that created it, and then using the table to enrich other streams.

@dguy commented Nov 27, 2017

What I'll do is add back the TERMINATE command. Then we can do a follow-up PR to add support for not re-running terminated queries. I'll keep it such that DROP will automatically terminate.

@dguy commented Nov 27, 2017

Right, I've added back TERMINATE and created #482 to follow up with handling terminated queries that are found on the command topic.

@bluemonk3y left a comment

The code looks fine. It will take some aggressive system tests to ensure it's all working properly, i.e., deploy then undeploy all the different types of apps (e.g. clickstream) and then check that the contents of the command topic make sense. Tricky edge cases.


import io.confluent.ksql.query.QueryId;

public interface QueryTeminator {


prob want to add in the missing 'r'

@@ -39,6 +39,18 @@
*/
void createTopic(String topic, int numPartitions, short replicatonFactor);

/**
* Create a new topic with the specified name, numPartitions and replicatonFactor.
* [warn] synchronous call to get the response

nit: we probably want to update the description to mention that arbitrary configs can be specified.

return id;
}

public String getEntity() {
return getOutputNode().getId().toString();

For my own understanding, what is the Entity here? Some examples would be nice as comments.

if (key.getAction() != CommandId.Action.DROP && !commands.containsKey(key)) {
commands.put(key, record);
} else if (key.getAction() == CommandId.Action.DROP){
commands.remove(new CommandId(key.getType(), key.getEntity(), CommandId.Action.CREATE));

Perhaps we should log an error if nothing was removed? This would indicate a bug once we implement referential integrity. Before that, it would help indicate user error.
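The compaction step under review, extended with the suggested warning when a DROP removes nothing, might look like this. The types here are simplified stand-ins, not the real CommandId class.

```java
import java.util.Map;
import java.util.logging.Logger;

// Simplified sketch of the DROP-compaction logic under review, plus the
// suggested log line when a DROP matches no prior CREATE.
class CompactionSketch {
    private static final Logger LOG = Logger.getLogger(CompactionSketch.class.getName());

    enum Action { CREATE, DROP }
    record CommandId(String type, String entity, Action action) {}

    static void compact(Map<CommandId, String> commands, CommandId key, String value) {
        if (key.action() != Action.DROP) {
            // Keep the first command seen for a given id.
            commands.putIfAbsent(key, value);
        } else {
            CommandId createKey = new CommandId(key.type(), key.entity(), Action.CREATE);
            if (commands.remove(createKey) == null) {
                // Nothing was removed: likely a user error today, and a bug
                // once referential integrity is enforced.
                LOG.warning("DROP for " + key.entity() + " had no matching CREATE");
            }
        }
    }
}
```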



@Test
public void shouldUseFirstCommandForSameIdIfNoDropBetweenThem() {

I am not sure I understand the context of this test. If I understand correctly, it seems that the KSQL CLI currently accepts multiple CREATE TABLE foo .. commands. Are both supposed to be written to the command topic and then deduped when consumed?

That's my understanding from this test, but it seems strange that we would allow the duplicate CREATE to start with.


@apurvam by default commands are evaluated for correctness before being written into the command topic. But, as you remember, it is possible for two different servers to evaluate the same command locally and both find it fine, yet once the commands are written to the command topic the servers will see the duplicates. This test ensures the order of reading the commands stays the same for all servers.

return getTableCommandId(createTableAsSelect.getName().toString());
}

public CommandId getTerminateCommandId(TerminateQuery terminateQuery) {
-    return new CommandId(CommandId.Type.TERMINATE, Long.toString(terminateQuery.getQueryId()));
+    return new CommandId(CommandId.Type.TERMINATE, terminateQuery.getQueryId().toString(), CommandId.Action.CREATE);

Why do we have a CREATE action for a terminate command? Seems counterintuitive.

@dguy replied:
Yes, I know. I needed a way to distinguish a DROP xxxx from everything else without having to parse the statement. I'll add another value, EXECUTE, as it makes sense in other scenarios too, i.e., querying etc.

@@ -26,6 +26,7 @@
public class CommandId {
private final Type type;
private final String entity;

Is the entity supposed to be table or stream name? Adding it in the comment would help.

@dguy replied:
If it is a CREATE or DROP statement then it will be the table or stream name. If it is TERMINATE, it is the queryId. @hjafarpour?

@@ -47,6 +48,11 @@ public StructuredDataSource cloneWithTimeField(String timestampfieldName) {
return new KsqlStream(dataSourceName, schema, keyField, newTimestampField.get(), ksqlTopic);
}

@Override
public QueryId getPersistentQueryId() {
return new QueryId("CSAS_" + dataSourceName);

For my understanding, what would be the dataSourceName in a statement like CREATE STREAM enriched_pv as SELECT users.userid AS userid, pageid, region FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid;


@apurvam it will be enriched_pv.
@dguy why not assign the query id where we create the query, in PhysicalPlanBuilder?

@dguy replied:
Because it wouldn't be the same across all engines, i.e., when we compact away the dropped commands the sequence would be incorrect. This way the QueryId will be unique, as we know we can't have two data sources with the same name. In the future, when we introduce other persistent queries against the same data source, we will also need to derive a unique key from them.
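The stability argument can be illustrated with a tiny sketch (a hypothetical helper, not the actual KsqlStream/KsqlTable code): an id derived from the statement's sink name is the same on every server, regardless of how much of the log was compacted away, whereas a sequence number would shift.

```java
import java.util.Locale;

// Illustrative sketch: a query id derived from the statement's sink name
// is deterministic across engines and restarts, unlike a sequence number.
class QueryIdSketch {
    static String persistentQueryId(String kind, String sinkName) {
        // e.g. kind "CSAS" plus sink "enriched_pv" -> "CSAS_ENRICHED_PV",
        // independent of how many earlier commands were compacted away.
        return kind + "_" + sinkName.toUpperCase(Locale.ROOT);
    }
}
```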

@hjafarpour left a comment
Few minor questions and comments.

@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.Map;

import io.confluent.ksql.QueryTeminator;

Same here for missing r


public QueryId getPersistentQueryId() {
return new QueryId("CTAS_" + dataSourceName);
}


Same as above, why not in PhysicalPlanBuilder?

@dguy replied:
If we put it in these classes, then when we have different persistent queries, i.e., inserts etc., those objects know how to generate their own queryId, rather than having some if/else-if block elsewhere in the code; i.e., these objects know what the query id should be.




@bluemonk3y left a comment

lgtm

new ObjectMapper().writer().writeValue(out, new KsqlRequest("CREATE STREAM pageviews_original(viewtime bigint, userid varchar, pageid varchar) " +
"WITH (kafka_topic='pageviews', value_format='JSON');", Collections.emptyMap()));
System.out.println(new String(out.toByteArray()));
}
}

I think you forgot to remove the main method! It seems unnecessary!

@apurvam left a comment

LGTM thanks!

@dguy dguy merged commit d996bdb into confluentinc:master Dec 1, 2017
@dguy dguy deleted the command-topic-master branch December 1, 2017 10:10