Skip to content

Commit

Permalink
Don't run terminated queries at startup #482 (#508)
Browse files Browse the repository at this point in the history
* don't terminate queries at startup

* add test for mulitple create terminate
  • Loading branch information
dguy authored Dec 11, 2017
1 parent d7caf45 commit 298fdde
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@
import java.io.Closeable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import io.confluent.ksql.util.Pair;

/**
* Handles the logic of reading distributed commands, including pre-existing commands that were
* issued before being initialized, and then delegating their execution to a
Expand Down Expand Up @@ -98,8 +95,8 @@ void fetchAndRunCommands() {
* @throws Exception TODO: Refine this.
*/
public void processPriorCommands() throws Exception {
List<Pair<CommandId, Command>> priorCommands = commandStore.getPriorCommands();
statementExecutor.handleStatements(priorCommands);
final RestoreCommands restoreCommands = commandStore.getRestoreCommands();
statementExecutor.handleRestoration(restoreCommands);
}

private void executeStatement(Command command, CommandId commandId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -38,7 +37,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* Wrapper class for the command topic. Used for reading from the topic (either all messages from
Expand Down Expand Up @@ -115,18 +113,9 @@ public ConsumerRecords<CommandId, Command> getNewCommands() {
return commandConsumer.poll(Long.MAX_VALUE);
}

/**
* Collect all commands that have been written to the command topic, starting at the earliest
* offset and proceeding until it appears that all have been returned.
* @return The commands that have been read from the command topic
*/
public List<Pair<CommandId, Command>> getPriorCommands() {
return getAllPriorCommandRecords()
.stream()
.map(record -> new Pair<>(record.key(), record.value())).collect(Collectors.toList());
}
RestoreCommands getRestoreCommands() {
final RestoreCommands restoreCommands = new RestoreCommands();

private Collection<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords() {
Collection<TopicPartition> commandTopicPartitions = getTopicPartitionsForTopic(commandTopic);

commandConsumer.seekToBeginning(commandTopicPartitions);
Expand All @@ -139,12 +128,12 @@ private Collection<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords
log.debug("Received {} records from poll", records.count());
for (ConsumerRecord<CommandId, Command> record : records) {
final CommandId key = record.key();
if (key.getAction() != CommandId.Action.DROP && !commands.containsKey(key)) {
commands.put(key, record);
if (key.getAction() != CommandId.Action.DROP) {
restoreCommands.addCommand(record.key(), record.value());
} else if (key.getAction() == CommandId.Action.DROP){
if(commands.remove(new CommandId(key.getType(),
if(!restoreCommands.remove(new CommandId(key.getType(),
key.getEntity(),
CommandId.Action.CREATE)) == null) {
CommandId.Action.CREATE))) {
log.warn("drop command {} found without a corresponding create command for"
+ " {} {}", key, key.getType(), key.getAction());
}
Expand All @@ -153,7 +142,7 @@ private Collection<ConsumerRecord<CommandId, Command>> getAllPriorCommandRecords
records = commandConsumer.poll(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
}
log.debug("Retrieved records:" + commands.size());
return commands.values();
return restoreCommands;
}

private Collection<TopicPartition> getTopicPartitionsForTopic(String topic) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package io.confluent.ksql.rest.server.computation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import io.confluent.ksql.query.QueryId;

class RestoreCommands {
private final Map<CommandId, Command> toRestore = new LinkedHashMap<>();
private final Map<QueryId, CommandId> allTerminatedQueries = new HashMap<>();
private final List<CommandId> allCommandIds = new ArrayList<>();

void addCommand(final CommandId key, final Command value) {
if (key.getType() == CommandId.Type.TERMINATE) {
allTerminatedQueries.put(new QueryId(key.getEntity()), key);
if (allCommandIds.contains(key)) {
allCommandIds.remove(key);
}
} else if (!toRestore.containsKey(key)){
toRestore.put(key, value);
}
allCommandIds.add(key);
}

boolean remove(final CommandId commandId) {
if (toRestore.remove(commandId) != null) {
allCommandIds.remove(commandId);
return true;
}
return false;
}

interface ForEach {
void apply(final CommandId commandId,
final Command command,
final Map<QueryId, CommandId> terminatedQueries);
}

void forEach(final ForEach action) {
toRestore.forEach((commandId, command) -> {
final int commandIdIdx = allCommandIds.indexOf(commandId);
final Map<QueryId, CommandId> terminatedAfter = new HashMap<>();
allTerminatedQueries.entrySet().stream()
.filter(entry -> allCommandIds.indexOf(entry.getValue()) > commandIdIdx)
.forEach(queryIdCommandIdEntry ->
terminatedAfter.put(queryIdCommandIdEntry.getKey(), queryIdCommandIdEntry.getValue()));
action.apply(commandId, command, terminatedAfter);
});
}

// Visible for testing
Map<QueryId, CommandId> terminatedQueries() {
return Collections.unmodifiableMap(allTerminatedQueries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.ddl.commands.*;
import io.confluent.ksql.ddl.commands.DDLCommandResult;
import io.confluent.ksql.exception.ExceptionUtil;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
Expand All @@ -36,14 +36,12 @@
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -74,19 +72,18 @@ public StatementExecutor(
this.statusFutures = new HashMap<>();
}

void handleStatements(List<Pair<CommandId, Command>> priorCommands) throws Exception {
for (Pair<CommandId, Command> commandIdCommandPair: priorCommands) {
log.info("Executing prior statement: '{}'", commandIdCommandPair.getRight());
void handleRestoration(final RestoreCommands restoreCommands) throws Exception {
restoreCommands.forEach(((commandId, command, terminatedQueries) -> {
log.info("Executing prior statement: '{}'", command);
try {
handleStatementWithTerminatedQueries(
commandIdCommandPair.getRight(),
commandIdCommandPair.getLeft(),
Collections.emptyMap()
);
command,
commandId,
terminatedQueries);
} catch (Exception exception) {
log.warn("Failed to execute statement due to exception", exception);
}
}
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public void shouldFetchAndRunNewCommandsFromCommandTopic() throws Exception {
@Test
public void shouldFetchAndRunPriorCommandsFromCommandTopic() throws Exception {
StatementExecutor statementExecutor = mock(StatementExecutor.class);
statementExecutor.handleStatements(anyObject());
statementExecutor.handleRestoration(anyObject());
expectLastCall();
replay(statementExecutor);
CommandStore commandStore = mock(CommandStore.class);
expect(commandStore.getPriorCommands()).andReturn(Collections.emptyList());
expect(commandStore.getRestoreCommands()).andReturn(new RestoreCommands());
replay(commandStore);
CommandRunner commandRunner = new CommandRunner(statementExecutor, commandStore);
commandRunner.processPriorCommands();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.Pair;

import static org.easymock.EasyMock.anyLong;
Expand Down Expand Up @@ -69,8 +72,8 @@ public void shouldUseFirstCommandForSameIdIfNoDropBetweenThem() {
EasyMock.replay(commandConsumer);

final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
assertThat(priorCommands, equalTo(Collections.singletonList(new Pair<>(commandId, originalCommand))));
final Map<CommandId, Command> commands = getPriorCommands(command);
assertThat(commands, equalTo(Collections.singletonMap(commandId, originalCommand)));
}

@Test
Expand All @@ -94,8 +97,8 @@ public void shouldReplaceCommandWithNewCommandAfterDrop() {
EasyMock.replay(commandConsumer);

final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
assertThat(priorCommands, equalTo(Collections.singletonList(new Pair<>(createId, latestCommand))));
final Map<CommandId, Command> commands = getPriorCommands(command);
assertThat(commands, equalTo(Collections.singletonMap(createId, latestCommand)));
}

@Test
Expand All @@ -116,9 +119,33 @@ public void shouldRemoveCreateCommandIfItHasBeenDropped() {
.andReturn(new ConsumerRecords<>(Collections.emptyMap()));
EasyMock.replay(commandConsumer);

final CommandStore command = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final List<Pair<CommandId, Command>> priorCommands = command.getPriorCommands();
assertThat(priorCommands, equalTo(Collections.emptyList()));
final CommandStore commandStore = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
assertThat(getPriorCommands(commandStore), equalTo(Collections.emptyMap()));
}

@Test
public void shouldCollectTerminatedQueries() {
final CommandId terminated = new CommandId(CommandId.Type.TERMINATE, "queryId", CommandId.Action.EXECUTE);
final ConsumerRecords<CommandId, Command> records = new ConsumerRecords<>(
Collections.singletonMap(new TopicPartition("topic", 0), Collections.singletonList(
new ConsumerRecord<>("topic", 0, 0, terminated, new Command("terminate query 'queryId'", Collections.emptyMap()))
)));

EasyMock.expect(commandConsumer.partitionsFor(COMMAND_TOPIC)).andReturn(Collections.emptyList());
EasyMock.expect(commandConsumer.poll(anyLong())).andReturn(records)
.andReturn(new ConsumerRecords<>(Collections.emptyMap()));
EasyMock.replay(commandConsumer);

final CommandStore commandStore = new CommandStore(COMMAND_TOPIC, commandConsumer, commandProducer, new CommandIdAssigner(new MetaStoreImpl()));
final RestoreCommands restoreCommands = commandStore.getRestoreCommands();
assertThat(restoreCommands.terminatedQueries(), equalTo(Collections.singletonMap(new QueryId("queryId"), terminated)));
}

private Map<CommandId, Command> getPriorCommands(CommandStore command) {
final RestoreCommands priorCommands = command.getRestoreCommands();
final Map<CommandId, Command> commands = new HashMap<>();
priorCommands.forEach(((id, cmd, terminatedQueries) -> commands.put(id, cmd)));
return commands;
}

}
Loading

0 comments on commit 298fdde

Please sign in to comment.