diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ca84b15e44..2ac0f42754 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,6 +15,8 @@ */ package rx; +import static rx.util.functions.Functions.not; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -37,6 +39,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; import rx.operators.OperationFinally; +import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationInterval; import rx.operators.OperationMap; @@ -3322,6 +3325,63 @@ public Observable skip(int num) { return create(OperationSkip.skip(this, num)); } + /** + * Returns an Observable that emits only the very first item emitted by the source Observable. + * + * @return an Observable that emits only the very first item from the source, or none if the + * source Observable completes without emitting a single item. + * @see MSDN: Observable.First + */ + public Observable first() { + return take(1); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition. + * + * @param predicate + * The condition any source emitted item has to satisfy. + * @return an Observable that emits only the very first item satisfying the given condition from the source, + * or none if the source Observable completes without emitting a single matching item. + * @see MSDN: Observable.First + */ + public Observable first(Func1 predicate) { + return skipWhile(not(predicate)).take(1); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable, or + * a default value. + * + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything. + * @return an Observable that emits only the very first item from the source, or a default value + * if the source Observable completes without emitting a single item. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition, or a default value otherwise. + * + * @param predicate + * The condition any source emitted item has to satisfy. + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything that + * satisfies the given condition. + * @return an Observable that emits only the very first item from the source that satisfies the + * given condition, or a default value otherwise. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(Func1 predicate, T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); + } + + /** * Returns an Observable that emits only the first num items emitted by the source * Observable. @@ -3374,6 +3434,33 @@ public Observable takeWhileWithIndex(final Func2MSDN: Observable.First + * @see {@link #first()} + */ + public Observable takeFirst() { + return first(); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition. + * + * @param predicate + * The condition any source emitted item has to satisfy. + * @return an Observable that emits only the very first item satisfying the given condition from the source, + * or none if the source Observable completes without emitting a single matching item. + * @see MSDN: Observable.First + * @see {@link #first(Func1)} + */ + public Observable takeFirst(Func1 predicate) { + return first(predicate); + } + /** * Returns an Observable that emits only the last count items emitted by the source * Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java new file mode 100644 index 0000000000..73283a8a3b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java @@ -0,0 +1,190 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.create; +import static rx.Observable.empty; +import static rx.Observable.from; +import static rx.util.functions.Functions.alwaysTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + */ +public final class OperationFirstOrDefault { + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable that satisfies the given condition, + * or a default value if the source emits no items that satisfy the given condition. + * + * @param source + * The source Observable to emit the first item for. + * @param predicate + * The condition the emitted source items have to satisfy. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, Func1 predicate, T defaultValue) { + return new FirstOrElse(source, predicate, defaultValue); + } + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + * + * @param source + * The source Observable to emit the first item for. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, T defaultValue) { + return new FirstOrElse(source, alwaysTrue(), defaultValue); + } + + private static class FirstOrElse implements OnSubscribeFunc { + private final Observable source; + private final Func1 predicate; + private final T defaultValue; + + private FirstOrElse(Observable source, Func1 predicate, T defaultValue) { + this.source = source; + this.defaultValue = defaultValue; + this.predicate = predicate; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + private final AtomicBoolean hasEmitted = new AtomicBoolean(false); + + @Override + public void onCompleted() { + if (!hasEmitted.get()) { + observer.onNext(defaultValue); + observer.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + try { + if (!hasEmitted.get() && predicate.call(next)) { + hasEmitted.set(true); + observer.onNext(next); + observer.onCompleted(); + } + } catch (Throwable t) { + // may happen within the predicate call (user code) + observer.onError(t); + } + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + @Mock + Observer w; + + private static final Func1 IS_D = new Func1() { + @Override + public Boolean call(String value) { + return "d".equals(value); + } + }; + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testFirstOrElseOfNone() { + Observable src = empty(); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseOfSome() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("a"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfSome() { + Observable src = from("a", "b", "c", "d", "e", "f"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("d"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 7809b3240d..30728600a8 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -313,6 +313,16 @@ public Void call(Object... args) { }; } + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public static Func1 not(Func1 predicate) { + return new Not(predicate); + } + public static Func1 alwaysTrue() { return AlwaysTrue.INSTANCE; } @@ -334,4 +344,5 @@ public Boolean call(Object o) { return true; } } + } diff --git a/rxjava-core/src/main/java/rx/util/functions/Not.java b/rxjava-core/src/main/java/rx/util/functions/Not.java new file mode 100644 index 0000000000..d3928f7888 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Not.java @@ -0,0 +1,40 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * Implements the negation of a predicate. + * + * @param The type of the single input parameter. + */ +public class Not implements Func1 { + private final Func1 predicate; + + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public Not(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Boolean call(T param) { + return !predicate.call(param); + } +} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index ba5d836319..92af02b1dd 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -35,6 +35,7 @@ import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; public class ObservableTests { @@ -42,6 +43,13 @@ public class ObservableTests { @Mock Observer w; + private static final Func1 IS_EVEN = new Func1() { + @Override + public Boolean call(Integer value) { + return value % 2 == 0; + } + }; + @Before public void before() { MockitoAnnotations.initMocks(this); @@ -147,6 +155,43 @@ public Subscription onSubscribe(Observer obsv) { verify(w, times(1)).onError(any(RuntimeException.class)); } + public void testFirstWithPredicateOfSome() { + Observable observable = Observable.from(1, 3, 5, 4, 6, 3); + observable.first(IS_EVEN).subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(4); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstWithPredicateOfNoneMatchingThePredicate() { + Observable observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1); + observable.first(IS_EVEN).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfSome() { + Observable observable = Observable.from(1, 2, 3); + observable.first().subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(1); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfNone() { + Observable observable = Observable.empty(); + observable.first().subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + @Test public void testReduce() { Observable observable = Observable.from(1, 2, 3, 4);