Skip to content

Commit

Permalink
+ auto close now first waits until no task pending before shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
q3769 committed Nov 4, 2023
1 parent 9ac90ef commit 8cdbb49
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/main/java/conseq4j/Terminable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

import java.util.List;

/**
* Direct shutdown operations
*/
public interface Terminable {
/**
* Initiates an orderly terminate of all managed thread resources. Previously submitted tasks are executed, but no
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/conseq4j/execute/ConseqExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
return (CompletableFuture<T>) copy;
}

/**
* First wait until no more task pending. For direct shutdown operations, use {@link Terminable} methods.
*/
@Override
public void close() {
awaitForever().until(this::noTaskPending);
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/conseq4j/summon/ConseqServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.awaitility.Awaitility.await;

import conseq4j.Terminable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand All @@ -37,6 +38,7 @@
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Delegate;
import org.awaitility.core.ConditionFactory;

/**
* A factory to produce sequential executors of type {@link ExecutorService} with an upper-bound global execution
Expand Down Expand Up @@ -81,6 +83,10 @@ private ConseqServiceFactory(int concurrency) {
return new ConseqServiceFactory(concurrency);
}

private static ConditionFactory awaitForever() {
return await().forever().pollDelay(Duration.ofMillis(10));
}

/**
* @return a single-thread executor that does not support any shutdown action.
*/
Expand All @@ -91,11 +97,14 @@ public ExecutorService getExecutorService(@NonNull Object sequenceKey) {
bucket -> new ShutdownDisabledExecutorService(Executors.newSingleThreadExecutor()));
}

/**
* Shuts down all executors and awaits termination to complete
*/
@Override
public void close() {
Collection<ShutdownDisabledExecutorService> shutdownDisabledExecutorServices = sequentialExecutors.values();
shutdownDisabledExecutorServices.forEach(ShutdownDisabledExecutorService::closeDelegate);
await().forever().until(() -> shutdownDisabledExecutorServices.stream()
awaitForever().until(() -> shutdownDisabledExecutorServices.stream()
.allMatch(ShutdownDisabledExecutorService::isTerminated));
}

Expand Down

0 comments on commit 8cdbb49

Please sign in to comment.