Skip to content

Commit

Permalink
+ added blocking shutdown via AutoCloseable
Browse files Browse the repository at this point in the history
  • Loading branch information
q3769 committed Sep 24, 2023
1 parent fef7bde commit 24179d5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.q3769</groupId>
<artifactId>conseq4j</artifactId>
<version>20230922.0.20230924</version>
<version>20230922.20230924.0</version>
<packaging>jar</packaging>
<name>conseq4j</name>
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/conseq4j/execute/ConseqExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*/
@ThreadSafe
@ToString
public final class ConseqExecutor implements SequentialExecutor {
public final class ConseqExecutor implements SequentialExecutor, AutoCloseable {

private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();
private final ExecutorService adminService = Executors.newSingleThreadExecutor();
Expand All @@ -52,7 +52,6 @@ public final class ConseqExecutor implements SequentialExecutor {
* max parallelism of task execution.
*/
private final ExecutorService workerExecutorService;
private final ConditionFactory await = Awaitility.await().forever();

private ConseqExecutor(ExecutorService workerExecutorService) {
this.workerExecutorService = workerExecutorService;
Expand Down Expand Up @@ -83,6 +82,10 @@ private ConseqExecutor(ExecutorService workerExecutorService) {
return new ConseqExecutor(workerExecutorService);
}

private static ConditionFactory await() {
return Awaitility.await().forever();
}

private static <T> T call(Callable<T> task) {
try {
return task.call();
Expand Down Expand Up @@ -156,7 +159,7 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
public void shutdown() {
new Thread(() -> {
workerExecutorService.shutdown();
await.until(activeSequentialTasks::isEmpty);
await().until(activeSequentialTasks::isEmpty);
adminService.shutdown();
}).start();
}
Expand All @@ -173,6 +176,14 @@ public boolean isTerminated() {
return neverStartedTasks;
}

@Override
public void close() {
workerExecutorService.shutdown();
await().until(activeSequentialTasks::isEmpty);
adminService.shutdown();
await().until(this::isTerminated);
}

int estimateActiveExecutorCount() {
return activeSequentialTasks.size();
}
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/conseq4j/summon/ConseqServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Delegate;
import org.awaitility.Awaitility;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -49,7 +50,7 @@

@ThreadSafe
@ToString
public final class ConseqServiceFactory implements SequentialExecutorServiceFactory {
public final class ConseqServiceFactory implements SequentialExecutorServiceFactory, AutoCloseable {
private final int concurrency;
private final ConcurrentMap<Object, ShutdownDisabledExecutorService> sequentialExecutors;

Expand Down Expand Up @@ -110,6 +111,12 @@ public List<Runnable> shutdownNow() {
.collect(Collectors.toList());
}

@Override
public void close() {
this.shutdown();
Awaitility.await().forever().until(this::isTerminated);
}

private int bucketOf(Object sequenceKey) {
return floorMod(Objects.hash(sequenceKey), this.concurrency);
}
Expand Down

0 comments on commit 24179d5

Please sign in to comment.