Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single add forSingle and exceptionallyAccept #2121

Merged
merged 5 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,18 @@ public CompletionAwaitable<T> exceptionally(final Function<Throwable, ? extends
return new CompletionAwaitable<T>(() -> completionStage, this);
}

/**
* {@inheritDoc}
*/
public CompletionAwaitable<T> exceptionallyAccept(final Consumer<Throwable> consumer) {
danielkec marked this conversation as resolved.
Show resolved Hide resolved
return this.handle((item, t) -> {
if (t != null) {
consumer.accept(t);
}
return item;
});
}

@Override
public CompletableFuture<T> toCompletableFuture() {
return originalStage.get().toCompletableFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ default Single<T> onComplete(Runnable onComplete) {
}

/**
* Executes given {@link java.lang.Runnable} when onError signal is received.
* Executes given {@link java.util.function.Consumer} when onError signal is received.
*
* @param onErrorConsumer {@link java.util.function.Consumer} to be executed.
* @return Single
Expand Down Expand Up @@ -659,6 +659,16 @@ default CompletionStage<T> toStage() {
}
}

/**
* Terminal stage, invokes provided consumer when Single is completed.
*
* @param consumer consumer to be invoked
* @return Single completed when the stream terminates
*/
default CompletionAwaitable<Void> forSingle(Consumer<T> consumer) {
return this.thenAccept(consumer);
}

/**
* Cancel upstream.
*
Expand Down Expand Up @@ -793,4 +803,17 @@ CompletionAwaitable<Void> acceptEitherAsync(CompletionStage<? extends T> other,

@Override
CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);

/**
* Returns a new CompletionStage that, when this stage completes
danielkec marked this conversation as resolved.
Show resolved Hide resolved
* exceptionally, is executed with this stage's exception as the
* argument to the supplied consumer. Otherwise, if this stage
* completes normally, then the returned stage also completes
* normally with the same value.
*
* @param consumer the consumer to invoke if this CompletionStage completed
* exceptionally
* @return the new CompletionStage
*/
CompletionAwaitable<T> exceptionallyAccept(Consumer<Throwable> consumer);
}
149 changes: 138 additions & 11 deletions common/reactive/src/test/java/io/helidon/common/reactive/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscriber;
Expand All @@ -25,7 +26,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;

Expand All @@ -37,6 +37,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

