Skip to content

Commit

Permalink
Merge branch 'first-firstDefault' of git://github.com/jmhofer/RxJava …
Browse files Browse the repository at this point in the history
…into operator-first-merge

Conflicts:
	rxjava-core/src/test/java/rx/ObservableTests.java

This merges pull request ReactiveX#357

Also aliased first with takeFirst to match takeLast.
  • Loading branch information
benjchristensen committed Sep 10, 2013
2 parents 96feb27 + 98cccec commit d6bf9d1
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 0 deletions.
87 changes: 87 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx;

import static rx.util.functions.Functions.not;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -37,6 +39,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationMap;
Expand Down Expand Up @@ -3322,6 +3325,63 @@ public Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable.
*
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first() {
return take(1);
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable
* that satisfies a given condition.
*
* @param predicate
* The condition any source emitted item has to satisfy.
* @return an Observable that emits only the very first item satisfying the given condition from the source,
* or none if the source Observable completes without emitting a single matching item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first(Func1<? super T, Boolean> predicate) {
return skipWhile(not(predicate)).take(1);
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable, or
* a default value.
*
* @param defaultValue
* The default value to emit if the source Observable doesn't emit anything.
* @return an Observable that emits only the very first item from the source, or a default value
* if the source Observable completes without emitting a single item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320%28v=vs.103%29.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable
* that satisfies a given condition, or a default value otherwise.
*
* @param predicate
* The condition any source emitted item has to satisfy.
* @param defaultValue
* The default value to emit if the source Observable doesn't emit anything that
* satisfies the given condition.
* @return an Observable that emits only the very first item from the source that satisfies the
* given condition, or a default value otherwise.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759%28v=vs.103%29.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
}


/**
* Returns an Observable that emits only the first <code>num</code> items emitted by the source
* Observable.
Expand Down Expand Up @@ -3374,6 +3434,33 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
return create(OperationTakeWhile.takeWhileWithIndex(this, predicate));
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable.
*
* @return an Observable that emits only the very first item from the source, or none if the
* source Observable completes without emitting a single item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first()}
*/
public Observable<T> takeFirst() {
return first();
}

/**
* Returns an Observable that emits only the very first item emitted by the source Observable
* that satisfies a given condition.
*
* @param predicate
* The condition any source emitted item has to satisfy.
* @return an Observable that emits only the very first item satisfying the given condition from the source,
* or none if the source Observable completes without emitting a single matching item.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
* @see {@link #first(Func1)}
*/
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
return first(predicate);
}

/**
* Returns an Observable that emits only the last <code>count</code> items emitted by the source
* Observable.
Expand Down
190 changes: 190 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
import static rx.Observable.create;
import static rx.Observable.empty;
import static rx.Observable.from;
import static rx.util.functions.Functions.alwaysTrue;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* Returns an Observable that emits the first item emitted by the source
* Observable, or a default value if the source emits nothing.
*/
public final class OperationFirstOrDefault {

/**
* Returns an Observable that emits the first item emitted by the source
* Observable that satisfies the given condition,
* or a default value if the source emits no items that satisfy the given condition.
*
* @param source
* The source Observable to emit the first item for.
* @param predicate
* The condition the emitted source items have to satisfy.
* @param defaultValue
* The default value to use whenever the source Observable doesn't emit anything.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
return new FirstOrElse<T>(source, predicate, defaultValue);
}

/**
* Returns an Observable that emits the first item emitted by the source
* Observable, or a default value if the source emits nothing.
*
* @param source
* The source Observable to emit the first item for.
* @param defaultValue
* The default value to use whenever the source Observable doesn't emit anything.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, T defaultValue) {
return new FirstOrElse<T>(source, alwaysTrue(), defaultValue);
}

private static class FirstOrElse<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final Func1<? super T, Boolean> predicate;
private final T defaultValue;

private FirstOrElse(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
this.source = source;
this.defaultValue = defaultValue;
this.predicate = predicate;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final Subscription sourceSub = source.subscribe(new Observer<T>() {
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);

@Override
public void onCompleted() {
if (!hasEmitted.get()) {
observer.onNext(defaultValue);
observer.onCompleted();
}
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T next) {
try {
if (!hasEmitted.get() && predicate.call(next)) {
hasEmitted.set(true);
observer.onNext(next);
observer.onCompleted();
}
} catch (Throwable t) {
// may happen within the predicate call (user code)
observer.onError(t);
}
}
});

return Subscriptions.create(new Action0() {
@Override
public void call() {
sourceSub.unsubscribe();
}
});
}
}

public static class UnitTest {
@Mock
Observer<? super String> w;

private static final Func1<String, Boolean> IS_D = new Func1<String, Boolean>() {
@Override
public Boolean call(String value) {
return "d".equals(value);
}
};

@Before
public void before() {
initMocks(this);
}

@Test
public void testFirstOrElseOfNone() {
Observable<String> src = empty();
create(firstOrDefault(src, "default")).subscribe(w);

verify(w, times(1)).onNext(anyString());
verify(w, times(1)).onNext("default");
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}

@Test
public void testFirstOrElseOfSome() {
Observable<String> src = from("a", "b", "c");
create(firstOrDefault(src, "default")).subscribe(w);

verify(w, times(1)).onNext(anyString());
verify(w, times(1)).onNext("a");
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}

@Test
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() {
Observable<String> src = from("a", "b", "c");
create(firstOrDefault(src, IS_D, "default")).subscribe(w);

verify(w, times(1)).onNext(anyString());
verify(w, times(1)).onNext("default");
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}

@Test
public void testFirstOrElseWithPredicateOfSome() {
Observable<String> src = from("a", "b", "c", "d", "e", "f");
create(firstOrDefault(src, IS_D, "default")).subscribe(w);

verify(w, times(1)).onNext(anyString());
verify(w, times(1)).onNext("d");
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}
}
}
11 changes: 11 additions & 0 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,16 @@ public Void call(Object... args) {
};
}

/**
* Constructs a predicate that returns true for each input that the source
* predicate returns false for and vice versa.
*
* @param predicate The source predicate to negate.
*/
public static <T> Func1<T, Boolean> not(Func1<? super T, Boolean> predicate) {
return new Not<T>(predicate);
}

public static <T> Func1<? super T, Boolean> alwaysTrue() {
return AlwaysTrue.INSTANCE;
}
Expand All @@ -334,4 +344,5 @@ public Boolean call(Object o) {
return true;
}
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/util/functions/Not.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.util.functions;

/**
* Implements the negation of a predicate.
*
* @param <T> The type of the single input parameter.
*/
public class Not<T> implements Func1<T, Boolean> {
private final Func1<? super T, Boolean> predicate;

/**
* Constructs a predicate that returns true for each input that the source
* predicate returns false for and vice versa.
*
* @param predicate The source predicate to negate.
*/
public Not(Func1<? super T, Boolean> predicate) {
this.predicate = predicate;
}

@Override
public Boolean call(T param) {
return !predicate.call(param);
}
}
Loading

0 comments on commit d6bf9d1

Please sign in to comment.