Skip to content

Commit

Permalink
2.x: improve performance of Observable.flatMapIterable (#4612)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Sep 27, 2016
1 parent d3a5776 commit ab21265
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5736,7 +5736,7 @@ public final <R> Observable<R> concatMapEagerDelayError(Function<? super T, ? ex
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper));
return RxJavaPlugins.onAssembly(new ObservableFlattenIterable<T, U>(this, mapper));
}

/**
Expand Down Expand Up @@ -7188,7 +7188,7 @@ public final <U, R> Observable<R> flatMap(Function<? super T, ? extends Observab
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper));
return RxJavaPlugins.onAssembly(new ObservableFlattenIterable<T, U>(this, mapper));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import java.util.Iterator;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Maps a sequence into an Iterable and emits its values.
*
* @param <T> the input value type to map to Iterable
* @param <R> the element type of the Iterable and the output
*/
public final class ObservableFlattenIterable<T, R> extends AbstractObservableWithUpstream<T, R> {

final Function<? super T, ? extends Iterable<? extends R>> mapper;

public ObservableFlattenIterable(ObservableSource<T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper) {
super(source);
this.mapper = mapper;
}

@Override
protected void subscribeActual(Observer<? super R> observer) {
source.subscribe(new FlattenIterableObserver<T, R>(observer, mapper));
}

static final class FlattenIterableObserver<T, R> implements Observer<T>, Disposable {
final Observer<? super R> actual;

final Function<? super T, ? extends Iterable<? extends R>> mapper;

Disposable d;

FlattenIterableObserver(Observer<? super R> actual, Function<? super T, ? extends Iterable<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onNext(T value) {
if (d == DisposableHelper.DISPOSED) {
return;
}

Iterator<? extends R> it;

try {
it = mapper.apply(value).iterator();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}

Observer<? super R> a = actual;

for (;;) {
boolean b;

try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}

if (b) {
R v;

try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
return;
}

a.onNext(v);
} else {
break;
}
}
}

@Override
public void onError(Throwable e) {
if (d == DisposableHelper.DISPOSED) {
RxJavaPlugins.onError(e);
return;
}
actual.onError(e);
}

@Override
public void onComplete() {
if (d == DisposableHelper.DISPOSED) {
return;
}
actual.onComplete();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}

@Override
public void dispose() {
d.dispose();
d = DisposableHelper.DISPOSED;
}
}
}
66 changes: 66 additions & 0 deletions src/perf/java/io/reactivex/FlatMapJustPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;

import io.reactivex.functions.Function;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlatMapJustPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;

Flowable<Integer> flowable;

Observable<Integer> observable;

@Setup
public void setup() {
Integer[] array = new Integer[times];

flowable = Flowable.fromArray(array).flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
return Flowable.just(v);
}
});

observable = Observable.fromArray(array).flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) throws Exception {
return Observable.just(v);
}
});
}

@Benchmark
public void flowable(Blackhole bh) {
flowable.subscribe(new PerfConsumer(bh));
}

@Benchmark
public void observable(Blackhole bh) {
observable.subscribe(new PerfConsumer(bh));
}
}
72 changes: 72 additions & 0 deletions src/perf/java/io/reactivex/FlattenCrossMapPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.functions.Function;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlattenCrossMapPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;

Flowable<Integer> flowable;

Observable<Integer> observable;

@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);

Integer[] arrayInner = new Integer[1000000 / times];
Arrays.fill(arrayInner, 888);

final Iterable<Integer> list = Arrays.asList(arrayInner);

flowable = Flowable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return list;
}
});

observable = Observable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return list;
}
});
}

@Benchmark
public void flowable(Blackhole bh) {
flowable.subscribe(new PerfConsumer(bh));
}

@Benchmark
public void observable(Blackhole bh) {
observable.subscribe(new PerfConsumer(bh));
}
}
69 changes: 69 additions & 0 deletions src/perf/java/io/reactivex/FlattenJustPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.functions.Function;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlattenJustPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;

Flowable<Integer> flowable;

Observable<Integer> observable;

@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);

final Iterable<Integer> singletonList = Collections.singletonList(1);

flowable = Flowable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return singletonList;
}
});

observable = Observable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return singletonList;
}
});
}

@Benchmark
public void flowable(Blackhole bh) {
flowable.subscribe(new PerfConsumer(bh));
}

@Benchmark
public void observable(Blackhole bh) {
observable.subscribe(new PerfConsumer(bh));
}
}
Loading

0 comments on commit ab21265

Please sign in to comment.