/**
Expand Down Expand Up @@ -170,14 +171,14 @@ public void testMapNullMapper() {
try {
Single.just("foo").map(null);
fail("NullPointerException should be thrown");
} catch(NullPointerException ex) {
} catch (NullPointerException ex) {
}
}

@Test
public void testMapBadMapperNullValue() {
SingleTestSubscriber<String> subscriber = new SingleTestSubscriber<>();
Single.just("bar").map((s) -> (String)null).subscribe(subscriber);
Single.just("bar").map((s) -> (String) null).subscribe(subscriber);
assertThat(subscriber.isComplete(), is(equalTo(false)));
assertThat(subscriber.getLastError(), is(instanceOf(NullPointerException.class)));
assertThat(subscriber.getItems(), is(empty()));
Expand Down Expand Up @@ -238,7 +239,7 @@ public void testFromSingle() {
public void testFlatMap() {
SingleTestSubscriber<String> subscriber = new SingleTestSubscriber<>();
Single.just("f.o.o")
.flatMap((str) -> new TestPublisher<>(str.split("\\.")))
.flatMap((str) -> new TestPublisher<>(str.split("\\.")))
.subscribe(subscriber);
assertThat(subscriber.isComplete(), is(equalTo(true)));
assertThat(subscriber.getLastError(), is(nullValue()));
Expand Down Expand Up @@ -283,7 +284,7 @@ public void testFlatMapNullMapper() {
try {
Single.just("bar").flatMap(null);
fail("NullPointerException should have been thrown");
} catch(NullPointerException ex) {
} catch (NullPointerException ex) {
}
}

Expand All @@ -304,17 +305,17 @@ public void subscribe(Subscriber<? super String> subscriber) {
};
try {
single.get(1, TimeUnit.SECONDS);
} catch(ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
} catch (ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
}
}

@Test
public void testEmptyToFuture() throws InterruptedException, TimeoutException {
try {
Single.<Object>empty().get(1, TimeUnit.SECONDS);
} catch(ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
} catch (ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
}
}

Expand All @@ -338,8 +339,8 @@ public void cancel() {
};
try {
single.get(1, TimeUnit.SECONDS);
} catch(ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
} catch (ExecutionException ex) {
assertThat(ex.getCause(), is(instanceOf(IllegalStateException.class)));
}
}

Expand Down Expand Up @@ -489,6 +490,132 @@ void testSingleOnCancel() throws InterruptedException {
onErrCnt.getCount(), is(equalTo(1L)));
}

@Test
void testForSingle() {
AtomicInteger onCancelCnt = new AtomicInteger(0);
AtomicInteger onCompleteCnt = new AtomicInteger(0);
AtomicInteger onErrorCnt = new AtomicInteger(0);
CompletableFuture<String> result = new CompletableFuture<>();

Single.just(TEST_PAYLOAD)
.onCancel(onCancelCnt::incrementAndGet)
.onComplete(onCompleteCnt::incrementAndGet)
.onError(t -> onErrorCnt.incrementAndGet())
.forSingle(result::complete);

assertThat(Single.create(result).await(300, TimeUnit.MILLISECONDS), is(TEST_PAYLOAD));
assertThat(onCancelCnt.get(), is(0));
assertThat(onErrorCnt.get(), is(0));
assertThat(onCompleteCnt.get(), is(1));
}

@Test
void testForSingleErrorAwait() throws InterruptedException {
AtomicInteger onCancelCnt = new AtomicInteger(0);
AtomicInteger onCompleteCnt = new AtomicInteger(0);
CountDownLatch onErrorCnt = new CountDownLatch(1);
CompletableFuture<String> result = new CompletableFuture<>();

RuntimeException testException = new RuntimeException("BOOM!!!");

CompletionException actualException = assertThrows(CompletionException.class,
() -> Single.<String>error(testException)
.onCancel(onCancelCnt::incrementAndGet)
.onComplete(onCompleteCnt::incrementAndGet)
.onError(t -> onErrorCnt.countDown())
.forSingle(result::complete)
.await(300, TimeUnit.MILLISECONDS));

assertThat(onErrorCnt.await(300, TimeUnit.MILLISECONDS), is(true));
assertThat(actualException.getCause(), is(testException));
assertThat(result.isDone(), is(false));
assertThat(onCancelCnt.get(), is(0));
assertThat(onCompleteCnt.get(), is(0));
}

@Test
void testForSingleErrorNoAwait() throws InterruptedException {
AtomicInteger onCancelCnt = new AtomicInteger(0);
AtomicInteger onCompleteCnt = new AtomicInteger(0);
CountDownLatch onErrorCnt = new CountDownLatch(1);
CompletableFuture<String> result = new CompletableFuture<>();

RuntimeException testException = new RuntimeException("BOOM!!!");

Single.<String>error(testException)
.onCancel(onCancelCnt::incrementAndGet)
.onComplete(onCompleteCnt::incrementAndGet)
.onError(t -> onErrorCnt.countDown())
.forSingle(result::complete);

assertThat(onErrorCnt.await(300, TimeUnit.MILLISECONDS), is(true));
assertThat(result.isDone(), is(false));
assertThat(onCancelCnt.get(), is(0));
assertThat(onCompleteCnt.get(), is(0));
}

@Test
void testForSingleException() {
AtomicInteger onCancelCnt = new AtomicInteger(0);
AtomicInteger onCompleteCnt = new AtomicInteger(0);
AtomicInteger onErrorCnt = new AtomicInteger(0);
CompletableFuture<Throwable> result = new CompletableFuture<>();

RuntimeException testException = new RuntimeException("BOOM!!!");

Single.just(TEST_PAYLOAD)
.onCancel(onCancelCnt::incrementAndGet)
.onComplete(onCompleteCnt::incrementAndGet)
.onError(t -> onErrorCnt.incrementAndGet())
.forSingle(s -> {
throw testException;
})
.exceptionallyAccept(result::complete);

assertThat(Single.create(result).await(300, TimeUnit.MILLISECONDS).getCause(), is(testException));
assertThat(onCancelCnt.get(), is(0));
assertThat(onErrorCnt.get(), is(0));
assertThat(onCompleteCnt.get(), is(1));
}

@Test
void testTraditionalExceptionallyWithFunction() {
CompletableFuture<Throwable> exceptionallyFuture = new CompletableFuture<>();

RuntimeException testException = new RuntimeException("BOOM!!!");
Single.error(testException)
.exceptionally(value -> {
exceptionallyFuture.complete(value);
return null;
})
.await(300, TimeUnit.MILLISECONDS);

assertThat(Single.create(exceptionallyFuture).await(300, TimeUnit.MILLISECONDS), is(testException));
}

@Test
void testExceptionallyWithConsumer() {
CompletableFuture<Throwable> exceptionallyFuture = new CompletableFuture<>();

RuntimeException testException = new RuntimeException("BOOM!!!");
Single.error(testException)
.exceptionallyAccept(exceptionallyFuture::complete)
.await(300, TimeUnit.MILLISECONDS);

assertThat(Single.create(exceptionallyFuture).await(300, TimeUnit.MILLISECONDS), is(testException));
}

@Test
void testExceptionallyWithoutException() {
CompletableFuture<Throwable> exceptionallyFuture = new CompletableFuture<>();

String result = Single.just(TEST_PAYLOAD)
.exceptionallyAccept(exceptionallyFuture::complete)
.await(300, TimeUnit.MILLISECONDS);

assertThat(result, is(TEST_PAYLOAD));
}

private static class SingleTestSubscriber<T> extends TestSubscriber<T> {

@Override
Expand Down