Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reactive] Reimplement Microprofile-RS, fix operators, add from(CS) #1511

Merged
merged 16 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
Expand Down Expand Up @@ -355,6 +356,34 @@ default Single<T> first() {
return new MultiFirstPublisher<>(this);
}

/**
* Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
* <p>
* A null result from the CompletionStage will yield a
* {@link NullPointerException} signal.
* </p>
* @param completionStage the CompletionStage to
* @param <T> the element type of the stage and result
* @return Multi
* @see #from(CompletionStage, boolean)
*/
static <T> Multi<T> from(CompletionStage<T> completionStage) {
return from(completionStage, false);
}

/**
* Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
* @param completionStage the CompletionStage to
* @param nullMeansEmpty if true, a null result is interpreted to be an empty sequence
* if false, the resulting sequence fails with {@link NullPointerException}
* @param <T> the element type of the stage and result
* @return Multi
*/
static <T> Multi<T> from(CompletionStage<T> completionStage, boolean nullMeansEmpty) {
Objects.requireNonNull(completionStage, "completionStage is null");
return new MultiFromCompletionStage<>(completionStage, nullMeansEmpty);
}

/**
* Create a {@link Multi} instance wrapped around the given publisher.
*
Expand Down Expand Up @@ -545,6 +574,22 @@ default Multi<T> onError(Consumer<Throwable> onErrorConsumer) {
null);
}

/**
* Executes given {@link java.lang.Runnable} when a cancel signal is received.
*
* @param onCancel {@link java.lang.Runnable} to be executed.
* @return Multi
*/
default Multi<T> onCancel(Runnable onCancel) {
return new MultiTappedPublisher<>(this,
null,
null,
null,
null,
null,
onCancel);
}

