Skip to content

Commit

Permalink
exec shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jul 27, 2024
1 parent 423b4a8 commit 1422078
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -297,4 +298,20 @@ private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String bran
}
return ImmutableMap.of();
}

@Override
public void stop() {
exec.shutdownNow();

// ensure coordinator tasks are shut down, else cause the sink worker to fail
try {
if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Timed out waiting for coordinator shutdown");
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for coordinator shutdown", e);
}

super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.tabular.iceberg.connect.channel;

import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,5 +66,10 @@ public boolean isTerminated() {

public void terminate() {
terminated = true;
try {
join();
} catch (InterruptedException e) {
throw new ConnectException(e);
}
}
}

0 comments on commit 1422078

Please sign in to comment.