Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't replay all previous events found on the command topic #454 #480

Merged
merged 5 commits into from
Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import java.util.Optional;
import java.util.Set;

public class KsqlEngine implements Closeable, QueryTeminator {
public class KsqlEngine implements Closeable, QueryTerminator {

private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.confluent.ksql.query.QueryId;

public interface QueryTeminator {
public interface QueryTerminator {
boolean terminateQuery(QueryId queryId, boolean closeStreams);

void terminateQueryForEntity(String entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.HashMap;
import java.util.Map;

import io.confluent.ksql.QueryTeminator;
import io.confluent.ksql.QueryTerminator;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.DDLStatement;
Expand All @@ -34,14 +34,14 @@
public class CommandFactories implements DDLCommandFactory {
private final Map<Class<? extends DDLStatement>, DDLCommandFactory> factories = new HashMap<>();

public CommandFactories(final KafkaTopicClient topicClient, final QueryTeminator queryTeminator) {
public CommandFactories(final KafkaTopicClient topicClient, final QueryTerminator queryTerminator) {
factories.put(RegisterTopic.class, (ddlStatement, properties) -> new RegisterTopicCommand((RegisterTopic)ddlStatement, properties));
factories.put(CreateStream.class, (ddlStatement, properties) -> new CreateStreamCommand((CreateStream) ddlStatement, properties, topicClient));
factories.put(CreateTable.class, (ddlStatement, properties) -> new CreateTableCommand((CreateTable)ddlStatement, properties, topicClient));
factories.put(DropStream.class, (ddlStatement, properties) -> new DropSourceCommand(
(DropStream) ddlStatement, DataSource.DataSourceType.KSTREAM, queryTeminator));
(DropStream) ddlStatement, DataSource.DataSourceType.KSTREAM, queryTerminator));
factories.put(DropTable.class, (ddlStatement, properties) -> new DropSourceCommand(
(DropTable) ddlStatement, DataSource.DataSourceType.KTABLE, queryTeminator));
(DropTable) ddlStatement, DataSource.DataSourceType.KTABLE, queryTerminator));
factories.put(DropTopic.class, (ddlStatement, properties) -> new DropTopicCommand(((DropTopic) ddlStatement)));
factories.put(SetProperty.class, (ddlStatement, properties) -> new SetPropertyCommand((SetProperty) ddlStatement, properties));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.confluent.ksql.ddl.commands;

import io.confluent.ksql.QueryTeminator;
import io.confluent.ksql.QueryTerminator;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.parser.tree.AbstractStreamDropStatement;
Expand All @@ -28,14 +28,14 @@ public class DropSourceCommand implements DDLCommand {

private final String sourceName;
private final DataSource.DataSourceType dataSourceType;
private final QueryTeminator queryTeminator;
private final QueryTerminator queryTerminator;

public DropSourceCommand(final AbstractStreamDropStatement statement,
final DataSource.DataSourceType dataSourceType,
final QueryTeminator queryTeminator) {
final QueryTerminator queryTerminator) {
this.sourceName = statement.getName().getSuffix();
this.dataSourceType = dataSourceType;
this.queryTeminator = queryTeminator;
this.queryTerminator = queryTerminator;
}

@Override
Expand All @@ -55,7 +55,7 @@ public DDLCommandResult run(MetaStore metaStore) {
dataSource.getKsqlTopic().getTopicName());
dropTopicCommand.run(metaStore);
metaStore.deleteSource(sourceName);
queryTeminator.terminateQueryForEntity(sourceName);
queryTerminator.terminateQueryForEntity(sourceName);
return new DDLCommandResult(true, "Source " + sourceName + " was dropped");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.HashMap;
import java.util.Optional;

import io.confluent.ksql.QueryTeminator;
import io.confluent.ksql.QueryTerminator;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
Expand All @@ -47,7 +47,7 @@
public class CommandFactoriesTest {

private final KafkaTopicClient topicClient = EasyMock.createNiceMock(KafkaTopicClient.class);
private final CommandFactories commandFactories = new CommandFactories(topicClient, EasyMock.createMock(QueryTeminator.class));
private final CommandFactories commandFactories = new CommandFactories(topicClient, EasyMock.createMock(QueryTerminator.class));
private final HashMap<String, Expression> properties = new HashMap<>();

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -64,4 +67,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(getKsql(), getStreamsProperties());
}

public static void main(String[] args) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectMapper().writer().writeValue(out, new KsqlRequest("CREATE STREAM pageviews_original(viewtime bigint, userid varchar, pageid varchar) " +
"WITH (kafka_topic='pageviews', value_format='JSON');", Collections.emptyMap()));
System.out.println(new String(out.toByteArray()));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you forgot to remove the main method! Seems to be unnecessary!

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum Type {

public enum Action {
CREATE,
DROP
DROP,
EXECUTE
}

public CommandId(final Type type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public CommandId getCommandId(Statement command) {
} else if (command instanceof DropTable) {
return getDropTableCommandId((DropTable) command);
} else if (command instanceof RunScript) {
return new CommandId(CommandId.Type.STREAM, "RunScript", CommandId.Action.CREATE);
return new CommandId(CommandId.Type.STREAM, "RunScript", CommandId.Action.EXECUTE);
} else {
throw new RuntimeException(String.format(
"Cannot assign command ID to statement of type %s",
Expand Down Expand Up @@ -91,7 +91,7 @@ private CommandId getSelectTableCommandId(CreateTableAsSelect createTableAsSelec
}

public CommandId getTerminateCommandId(TerminateQuery terminateQuery) {
return new CommandId(CommandId.Type.TERMINATE, terminateQuery.getQueryId().toString(), CommandId.Action.CREATE);
return new CommandId(CommandId.Type.TERMINATE, terminateQuery.getQueryId().toString(), CommandId.Action.EXECUTE);
}

public CommandId getDropTopicCommandId(DropTopic dropTopicQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ private Collection<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords
if (key.getAction() != CommandId.Action.DROP && !commands.containsKey(key)) {
commands.put(key, record);
} else if (key.getAction() == CommandId.Action.DROP){
commands.remove(new CommandId(key.getType(), key.getEntity(), CommandId.Action.CREATE));
if(commands.remove(new CommandId(key.getType(),
key.getEntity(),
CommandId.Action.CREATE)) == null) {
log.warn("drop command {} found without a corresponding create command for"
+ " {} {}", key, key.getType(), key.getAction());
}
}
}
records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
Expand Down