Skip to content

Commit

Permalink
2.x: Remove takeFirst(predicate) in Observable & Flowable (#4595)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanniktech authored and akarnokd committed Sep 23, 2016
1 parent 4f86ee0 commit 3d1b379
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 63 deletions.
26 changes: 0 additions & 26 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12341,32 +12341,6 @@ public final Flowable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
return takeUntil(timer(time, unit, scheduler));
}

/**
* Returns a Flowable that emits only the very first item emitted by the source Publisher that satisfies
* a specified condition.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeFirstN.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeFirst} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* the condition any item emitted by the source Publisher has to satisfy
* @return a Flowable that emits only the very first item emitted by the source Publisher that satisfies
* the given condition, or that completes without emitting anything if the source Publisher
* completes without emitting a single condition-satisfying item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> takeFirst(Predicate<? super T> predicate) {
return filter(predicate).take(1);
}

/**
* Returns a Flowable that emits at most the last {@code count} items emitted by the source Publisher. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand Down
22 changes: 0 additions & 22 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10333,28 +10333,6 @@ public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
return takeUntil(timer(time, unit, scheduler));
}

/**
* Returns an Observable that emits only the very first item emitted by the source ObservableSource that satisfies
* a specified condition.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeFirstN.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeFirst} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* the condition any item emitted by the source ObservableSource has to satisfy
* @return an Observable that emits only the very first item emitted by the source ObservableSource that satisfies
* the given condition, or that completes without emitting anything if the source ObservableSource
* completes without emitting a single condition-satisfying item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> takeFirst(Predicate<? super T> predicate) {
return filter(predicate).take(1);
}

/**
* Returns an Observable that emits at most the last {@code count} items emitted by the source ObservableSource. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand Down
7 changes: 1 addition & 6 deletions src/test/java/io/reactivex/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2179,11 +2179,6 @@ public void takeTimedSchedulerNull() {
just1.take(1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void takeFirstNull() {
just1.takeFirst(null);
}

@Test(expected = NullPointerException.class)
public void takeLastTimedUnitNull() {
just1.takeLast(1, null, Schedulers.single());
Expand Down Expand Up @@ -2957,4 +2952,4 @@ public Object apply(Object[] v) {
}
}, 128, just1).blockingLast();
}
}
}
5 changes: 3 additions & 2 deletions src/test/java/io/reactivex/flowable/FlowableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@ public Throwable call() {
verify(wo, times(1)).onError(any(RuntimeException.class));
}

@Test
public void testTakeFirstWithPredicateOfSome() {
Flowable<Integer> observable = Flowable.just(1, 3, 5, 4, 6, 3);
observable.takeFirst(IS_EVEN).subscribe(w);
observable.filter(IS_EVEN).take(1).subscribe(w);
verify(w, times(1)).onNext(anyInt());
verify(w).onNext(4);
verify(w, times(1)).onComplete();
Expand All @@ -194,7 +195,7 @@ public void testTakeFirstWithPredicateOfSome() {
@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
Flowable<Integer> observable = Flowable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
observable.takeFirst(IS_EVEN).subscribe(w);
observable.filter(IS_EVEN).take(1).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, times(1)).onComplete();
verify(w, never()).onError(any(Throwable.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2250,11 +2250,6 @@ public void takeTimedSchedulerNull() {
just1.take(1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void takeFirstNull() {
just1.takeFirst(null);
}

@Test(expected = NullPointerException.class)
public void takeLastTimedUnitNull() {
just1.takeLast(1, null, Schedulers.single());
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/reactivex/observable/ObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Throwable call() {
@Test
public void testTakeFirstWithPredicateOfSome() {
Observable<Integer> o = Observable.just(1, 3, 5, 4, 6, 3);
o.takeFirst(IS_EVEN).subscribe(w);
o.filter(IS_EVEN).take(1).subscribe(w);
verify(w, times(1)).onNext(anyInt());
verify(w).onNext(4);
verify(w, times(1)).onComplete();
Expand All @@ -196,7 +196,7 @@ public void testTakeFirstWithPredicateOfSome() {
@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
Observable<Integer> o = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
o.takeFirst(IS_EVEN).subscribe(w);
o.filter(IS_EVEN).take(1).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, times(1)).onComplete();
verify(w, never()).onError(any(Throwable.class));
Expand Down

0 comments on commit 3d1b379

Please sign in to comment.