From bd383717e80444651733a9fb532cad3bb305289c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 14:06:43 +0200 Subject: [PATCH 1/3] Added the two variants of the first operator. --- rxjava-core/src/main/java/rx/Observable.java | 22 +++++++++ .../java/rx/util/functions/Functions.java | 11 +++++ .../src/main/java/rx/util/functions/Not.java | 40 ++++++++++++++++ .../src/test/java/rx/ObservableTests.java | 46 +++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/util/functions/Not.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 010071e92f..b58c5eb7fa 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,6 +15,8 @@ */ package rx; +import static rx.util.functions.Functions.not; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -2275,6 +2277,26 @@ public Observable take(final int num) { return create(OperationTake.take(this, num)); } + /** + * Returns an Observable that emits only the very first item emitted by the source Observable. + * @return an Observable that emits only the very first item from the source, or none if the + * source Observable completes without emitting a single item. + * @see MSDN: Observable.First + */ + public Observable first() { + return take(1); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable. + * @return an Observable that emits only the very first item from the source, or none if the + * source Observable completes without emitting a single item. + * @see MSDN: Observable.First + */ + public Observable first(Func1 predicate) { + return skipWhile(not(predicate)).take(1); + } + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true. diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 7809b3240d..30728600a8 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -313,6 +313,16 @@ public Void call(Object... args) { }; } + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public static Func1 not(Func1 predicate) { + return new Not(predicate); + } + public static Func1 alwaysTrue() { return AlwaysTrue.INSTANCE; } @@ -334,4 +344,5 @@ public Boolean call(Object o) { return true; } } + } diff --git a/rxjava-core/src/main/java/rx/util/functions/Not.java b/rxjava-core/src/main/java/rx/util/functions/Not.java new file mode 100644 index 0000000000..d3928f7888 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Not.java @@ -0,0 +1,40 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * Implements the negation of a predicate. + * + * @param The type of the single input parameter. + */ +public class Not implements Func1 { + private final Func1 predicate; + + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public Not(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Boolean call(T param) { + return !predicate.call(param); + } +} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index ba11919040..830370421f 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -35,6 +35,7 @@ import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; public class ObservableTests { @@ -42,6 +43,13 @@ public class ObservableTests { @Mock Observer w; + private static final Func1 IS_EVEN = new Func1() { + @Override + public Boolean call(Integer value) { + return value % 2 == 0; + } + }; + @Before public void before() { MockitoAnnotations.initMocks(this); @@ -73,6 +81,44 @@ public Subscription onSubscribe(Observer Observer) { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testFirstWithPredicateOfSome() { + Observable observable = Observable.from(1, 3, 5, 4, 6, 3); + observable.first(IS_EVEN).subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(4); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstWithPredicateOfNoneMatchingThePredicate() { + Observable observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1); + observable.first(IS_EVEN).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfSome() { + Observable observable = Observable.from(1, 2, 3); + observable.first().subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(1); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfNone() { + Observable observable = Observable.empty(); + observable.first().subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + @Test public void testReduce() { Observable observable = Observable.from(1, 2, 3, 4); From 44525182d5d6f26258d24373e579f9607c2d0d73 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 15:20:42 +0200 Subject: [PATCH 2/3] Added FirstOrDefault operation --- rxjava-core/src/main/java/rx/Observable.java | 39 +++- .../rx/operators/OperationFirstOrDefault.java | 190 ++++++++++++++++++ 2 files changed, 226 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b58c5eb7fa..bd14311fc2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -37,6 +37,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; import rx.operators.OperationFinally; +import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -2288,15 +2289,47 @@ public Observable first() { } /** - * Returns an Observable that emits only the very first item emitted by the source Observable. - * @return an Observable that emits only the very first item from the source, or none if the - * source Observable completes without emitting a single item. + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition. + * @param predicate + * The condition any source emitted item has to satisfy. + * @return an Observable that emits only the very first item satisfying the given condition from the source, + * or none if the source Observable completes without emitting a single matching item. * @see MSDN: Observable.First */ public Observable first(Func1 predicate) { return skipWhile(not(predicate)).take(1); } + /** + * Returns an Observable that emits only the very first item emitted by the source Observable, or + * a default value. + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything. + * @return an Observable that emits only the very first item from the source, or a default value + * if the source Observable completes without emitting a single item. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition, or a default value otherwise. + * @param predicate + * The condition any source emitted item has to satisfy. + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything that + * satisfies the given condition. + * @return an Observable that emits only the very first item from the source that satisfies the + * given condition, or a default value otherwise. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(Func1 predicate, T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); + } + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true. diff --git a/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java new file mode 100644 index 0000000000..73283a8a3b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java @@ -0,0 +1,190 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.create; +import static rx.Observable.empty; +import static rx.Observable.from; +import static rx.util.functions.Functions.alwaysTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + */ +public final class OperationFirstOrDefault { + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable that satisfies the given condition, + * or a default value if the source emits no items that satisfy the given condition. + * + * @param source + * The source Observable to emit the first item for. + * @param predicate + * The condition the emitted source items have to satisfy. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, Func1 predicate, T defaultValue) { + return new FirstOrElse(source, predicate, defaultValue); + } + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + * + * @param source + * The source Observable to emit the first item for. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, T defaultValue) { + return new FirstOrElse(source, alwaysTrue(), defaultValue); + } + + private static class FirstOrElse implements OnSubscribeFunc { + private final Observable source; + private final Func1 predicate; + private final T defaultValue; + + private FirstOrElse(Observable source, Func1 predicate, T defaultValue) { + this.source = source; + this.defaultValue = defaultValue; + this.predicate = predicate; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + private final AtomicBoolean hasEmitted = new AtomicBoolean(false); + + @Override + public void onCompleted() { + if (!hasEmitted.get()) { + observer.onNext(defaultValue); + observer.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + try { + if (!hasEmitted.get() && predicate.call(next)) { + hasEmitted.set(true); + observer.onNext(next); + observer.onCompleted(); + } + } catch (Throwable t) { + // may happen within the predicate call (user code) + observer.onError(t); + } + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + @Mock + Observer w; + + private static final Func1 IS_D = new Func1() { + @Override + public Boolean call(String value) { + return "d".equals(value); + } + }; + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testFirstOrElseOfNone() { + Observable src = empty(); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseOfSome() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("a"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfSome() { + Observable src = from("a", "b", "c", "d", "e", "f"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("d"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + } +} From 98cccec27252f30578cd3cf1b7aeddcb9837a2fd Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 15:22:51 +0200 Subject: [PATCH 3/3] Added link urls to the msdn descriptions --- rxjava-core/src/main/java/rx/Observable.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index bd14311fc2..31d7399126 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2308,12 +2308,12 @@ public Observable first(Func1 predicate) { * The default value to emit if the source Observable doesn't emit anything. * @return an Observable that emits only the very first item from the source, or a default value * if the source Observable completes without emitting a single item. - * @see MSDN: Observable.FirstOrDefault + * @see MSDN: Observable.FirstOrDefault */ public Observable firstOrDefault(T defaultValue) { return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); } - + /** * Returns an Observable that emits only the very first item emitted by the source Observable * that satisfies a given condition, or a default value otherwise. @@ -2324,12 +2324,12 @@ public Observable firstOrDefault(T defaultValue) { * satisfies the given condition. * @return an Observable that emits only the very first item from the source that satisfies the * given condition, or a default value otherwise. - * @see MSDN: Observable.FirstOrDefault + * @see MSDN: Observable.FirstOrDefault */ public Observable firstOrDefault(Func1 predicate, T defaultValue) { return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); } - + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true.