Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Multi.flatMapIterable + TCK test #1467

Merged
merged 1 commit into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,23 @@ default <U> Multi<U> flatMap(Function<T, Flow.Publisher<U>> mapper, long maxConc
* @param <U> output item type
* @return Multi
*/
default <U> Multi<U> flatMapIterable(Function<T, Iterable<U>> iterableMapper) {
MultiFlatMapProcessor<T, U> processor = MultiFlatMapProcessor.fromIterableMapper(iterableMapper);
this.subscribe(processor);
return processor;
default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> iterableMapper) {
return flatMapIterable(iterableMapper, 32);
}

/**
* Transform item with supplied function and flatten resulting {@link Iterable} to downstream.
*
* @param iterableMapper {@link Function} receiving item as parameter and returning {@link Iterable}
* @param prefetch the number of upstream items to request upfront, then 75% of this value after
* 75% received and mapped
* @param <U> output item type
* @return Multi
*/
default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> iterableMapper,
int prefetch) {
Objects.requireNonNull(iterableMapper, "iterableMapper is null");
return new MultiFlatMapIterable<>(this, iterableMapper, prefetch);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
* Map each upstream item into an Iterable and stream their values.
* @param <T> the upstream item type
* @param <R> the output item type
*/
final class MultiFlatMapIterable<T, R> implements Multi<R> {

private final Multi<T> source;

private final Function<? super T, ? extends Iterable<? extends R>> mapper;

private final int prefetch;

MultiFlatMapIterable(Multi<T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper,
int prefetch) {
this.source = source;
this.mapper = mapper;
this.prefetch = prefetch;
}

@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
source.subscribe(new FlatMapIterableSubscriber<>(subscriber, mapper, prefetch));
}

static final class FlatMapIterableSubscriber<T, R>
extends AtomicInteger
implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super R> downstream;

private final Function<? super T, ? extends Iterable<? extends R>> mapper;

private final int prefetch;

private final AtomicLong requested;

private final ConcurrentLinkedQueue<T> queue;

private Flow.Subscription upstream;

private long emitted;

private volatile boolean upstreamDone;
private Throwable error;

private volatile boolean canceled;

private Iterator<? extends R> currentIterator;

private int upstreamConsumed;

FlatMapIterableSubscriber(Flow.Subscriber<? super R> downstream,
Function<? super T, ? extends Iterable<? extends R>> mapper,
int prefetch) {
this.downstream = downstream;
this.mapper = mapper;
this.prefetch = prefetch;
this.requested = new AtomicLong();
this.queue = new ConcurrentLinkedQueue<>();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
subscription.request(prefetch);
}

@Override
public void onNext(T item) {
queue.offer(item);
drain();
}

@Override
public void onError(Throwable throwable) {
error = throwable;
upstreamDone = true;
drain();
}

@Override
public void onComplete() {
upstreamDone = true;
drain();
}

@Override
public void request(long n) {
if (n <= 0L) {
onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden!"));
} else {
SubscriptionHelper.addRequest(requested, n);
drain();
}
}

@Override
public void cancel() {
canceled = true;
upstream.cancel();
drain();
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

Iterator<? extends R> iterator = currentIterator;
Flow.Subscriber<? super R> downstream = this.downstream;
long e = emitted;
int limit = prefetch - (prefetch >> 2);

int missed = 1;
outer:
for (;;) {

if (canceled) {
iterator = null;
currentIterator = null;
queue.clear();
} else {
if (upstreamDone) {
Throwable ex = error;
if (ex != null) {
canceled = true;
downstream.onError(ex);
continue;
}
}
if (iterator == null) {
boolean d = upstreamDone;
T item = queue.poll();
boolean empty = item == null;

if (d && empty) {
canceled = true;
downstream.onComplete();
continue;
}

if (!empty) {

int c = upstreamConsumed + 1;
if (c == limit) {
upstreamConsumed = 0;
upstream.request(limit);
} else {
upstreamConsumed = c;
}

boolean hasNext;
try {
iterator = Objects.requireNonNull(
mapper.apply(item).iterator(),
"The Iterable returned a null iterator"
);

if (canceled) {
continue;
}

hasNext = iterator.hasNext();
} catch (Throwable ex) {
canceled = true;
downstream.onError(ex);
continue;
}

if (!hasNext) {
iterator = null;
continue;
}
currentIterator = iterator;
}
}

if (iterator != null) {
long r = requested.get();

while (e != r) {

if (canceled) {
continue outer;
}

R result;

try {
result = Objects.requireNonNull(iterator.next(),
"The iterator returned a null item");
} catch (Throwable ex) {
canceled = true;
downstream.onError(ex);
continue outer;
}

if (canceled) {
continue outer;
}

downstream.onNext(result);
e++;

if (canceled) {
continue outer;
}

boolean hasNext;
try {
hasNext = iterator.hasNext();
} catch (Throwable ex) {
canceled = true;
downstream.onError(ex);
continue outer;
}

if (canceled) {
continue outer;
}

if (!hasNext) {
iterator = null;
currentIterator = null;
continue outer;
}
}
}
}

emitted = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;

import java.util.Collections;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

public class MultiFlatMapIterableManyToManyTckTest extends FlowPublisherVerification<Integer> {

public MultiFlatMapIterableManyToManyTckTest() {
super(new TestEnvironment(50));
}

@Override
public Flow.Publisher<Integer> createFlowPublisher(long l) {
return Multi.just(l >> 1, l - (l >> 1))
.flatMapIterable(v -> () -> IntStream.range(0, v.intValue()).boxed().iterator());
}

@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}

@Override
public long maxElementsFromPublisher() {
return 10;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;

import java.util.Collections;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

public class MultiFlatMapIterableManyToOneTckTest extends FlowPublisherVerification<Integer> {

public MultiFlatMapIterableManyToOneTckTest() {
super(new TestEnvironment(50));
}

@Override
public Flow.Publisher<Integer> createFlowPublisher(long l) {
return Multi.from(() -> IntStream.range(0, (int)l).boxed().iterator())
.flatMapIterable(Collections::singleton);
}

@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}

@Override
public long maxElementsFromPublisher() {
return 10;
}
}
Loading