diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index c966716a1d..b9b4bc0e87 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -8281,7 +8281,7 @@ public final Flowable flatMap(Function * * @param - * the type of item emitted by the resulting Publisher + * the type of item emitted by the resulting Iterable * @param mapper * a function that returns an Iterable sequence of values for when given an item emitted by the * source Publisher @@ -8310,7 +8310,7 @@ public final Flowable flatMapIterable(final Function * * @param - * the type of item emitted by the resulting Publisher + * the type of item emitted by the resulting Iterable * @param mapper * a function that returns an Iterable sequence of values for when given an item emitted by the * source Publisher @@ -8344,7 +8344,7 @@ public final Flowable flatMapIterable(final Function * the collection element type * @param - * the type of item emitted by the resulting Publisher + * the type of item emitted by the resulting Iterable * @param mapper * a function that returns an Iterable sequence of values for each item emitted by the source * Publisher diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 6dfae5460b..78537d303a 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2506,6 +2506,54 @@ public final Maybe flatMap(Function(this, mapper, resultSelector)); } + /** + * Returns a Flowable that merges each item emitted by the source Maybe with the values in an + * Iterable corresponding to that item that is generated by a selector. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code flattenAsFlowable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Iterable + * @param mapper + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Maybe + * @return the new Flowable instance + * @see ReactiveX operators documentation: FlatMap + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flattenAsFlowable(final Function> mapper) { + return new MaybeFlatMapIterableFlowable(this, mapper); + } + + /** + * Returns an Observable that maps a success value into an Iterable and emits its items. + *

+ * + *

+ *
Scheduler:
+ *
{@code flattenAsObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Iterable + * @param mapper + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Maybe + * @return the new Observable instance + * @see ReactiveX operators documentation: FlatMap + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flattenAsObservable(final Function> mapper) { + return new MaybeFlatMapIterableObservable(this, mapper); + } + /** * Returns an Observable that is based on applying a specified function to the item emitted by the source Maybe, * where that function returns an ObservableSource. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index c5d14e81b2..76a0573663 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7177,7 +7177,7 @@ public final Observable flatMap(Function * * @param - * the type of item emitted by the resulting ObservableSource + * the type of item emitted by the resulting Iterable * @param mapper * a function that returns an Iterable sequence of values for when given an item emitted by the * source ObservableSource @@ -7204,7 +7204,7 @@ public final Observable flatMapIterable(final Function * the collection element type * @param - * the type of item emitted by the resulting ObservableSource + * the type of item emitted by the resulting Iterable * @param mapper * a function that returns an Iterable sequence of values for each item emitted by the source * ObservableSource diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index c20c155103..937aafe511 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1875,7 +1875,56 @@ public final Flowable flatMapPublisher(Function + * + *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code flattenAsFlowable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Iterable + * @param mapper + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Single + * @return the new Flowable instance + * @see ReactiveX operators documentation: FlatMap + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flattenAsFlowable(final Function> mapper) { + return new SingleFlatMapIterableFlowable(this, mapper); + } + + /** + * Returns an Observable that maps a success value into an Iterable and emits its items. + *

+ * + *

+ *
Scheduler:
+ *
{@code flattenAsObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Iterable + * @param mapper + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Single + * @return the new Observable instance + * @see ReactiveX operators documentation: FlatMap + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flattenAsObservable(final Function> mapper) { + return new SingleFlatMapIterableObservable(this, mapper); + } + + /** + * Returns a Single that is based on applying a specified function to the item emitted by the source Single, +>>>>>>> refs/remotes/akarnokd/SoloFlatMapIterable * where that function returns a SingleSource. *

* diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java new file mode 100644 index 0000000000..501830d3d1 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java @@ -0,0 +1,289 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.BackpressureHelper; + +/** + * Maps a success value into an Iterable and streams it back as a Flowable. + * + * @param the source value type + * @param the element type of the Iterable + */ +public final class MaybeFlatMapIterableFlowable extends Flowable { + + final MaybeSource source; + + final Function> mapper; + + public MaybeFlatMapIterableFlowable(MaybeSource source, + Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapIterableObserver(s, mapper)); + } + + static final class FlatMapIterableObserver + extends BasicIntQueueSubscription + implements MaybeObserver { + + private static final long serialVersionUID = -8938804753851907758L; + + final Subscriber actual; + + final Function> mapper; + + final AtomicLong requested; + + Disposable d; + + volatile Iterator it; + + volatile boolean cancelled; + + boolean outputFused; + + FlatMapIterableObserver(Subscriber actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + this.requested = new AtomicLong(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Iterator iter; + boolean has; + try { + iter = mapper.apply(value).iterator(); + + has = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + if (!has) { + actual.onComplete(); + return; + } + + this.it = iter; + drain(); + } + + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + cancelled = true; + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + Subscriber a = actual; + Iterator iter = this.it; + + if (outputFused && iter != null) { + a.onNext(null); + a.onComplete(); + return; + } + + int missed = 1; + + for (;;) { + + if (iter != null) { + long r = requested.get(); + long e = 0L; + + if (r == Long.MAX_VALUE) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + + while (e != r) { + if (cancelled) { + return; + } + + R v; + + try { + v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + e++; + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + + if (iter == null) { + iter = it; + } + } + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public void clear() { + it = null; + } + + @Override + public boolean isEmpty() { + return it == null; + } + + @Override + public R poll() throws Exception { + Iterator iter = it; + + if (iter != null) { + R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + if (iter.hasNext()) { + it = null; + } + return v; + } + return null; + } + + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java new file mode 100644 index 0000000000..d4f08bc0a9 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java @@ -0,0 +1,231 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.Iterator; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.observers.BasicIntQueueDisposable; + +/** + * Maps a success value into an Iterable and streams it back as a Flowable. + * + * @param the source value type + * @param the element type of the Iterable + */ +public final class MaybeFlatMapIterableObservable extends Observable { + + final MaybeSource source; + + final Function> mapper; + + public MaybeFlatMapIterableObservable(MaybeSource source, + Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new FlatMapIterableObserver(s, mapper)); + } + + static final class FlatMapIterableObserver + extends BasicIntQueueDisposable + implements MaybeObserver { + + private static final long serialVersionUID = -8938804753851907758L; + + final Observer actual; + + final Function> mapper; + + Disposable d; + + volatile Iterator it; + + volatile boolean cancelled; + + boolean outputFused; + + FlatMapIterableObserver(Observer actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Iterator iter; + boolean has; + try { + iter = mapper.apply(value).iterator(); + + has = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + if (!has) { + actual.onComplete(); + return; + } + + this.it = iter; + drain(); + } + + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void dispose() { + cancelled = true; + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + Observer a = actual; + Iterator iter = this.it; + + if (outputFused && iter != null) { + a.onNext(null); + a.onComplete(); + return; + } + + int missed = 1; + + for (;;) { + + if (iter != null) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + + if (iter == null) { + iter = it; + } + } + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public void clear() { + it = null; + } + + @Override + public boolean isEmpty() { + return it == null; + } + + @Override + public R poll() throws Exception { + Iterator iter = it; + + if (iter != null) { + R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + if (iter.hasNext()) { + it = null; + } + return v; + } + return null; + } + + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java new file mode 100644 index 0000000000..1e03262138 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java @@ -0,0 +1,284 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.BackpressureHelper; + +/** + * Maps a success value into an Iterable and streams it back as a Flowable. + * + * @param the source value type + * @param the element type of the Iterable + */ +public final class SingleFlatMapIterableFlowable extends Flowable { + + final SingleSource source; + + final Function> mapper; + + public SingleFlatMapIterableFlowable(SingleSource source, + Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapIterableObserver(s, mapper)); + } + + static final class FlatMapIterableObserver + extends BasicIntQueueSubscription + implements SingleObserver { + + private static final long serialVersionUID = -8938804753851907758L; + + final Subscriber actual; + + final Function> mapper; + + final AtomicLong requested; + + Disposable d; + + volatile Iterator it; + + volatile boolean cancelled; + + boolean outputFused; + + FlatMapIterableObserver(Subscriber actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + this.requested = new AtomicLong(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Iterator iter; + boolean has; + try { + iter = mapper.apply(value).iterator(); + + has = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + if (!has) { + actual.onComplete(); + return; + } + + this.it = iter; + drain(); + } + + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + actual.onError(e); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + cancelled = true; + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + Subscriber a = actual; + Iterator iter = this.it; + + if (outputFused && iter != null) { + a.onNext(null); + a.onComplete(); + return; + } + + int missed = 1; + + for (;;) { + + if (iter != null) { + long r = requested.get(); + long e = 0L; + + if (r == Long.MAX_VALUE) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + + while (e != r) { + if (cancelled) { + return; + } + + R v; + + try { + v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + e++; + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + + if (iter == null) { + iter = it; + } + } + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public void clear() { + it = null; + } + + @Override + public boolean isEmpty() { + return it == null; + } + + @Override + public R poll() throws Exception { + Iterator iter = it; + + if (iter != null) { + R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + if (iter.hasNext()) { + it = null; + } + return v; + } + return null; + } + + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java new file mode 100644 index 0000000000..15fb2a203c --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java @@ -0,0 +1,226 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.Iterator; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.observers.BasicIntQueueDisposable; + +/** + * Maps a success value into an Iterable and streams it back as a Flowable. + * + * @param the source value type + * @param the element type of the Iterable + */ +public final class SingleFlatMapIterableObservable extends Observable { + + final SingleSource source; + + final Function> mapper; + + public SingleFlatMapIterableObservable(SingleSource source, + Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new FlatMapIterableObserver(s, mapper)); + } + + static final class FlatMapIterableObserver + extends BasicIntQueueDisposable + implements SingleObserver { + + private static final long serialVersionUID = -8938804753851907758L; + + final Observer actual; + + final Function> mapper; + + Disposable d; + + volatile Iterator it; + + volatile boolean cancelled; + + boolean outputFused; + + FlatMapIterableObserver(Observer actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + Iterator iter; + boolean has; + try { + iter = mapper.apply(value).iterator(); + + has = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + if (!has) { + actual.onComplete(); + return; + } + + this.it = iter; + drain(); + } + + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + actual.onError(e); + } + + @Override + public void dispose() { + cancelled = true; + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + Observer a = actual; + Iterator iter = this.it; + + if (outputFused && iter != null) { + a.onNext(null); + a.onComplete(); + return; + } + + int missed = 1; + + for (;;) { + + if (iter != null) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + + if (iter == null) { + iter = it; + } + } + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public void clear() { + it = null; + } + + @Override + public boolean isEmpty() { + return it == null; + } + + @Override + public R poll() throws Exception { + Iterator iter = it; + + if (iter != null) { + R v = ObjectHelper.requireNonNull(iter.next(), "The iterator returned a null value"); + if (iter.hasNext()) { + it = null; + } + return v; + } + return null; + } + + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java b/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java index 0c913a7ebb..c36b51ec7c 100644 --- a/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java +++ b/src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java @@ -61,13 +61,13 @@ public void testEmptyErrors() { try { new CompositeException(); fail("CompositeException should fail if errors is empty"); - } catch(IllegalArgumentException e) { + } catch (IllegalArgumentException e) { assertEquals("errors is empty", e.getMessage()); } try { new CompositeException(new ArrayList()); fail("CompositeException should fail if errors is empty"); - } catch(IllegalArgumentException e) { + } catch (IllegalArgumentException e) { assertEquals("errors is empty", e.getMessage()); } } @@ -351,7 +351,9 @@ public void badException() { } } -class BadException extends Throwable { +final class BadException extends Throwable { + private static final long serialVersionUID = 8999507293896399171L; + @Override public synchronized Throwable getCause() { return this; diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java new file mode 100644 index 0000000000..8de8ae79ff --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subscribers.TestSubscriber; + +public class MaybeFlatMapIterableFlowableTest { + + @Test + public void normal() { + + Maybe.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(1, 2); + } + + @Test + public void emptyIterable() { + + Maybe.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Collections.emptyList(); + } + }) + .test() + .assertResult(); + } + + @Test + public void error() { + + Maybe.error(new TestException()).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void empty() { + + Maybe.empty().flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(); + } + + @Test + public void backpressure() { + + TestSubscriber ts = Maybe.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test(0); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java new file mode 100644 index 0000000000..4402284876 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java @@ -0,0 +1,77 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +public class MaybeFlatMapIterableObservableTest { + + @Test + public void normal() { + + Maybe.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(1, 2); + } + + @Test + public void emptyIterable() { + + Maybe.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Collections.emptyList(); + } + }) + .test() + .assertResult(); + } + + @Test + public void error() { + + Maybe.error(new TestException()).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void empty() { + + Maybe.empty().flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java new file mode 100644 index 0000000000..ed27e01f60 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java @@ -0,0 +1,87 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subscribers.TestSubscriber; + +public class SingleFlatMapIterableFlowableTest { + + @Test + public void normal() { + + Single.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(1, 2); + } + + @Test + public void emptyIterable() { + + Single.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Collections.emptyList(); + } + }) + .test() + .assertResult(); + } + + @Test + public void error() { + + Single.error(new TestException()).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void backpressure() { + + TestSubscriber ts = Single.just(1).flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test(0); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java new file mode 100644 index 0000000000..81d21a0676 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +public class SingleFlatMapIterableObservableTest { + + @Test + public void normal() { + + Single.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertResult(1, 2); + } + + @Test + public void emptyIterable() { + + Single.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Collections.emptyList(); + } + }) + .test() + .assertResult(); + } + + @Test + public void error() { + + Single.error(new TestException()).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(v, v + 1); + } + }) + .test() + .assertFailure(TestException.class); + } +}