Skip to content

Commit

Permalink
Reimplement ConcatPublisher + TCK tests (#1452)
Browse files Browse the repository at this point in the history
* Reimplement ConcatPublisher + TCK tests
* Spotbugs doesn't like extending the Atomics
* GraphBuilder should not be serialized just because it extends some Serialized
  • Loading branch information
akarnokd authored Mar 2, 2020
1 parent 5005bd7 commit 000d470
Show file tree
Hide file tree
Showing 5 changed files with 444 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,157 +17,197 @@

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Concat streams to one.
*
* @param <T> item type
*/
public class ConcatPublisher<T> implements Flow.Publisher<T>, Multi<T> {
private FirstSubscriber firstSubscriber;
private SecondSubscriber secondSubscriber;
private Flow.Subscriber<T> subscriber;
private Flow.Publisher<T> firstPublisher;
private Flow.Publisher<T> secondPublisher;
private RequestedCounter requested = new RequestedCounter(true);
private ReentrantLock firstPublisherCompleteLock = new ReentrantLock();
private CompletableFuture<Void> firstSubscriberCompleted = new CompletableFuture<>();
private CompletableFuture<Flow.Subscriber<T>> downstreamReady = new CompletableFuture<>();
public final class ConcatPublisher<T> implements Flow.Publisher<T>, Multi<T> {
private final Flow.Publisher<T> firstPublisher;
private final Flow.Publisher<T> secondPublisher;

private ConcatPublisher(Flow.Publisher<T> firstPublisher, Flow.Publisher<T> secondPublisher) {
this.firstPublisher = firstPublisher;
this.secondPublisher = secondPublisher;
}

/**
* Create new {@link ConcatPublisher}.
* Create new {@code ConcatPublisher}.
*
* @param firstPublisher first stream
* @param secondPublisher second stream
* @param <T> item type
* @return {@link ConcatPublisher}
* @return {@code ConcatPublisher}
*/
public static <T> ConcatPublisher<T> create(Flow.Publisher<T> firstPublisher, Flow.Publisher<T> secondPublisher) {
return new ConcatPublisher<>(firstPublisher, secondPublisher);
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(Flow.Subscriber<? super T> subscriber) {
this.subscriber = (Flow.Subscriber<T>) subscriber;
ConcatCancelingSubscription<T> parent = new ConcatCancelingSubscription<>(subscriber, firstPublisher, secondPublisher);
subscriber.onSubscribe(parent);
parent.drain();
}

this.firstSubscriber = new FirstSubscriber();
this.secondSubscriber = new SecondSubscriber();
static final class ConcatCancelingSubscription<T>
extends AtomicInteger implements Flow.Subscription {

firstPublisher.subscribe(firstSubscriber);
private static final long serialVersionUID = -1593224722447706944L;

subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
if (!StreamValidationUtils.checkRequestParam(n, subscriber::onError)) {
return;
}
requested.increment(n, subscriber::onError);
firstCompleteLock(() -> {
if (!firstSubscriber.complete) {
firstSubscriber.subscription.request(n);
} else {
secondSubscriber.subscription.request(n);
}
});
}
private final InnerSubscriber<T> inner1;

@Override
public void cancel() {
firstSubscriber.subscription.cancel();
secondSubscriber.subscription.cancel();
}
});
downstreamReady.complete(this.subscriber);
}
private final InnerSubscriber<T> inner2;

private class FirstSubscriber implements Flow.Subscriber<T> {
private final AtomicBoolean canceled;

private Flow.Subscription subscription;
private boolean complete = false;
private Flow.Publisher<T> source1;

@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription);
this.subscription = subscription;
secondPublisher.subscribe(secondSubscriber);
}
private Flow.Publisher<T> source2;

