Skip to content

Commit

Permalink
2.x: dedicated reduce() op implementations (#4885)
Browse files Browse the repository at this point in the history
* 2.x: dedicated reduce() op implementations

* Fix unnecessary duplicate assignment, remove trailing spaces

* Check for more terminal state
  • Loading branch information
akarnokd authored Nov 28, 2016
1 parent baa00f7 commit f294938
Show file tree
Hide file tree
Showing 12 changed files with 616 additions and 7 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ dependencies {
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:2.1.0'

perfCompile 'org.openjdk.jmh:jmh-core:1.13'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
perfCompile 'org.openjdk.jmh:jmh-core:1.16'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.16'

testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0'
testCompile group: 'org.testng', name: 'testng', version: '6.9.10'
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10516,7 +10516,9 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<R>(scan(seed, reducer).takeLast(1), null)); // TODO dedicated operator
ObjectHelper.requireNonNull(seed, "seed is null");
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new FlowableReduceSeedSingle<T, R>(this, seed, reducer));
}

/**
Expand Down Expand Up @@ -10567,7 +10569,9 @@ public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<R>(scanWith(seedSupplier, reducer).takeLast(1), null)); // TODO dedicated operator
ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null");
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new FlowableReduceWithSingle<T, R>(this, seedSupplier, reducer));
}

/**
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8685,7 +8685,8 @@ public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
return scan(reducer).takeLast(1).singleElement();
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ObservableReduceMaybe<T>(this, reducer));
}

/**
Expand Down Expand Up @@ -8732,7 +8733,9 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<R>(scan(seed, reducer).takeLast(1), null));
ObjectHelper.requireNonNull(seed, "seed is null");
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ObservableReduceSeedSingle<T, R>(this, seed, reducer));
}

/**
Expand Down Expand Up @@ -8779,7 +8782,9 @@ public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<R>(scanWith(seedSupplier, reducer).takeLast(1), null));
ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null");
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ObservableReduceWithSingle<T, R>(this, seedSupplier, reducer));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Reduce a sequence of values, starting from a seed value and by using
* an accumulator function and return the last accumulated value.
*
* @param <T> the source value type
* @param <R> the accumulated result type
*/
public final class FlowableReduceSeedSingle<T, R> extends Single<R> {

final Publisher<T> source;

final R seed;

final BiFunction<R, ? super T, R> reducer;

public FlowableReduceSeedSingle(Publisher<T> source, R seed, BiFunction<R, ? super T, R> reducer) {
this.source = source;
this.seed = seed;
this.reducer = reducer;
}

@Override
protected void subscribeActual(SingleObserver<? super R> observer) {
source.subscribe(new ReduceSeedObserver<T, R>(observer, reducer, seed));
}

static final class ReduceSeedObserver<T, R> implements Subscriber<T>, Disposable {

final SingleObserver<? super R> actual;

final BiFunction<R, ? super T, R> reducer;

R value;

Subscription s;

public ReduceSeedObserver(SingleObserver<? super R> actual, BiFunction<R, ? super T, R> reducer, R value) {
this.actual = actual;
this.value = value;
this.reducer = reducer;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T value) {
R v = this.value;
if (v != null) {
try {
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}

@Override
public void onError(Throwable e) {
R v = value;
value = null;
if (v != null) {
s = SubscriptionHelper.CANCELLED;
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
R v = value;
value = null;
if (v != null) {
s = SubscriptionHelper.CANCELLED;
actual.onSuccess(v);
}
}

@Override
public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.Callable;

import org.reactivestreams.Publisher;

import io.reactivex.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.flowable.FlowableReduceSeedSingle.ReduceSeedObserver;

/**
* Reduce a sequence of values, starting from a generated seed value and by using
* an accumulator function and return the last accumulated value.
*
* @param <T> the source value type
* @param <R> the accumulated result type
*/
public final class FlowableReduceWithSingle<T, R> extends Single<R> {

final Publisher<T> source;

final Callable<R> seedSupplier;

final BiFunction<R, ? super T, R> reducer;

public FlowableReduceWithSingle(Publisher<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
this.source = source;
this.seedSupplier = seedSupplier;
this.reducer = reducer;
}

@Override
protected void subscribeActual(SingleObserver<? super R> observer) {
R seed;

try {
seed = ObjectHelper.requireNonNull(seedSupplier.call(), "The seedSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
source.subscribe(new ReduceSeedObserver<T, R>(observer, reducer, seed));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Reduce a sequence of values into a single value via an aggregator function and emit the final value or complete
* if the source is empty.
*
* @param <T> the source and result value type
*/
public final class ObservableReduceMaybe<T> extends Maybe<T> {

final ObservableSource<T> source;

final BiFunction<T, T, T> reducer;

public ObservableReduceMaybe(ObservableSource<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
this.reducer = reducer;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new ReduceObserver<T>(observer, reducer));
}

static final class ReduceObserver<T> implements Observer<T>, Disposable {

final MaybeObserver<? super T> actual;

final BiFunction<T, T, T> reducer;

boolean done;

T value;

Disposable d;

public ReduceObserver(MaybeObserver<? super T> observer, BiFunction<T, T, T> reducer) {
this.actual = observer;
this.reducer = reducer;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onNext(T value) {
if (!done) {
T v = this.value;

if (v == null) {
this.value = value;
} else {
try {
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.onError(e);
return;
}
done = true;
value = null;
actual.onError(e);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
T v = value;
value = null;
if (v != null) {
actual.onSuccess(v);
} else {
actual.onComplete();
}
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Loading

0 comments on commit f294938

Please sign in to comment.