Skip to content

Commit

Permalink
exec shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jul 27, 2024
1 parent 423b4a8 commit e2d4c51
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -297,4 +298,20 @@ private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String bran
}
return ImmutableMap.of();
}

@Override
public void stop() {
exec.shutdownNow();

// ensure coordinator tasks are shut down, else cause the sink worker to fail
try {
if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Timed out waiting for coordinator shutdown");
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for coordinator shutdown", e);
}

super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/
package io.tabular.iceberg.connect.channel;

import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatorThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class);
private static final String THREAD_NAME = "iceberg-coord";

private Coordinator coordinator;
private volatile Coordinator coordinator;
private volatile boolean terminated;

public CoordinatorThread(Coordinator coordinator) {
Expand Down Expand Up @@ -53,10 +54,10 @@ public void run() {

try {
coordinator.stop();
coordinator = null;
} catch (Exception e) {
LOG.error("Coordinator error during stop, ignoring", e);
}
coordinator = null;
}

public boolean isTerminated() {
Expand All @@ -65,5 +66,15 @@ public boolean isTerminated() {

public void terminate() {
terminated = true;

try {
join();
} catch (InterruptedException e) {
throw new ConnectException(e);
}

if (coordinator != null) {
throw new ConnectException("Coordinator was not stopped during thread termination");
}
}
}

0 comments on commit e2d4c51

Please sign in to comment.