@Override
public void onNext(T o) {
requested.tryDecrement();
ConcatPublisher.this.subscriber.onNext(o);
private int index;

ConcatCancelingSubscription(Flow.Subscriber<? super T> subscriber,
Flow.Publisher<T> source1, Flow.Publisher<T> source2) {
this.inner1 = new InnerSubscriber<>(subscriber, this);
this.inner2 = new InnerSubscriber<>(subscriber, this);
this.canceled = new AtomicBoolean();
this.source1 = source1;
this.source2 = source2;
}

@Override
public void onError(Throwable t) {
firstCompleteLock(() -> complete = true);
secondSubscriber.subscription.cancel();
subscription.cancel();
ConcatPublisher.this.subscriber.onError(t);
public void request(long n) {
SubscriptionHelper.deferredRequest(inner2, inner2.requested, n);
SubscriptionHelper.deferredRequest(inner1, inner1.requested, n);
}

@Override
public void onComplete() {
firstSubscriberCompleted.complete(null);
firstCompleteLock(() -> complete = true);
try {
requested.lock();
long n = requested.get();
if (n > 0) {
secondSubscriber.subscription.request(n);
}
} finally {
requested.unlock();
public void cancel() {
if (canceled.compareAndSet(false, true)) {
SubscriptionHelper.cancel(inner1);
SubscriptionHelper.cancel(inner2);
drain();
}
}
}

private class SecondSubscriber implements Flow.Subscriber<T> {
void drain() {
if (getAndIncrement() != 0) {
return;
}

private Flow.Subscription subscription;
int missed = 1;
for (;;) {

if (index == 0) {
index = 1;
Flow.Publisher<T> source = source1;
source1 = null;
source.subscribe(inner1);
} else if (index == 1) {
index = 2;
Flow.Publisher<T> source = source2;
source2 = null;
if (inner1.produced != 0L) {
SubscriptionHelper.produced(inner2.requested, inner1.produced);
}
source.subscribe(inner2);
} else if (index == 2) {
index = 3;
if (!canceled.get()) {
inner1.downstream.onComplete();
}
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription);
this.subscription = subscription;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

@Override
public void onNext(T o) {
ConcatPublisher.this.subscriber.onNext(o);
private void writeObject(ObjectOutputStream stream)
throws IOException {
stream.defaultWriteObject();
}

@Override
public void onError(Throwable t) {
firstSubscriber.subscription.cancel();
subscription.cancel();
ConcatPublisher.this.subscriber.onError(t);
private void readObject(ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
}

@Override
public void onComplete() {
downstreamReady.whenComplete((downStreamSubscriber, th) -> {
firstSubscriberCompleted.whenComplete((aVoid, t) -> ConcatPublisher.this.subscriber.onComplete());
});
}
}
static final class InnerSubscriber<T> extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T> {

private static final long serialVersionUID = 3029954591185720794L;

private final Flow.Subscriber<? super T> downstream;

private final ConcatCancelingSubscription<T> parent;

private final AtomicLong requested;

private long produced;

InnerSubscriber(Flow.Subscriber<? super T> downstream, ConcatCancelingSubscription<T> parent) {
this.downstream = downstream;
this.parent = parent;
this.requested = new AtomicLong();
}

@Override
public void onSubscribe(Flow.Subscription s) {
SubscriptionHelper.deferredSetOnce(this, requested, s);
}

@Override
public void onNext(T t) {
if (get() != SubscriptionHelper.CANCELED) {
produced++;
downstream.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (get() != SubscriptionHelper.CANCELED) {
lazySet(SubscriptionHelper.CANCELED);
downstream.onError(t);

parent.cancel();
} else {
// FIXME
// HelidonReactivePlugins.onError(t);
}
}

@Override
public void onComplete() {
if (get() != SubscriptionHelper.CANCELED) {
lazySet(SubscriptionHelper.CANCELED);
parent.drain();
}
}

private void writeObject(ObjectOutputStream stream)
throws IOException {
stream.defaultWriteObject();
}

private void readObject(ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
}

private void firstCompleteLock(Runnable runnable) {
try {
firstPublisherCompleteLock.lock();
runnable.run();
} finally {
firstPublisherCompleteLock.unlock();
}
}
}
Loading

0 comments on commit 000d470

Please sign in to comment.