Skip to content

Commit

Permalink
[Reactive] Workaround for an RS 1.0.3 TCK bug (#1568)
Browse files Browse the repository at this point in the history
* [Reactive] Workaround for an RS 1.0.3 TCK bug

* Rename the static field

* Use execute()

* Fix compile errors
  • Loading branch information
akarnokd authored Mar 24, 2020
1 parent 76213f9 commit 3099935
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
Expand Down Expand Up @@ -526,18 +527,32 @@ static <T, R> Flow.Processor<T, R> coupledBuildProcessor(Flow.Subscriber<? super
}

static void complete(CompletableFuture<Object> cf) {
ForkJoinPool.commonPool().submit(() -> {
coupledExecutor.execute(() -> {
cf.complete(null);
return null;
});
}

static void fail(CompletableFuture<Object> cf, Throwable ex) {
ForkJoinPool.commonPool().submit(() -> {
coupledExecutor.execute(() -> {
cf.completeExceptionally(ex);
return null;
});
}

// Workaround for a TCK bug when calling cancel() from any method named onComplete().
private static volatile ExecutorService coupledExecutor = ForkJoinPool.commonPool();

/**
* Override the ExecutorService used by the cross-termination and cross-cancellation
* of a Coupled stage.
* @param executor the executor to use, null resets it to the default ForkJoinPool
*/
public static void setCoupledExecutor(ExecutorService executor) {
if (executor == null) {
coupledExecutor = ForkJoinPool.commonPool();
} else {
coupledExecutor = executor;
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
import org.eclipse.microprofile.reactive.streams.operators.tck.ReactiveStreamsTck;
import org.reactivestreams.tck.TestEnvironment;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;

public class HelidonReactiveStreamsEngineTckTest extends ReactiveStreamsTck<HelidonReactiveStreamsEngine> {

public HelidonReactiveStreamsEngineTckTest() {
Expand All @@ -41,4 +47,19 @@ protected ReactiveStreamsFactory createFactory() {
protected boolean isEnabled(Object test) {
return true;
}

private ExecutorService executor;

@BeforeSuite(alwaysRun = true)
public void before() {
executor = Executors.newSingleThreadExecutor();
HelidonReactiveStreamsEngine.setCoupledExecutor(executor);
}

@AfterSuite(alwaysRun = true)
public void after() {
HelidonReactiveStreamsEngine.setCoupledExecutor(null);
executor.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
Expand All @@ -32,6 +33,7 @@
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.ReactiveStreamsSpiVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Factory;

public class HelidonReactiveStreamsTckTest extends ReactiveStreamsTck<HelidonReactiveStreamsEngine> {
Expand All @@ -45,4 +47,18 @@ protected HelidonReactiveStreamsEngine createEngine() {
return new HelidonReactiveStreamsEngine();
}

private ExecutorService executor;

@BeforeSuite(alwaysRun = true)
public void before() {
executor = Executors.newSingleThreadExecutor();
HelidonReactiveStreamsEngine.setCoupledExecutor(executor);
}

@AfterSuite(alwaysRun = true)
public void after() {
HelidonReactiveStreamsEngine.setCoupledExecutor(null);
executor.shutdown();
}

}

0 comments on commit 3099935

Please sign in to comment.