Skip to content

Commit

Permalink
Merge pull request #571 from akarnokd/SampleWithObservable2
Browse files Browse the repository at this point in the history
Operation Sample with Observable v2
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents 2d91c99 + 1b31bca commit 53787f2
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 2 deletions.
17 changes: 16 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4463,7 +4463,22 @@ public Observable<T> sample(long period, TimeUnit unit) {
public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
return create(OperationSample.sample(this, period, unit, scheduler));
}


/**
* Return an Observable that emits the results of sampling the items
* emitted by this Observable when the <code>sampler</code>
* Observable produces an item or completes.
*
* @param sampler the Observable to use for sampling this
*
* @return an Observable that emits the results of sampling the items
* emitted by this Observable when the <code>sampler</code>
* Observable produces an item or completes.
*/
public <U> Observable<T> sample(Observable<U> sampler) {
return create(new OperationSample.SampleWithObservable<T, U>(this, sampler));
}

/**
* Returns an Observable that applies a function of your choosing to the
* first item emitted by a source Observable, then feeds the result of that
Expand Down
89 changes: 89 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationSample.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

Expand Down Expand Up @@ -115,4 +117,91 @@ public void call() {
});
}
}
/**
* Sample with the help of another observable.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229742.aspx'>MSDN: Observable.Sample</a>
*/
public static class SampleWithObservable<T, U> implements OnSubscribeFunc<T> {
final Observable<T> source;
final Observable<U> sampler;
public SampleWithObservable(Observable<T> source, Observable<U> sampler) {
this.source = source;
this.sampler = sampler;
}
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return new ResultManager(t1).init();
}
/** Observe source values. */
class ResultManager implements Observer<T> {
final Observer<? super T> observer;
final CompositeSubscription cancel;
T value;
boolean valueTaken = true;
boolean done;
final Object guard;
public ResultManager(Observer<? super T> observer) {
this.observer = observer;
cancel = new CompositeSubscription();
guard = new Object();
}
public Subscription init() {
cancel.add(source.subscribe(this));
cancel.add(sampler.subscribe(new Sampler()));

return cancel;
}
@Override
public void onNext(T args) {
synchronized (guard) {
valueTaken = false;
value = args;
}
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
if (!done) {
done = true;
observer.onError(e);
cancel.unsubscribe();
}
}
}

@Override
public void onCompleted() {
synchronized (guard) {
if (!done) {
done = true;
observer.onCompleted();
cancel.unsubscribe();
}
}
}
/** Take the latest value, but only once. */
class Sampler implements Observer<U> {
@Override
public void onNext(U args) {
synchronized (guard) {
if (!valueTaken && !done) {
valueTaken = true;
observer.onNext(value);
}
}
}

@Override
public void onError(Throwable e) {
ResultManager.this.onError(e);
}

@Override
public void onCompleted() {
ResultManager.this.onCompleted();
}
}
}
}
}
159 changes: 158 additions & 1 deletion rxjava-core/src/test/java/rx/operators/OperationSampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
Expand All @@ -28,19 +27,22 @@
import rx.Observer;
import rx.Subscription;
import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

public class OperationSampleTest {
private TestScheduler scheduler;
private Observer<Long> observer;
private Observer<Object> observer2;

@Before
@SuppressWarnings("unchecked")
// due to mocking
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
observer2 = mock(Observer.class);
}

@Test
Expand Down Expand Up @@ -105,4 +107,159 @@ public void call() {
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerNormal() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
source.onNext(2);
sampler.onNext(1);
source.onNext(3);
source.onNext(4);
sampler.onNext(2);
source.onCompleted();
sampler.onNext(3);


InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, never()).onNext(1);
inOrder.verify(observer2, times(1)).onNext(2);
inOrder.verify(observer2, never()).onNext(3);
inOrder.verify(observer2, times(1)).onNext(4);
inOrder.verify(observer2, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerNoDuplicates() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
source.onNext(2);
sampler.onNext(1);
sampler.onNext(1);

source.onNext(3);
source.onNext(4);
sampler.onNext(2);
sampler.onNext(2);

source.onCompleted();
sampler.onNext(3);


InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, never()).onNext(1);
inOrder.verify(observer2, times(1)).onNext(2);
inOrder.verify(observer2, never()).onNext(3);
inOrder.verify(observer2, times(1)).onNext(4);
inOrder.verify(observer2, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerTerminatingEarly() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
source.onNext(2);
sampler.onNext(1);
sampler.onCompleted();

source.onNext(3);
source.onNext(4);



InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, never()).onNext(1);
inOrder.verify(observer2, times(1)).onNext(2);
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verify(observer2, never()).onNext(any());
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerEmitAndTerminate() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
source.onNext(2);
sampler.onNext(1);
source.onNext(3);
source.onCompleted();
sampler.onNext(2);
sampler.onCompleted();

InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, never()).onNext(1);
inOrder.verify(observer2, times(1)).onNext(2);
inOrder.verify(observer2, never()).onNext(3);
inOrder.verify(observer2, times(1)).onCompleted();
inOrder.verify(observer2, never()).onNext(any());
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerEmptySource() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onCompleted();
sampler.onNext(1);

InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onNext(any());
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void sampleWithSamplerSourceThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
source.onError(new RuntimeException("Forced failure!"));
sampler.onNext(1);

InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onError(any(Throwable.class));
verify(observer2, never()).onNext(any());
verify(observer, never()).onCompleted();
}
@Test
public void sampleWithSamplerThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> sampler = PublishSubject.create();

Observable<Integer> m = source.sample(sampler);
m.subscribe(observer2);

source.onNext(1);
sampler.onNext(1);
sampler.onError(new RuntimeException("Forced failure!"));

InOrder inOrder = inOrder(observer2);
inOrder.verify(observer2, times(1)).onNext(1);
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
verify(observer, never()).onCompleted();
}
}

0 comments on commit 53787f2

Please sign in to comment.