Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation throttleLast #365

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleLast;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1809,6 +1810,36 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
return create(OperationInterval.interval(interval, unit, scheduler));
}

/**
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
*
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
*/
public Observable<T> throttleLast(long timeout, TimeUnit unit) {
return create(OperationThrottleLast.throttleLast(this, timeout, unit));
}

/**
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
* @param scheduler
* The {@link Scheduler} to use when timing incoming values.
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
*/
public Observable<T> throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler));
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
* <p>
Expand Down
27 changes: 22 additions & 5 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {

private final long time;
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
private final T state;
private final TestScheduler scheduler;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
this.time = time;
Expand All @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
this.scheduler = scheduler;
}

public void cancel() {
isCancelled.set(true);
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
Expand Down Expand Up @@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
}
time = current.time;
queue.remove();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);

// Only execute if the TimedAction has not yet been cancelled
if (!current.isCancelled.get()) {
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}
time = targetTimeInNanos;
}
Expand All @@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
queue.add(timedAction);

return new Subscription() {
@Override
public void unsubscribe() {
timedAction.cancel();
}
};
}
}
188 changes: 188 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationThrottleLast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
* as soon as the timeout expires.
*/
public final class OperationThrottleLast {

/**
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
* the last received event is published.
*
* @param items
* The {@link Observable} which is publishing events.
* @param timeout
* How long each event has to be the 'last event' before it gets published.
* @param unit
* The unit of time for the specified timeout.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleLast(Observable<T> items, long timeout, TimeUnit unit) {
return throttleLast(items, timeout, unit, Schedulers.threadPoolForComputation());
}

/**
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
* the last received event is published.
*
* @param items
* The {@link Observable} which is publishing events.
* @param timeout
* How long each event has to be the 'last event' before it gets published.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleLast(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return items.window(timeout, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {

@Override
public Observable<T> call(Observable<T> o) {
return o.takeLast(1);
}
}).subscribe(observer);
}
};
}

public static class UnitTest {

private TestScheduler scheduler;
private Observer<String> observer;

@Before
@SuppressWarnings("unchecked")
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
}

@Test
public void testThrottlingWithCompleted() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires.
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testThrottlingWithError() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
Exception error = new TestException();
publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
publishError(observer, 700, error); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
inOrder.verify(observer).onNext("one");
scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
inOrder.verify(observer).onError(any(TestException.class));
inOrder.verifyNoMoreInteractions();
}

private <T> void publishCompleted(final Observer<T> observer, long delay) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onCompleted();
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onError(error);
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onNext(value);
}
}, delay, TimeUnit.MILLISECONDS);
}

@SuppressWarnings("serial")
private class TestException extends Exception {
}

}

}
46 changes: 46 additions & 0 deletions rxjava-core/src/test/java/rx/ThrottleLastTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package rx;

import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.InOrder;

import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;

public class ThrottleLastTests {

@Test
public void testThrottle() {
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);

// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();

InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onNext(2);
inOrder.verify(observer).onNext(6);
inOrder.verify(observer).onNext(7);
inOrder.verify(observer).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}