/**
* Relay upstream items until the other source signals an item or completes.
* @param other the other sequence to signal the end of the main sequence
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/**
* Signal the outcome of the give CompletionStage.
* @param <T> the element type of the source and result
*/
final class MultiFromCompletionStage<T> implements Multi<T> {

private final CompletionStage<T> source;

private final boolean nullMeansEmpty;

MultiFromCompletionStage(CompletionStage<T> source, boolean nullMeansEmpty) {
this.source = source;
this.nullMeansEmpty = nullMeansEmpty;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
subscribe(subscriber, source, nullMeansEmpty);
}

static <T> void subscribe(Flow.Subscriber<? super T> subscriber, CompletionStage<T> source, boolean nullMeansEmpty) {
AtomicBiConsumer<T> watcher = new AtomicBiConsumer<>();
CompletionStageSubscription<T> css = new CompletionStageSubscription<>(subscriber, nullMeansEmpty, watcher);
watcher.lazySet(css);

subscriber.onSubscribe(css);
source.whenComplete(watcher);
}

static final class CompletionStageSubscription<T> extends DeferredScalarSubscription<T> implements BiConsumer<T, Throwable> {

private final boolean nullMeansEmpty;

private final AtomicBiConsumer<T> watcher;

CompletionStageSubscription(Flow.Subscriber<? super T> downstream, boolean nullMeansEmpty, AtomicBiConsumer<T> watcher) {
super(downstream);
this.nullMeansEmpty = nullMeansEmpty;
this.watcher = watcher;
}

@Override
public void accept(T t, Throwable throwable) {
if (throwable != null) {
error(throwable);
} else if (t != null) {
complete(t);
} else if (nullMeansEmpty) {
complete();
} else {
error(new NullPointerException("The CompletionStage completed with a null value"));
}
}

@Override
public void cancel() {
super.cancel();
watcher.getAndSet(null);
}
}

static final class AtomicBiConsumer<T> extends AtomicReference<BiConsumer<T, Throwable>>
implements BiConsumer<T, Throwable> {

@Override
public void accept(T t, Throwable throwable) {
BiConsumer<T, Throwable> bc = getAndSet(null);
if (bc != null) {
bc.accept(t, throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ public Multi<T> peek(Consumer<T> consumer) {
);
}

@Override
public Multi<T> onCancel(Runnable onCancel) {
return new MultiTappedPublisher<>(
source,
onSubscribeCallback,
onNextCallback,
onErrorCallback,
onCompleteCallback,
onRequestCallback,
RunnableChain.combine(onCancelCallback, onCancel)
);
}

static final class MultiTappedSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;
Expand Down Expand Up @@ -163,7 +176,9 @@ public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription, "subscription is null");
if (upstream != null) {
subscription.cancel();
throw new IllegalStateException("Subscription already set!");
// FIXME Microprofile RS doesn't like if this throws
// throw new IllegalStateException("Subscription already set!");
return;
}
upstream = subscription;
if (onSubscribeCallback != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,34 @@ static Single<Long> timer(long time, TimeUnit unit, ScheduledExecutorService exe
return new SingleTimer(time, unit, executor);
}

/**
* Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
* <p>
* A null result from the CompletionStage will yield a
* {@link NullPointerException} signal.
* </p>
* @param completionStage the CompletionStage to
* @param <T> the element type of the stage and result
* @return Single
* @see #from(CompletionStage, boolean)
*/
static <T> Single<T> from(CompletionStage<T> completionStage) {
return from(completionStage, false);
}

/**
* Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.
* @param completionStage the CompletionStage to
* @param nullMeansEmpty if true, a null result is interpreted to be an empty sequence
* if false, the resulting sequence fails with {@link NullPointerException}
* @param <T> the element type of the stage and result
* @return Single
*/
static <T> Single<T> from(CompletionStage<T> completionStage, boolean nullMeansEmpty) {
Objects.requireNonNull(completionStage, "completionStage is null");
return new SingleFromCompletionStage<>(completionStage, nullMeansEmpty);
}

/**
* Signals a {@link TimeoutException} if the upstream doesn't signal an item, error
* or completion within the specified time.
Expand All @@ -311,7 +339,6 @@ default Single<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService
return new SingleTimeout<>(this, timeout, unit, executor, null);
}


/**
* Switches to a fallback single if the upstream doesn't signal an item, error
* or completion within the specified time.
Expand Down Expand Up @@ -361,7 +388,7 @@ default Single<T> onTerminate(Runnable onTerminate) {
/**
* Executes given {@link java.lang.Runnable} when onComplete signal is received.
*
* @param onComplete {@link java.lang.Runnable} to be executed.
* @param onComplete {@link java.lang.Runnable} to be executed.
* @return Single
*/
default Single<T> onComplete(Runnable onComplete) {
Expand Down Expand Up @@ -390,6 +417,22 @@ default Single<T> onError(Consumer<Throwable> onErrorConsumer) {
null);
}

/**
* Executes given {@link java.lang.Runnable} when a cancel signal is received.
*
* @param onCancel {@link java.lang.Runnable} to be executed.
* @return Single
*/
default Single<T> onCancel(Runnable onCancel) {
return new SingleTappedPublisher<>(this,
null,
null,
null,
null,
null,
onCancel);
}

/**
* Invoke provided consumer for the item in stream.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

/**
* Signal the outcome of the give CompletionStage.
* @param <T> the element type of the source and result
*/
final class SingleFromCompletionStage<T> implements Single<T> {

private final CompletionStage<T> source;

private final boolean nullMeansEmpty;

SingleFromCompletionStage(CompletionStage<T> source, boolean nullMeansEmpty) {
this.source = source;
this.nullMeansEmpty = nullMeansEmpty;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
MultiFromCompletionStage.subscribe(subscriber, source, nullMeansEmpty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,17 @@ public Single<T> peek(Consumer<T> consumer) {
onCancelCallback
);
}

@Override
public Single<T> onCancel(Runnable onCancel) {
return new SingleTappedPublisher<>(
source,
onSubscribeCallback,
onNextCallback,
onErrorCallback,
onCompleteCallback,
onRequestCallback,
RunnableChain.combine(onCancelCallback, onCancel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,20 @@ public void onSubscribe(Flow.Subscription subscription) {
assertThat(ts.getLastError(), is(nullValue()));
assertThat(ts.isComplete(), is(true));
}

@Test
public void flatMapCompletionStage() {
TestSubscriber<Integer> ts = new TestSubscriber<>();

Multi.just(CompletableFuture.completedFuture(1), CompletableFuture.completedFuture(2))
.flatMap(Multi::from, 1, false, 1)
.subscribe(ts);

ts.assertEmpty();

ts.request(1)
.assertValuesOnly(1)
.request(1)
.assertResult(1, 2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.common.reactive;

import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

@Test
public class MultiFromCompletionStageTckTest extends FlowPublisherVerification<Long> {

public MultiFromCompletionStageTckTest() {
super(new TestEnvironment(50));
}

@Override
public Flow.Publisher<Long> createFlowPublisher(long l) {
CompletableFuture<Long> cf = CompletableFuture.completedFuture(1L);
return Multi.from(cf);
}

@Override
public Flow.Publisher<Long> createFailedFlowPublisher() {
CompletableFuture<Long> cf = new CompletableFuture<>();
cf.completeExceptionally(new IOException());
return Multi.from(cf);
}

@Override
public long maxElementsFromPublisher() {
return 1;
}
}
Loading