From 2ea065c0ef22ea7cf58e9fb6d6f24c69f365bed6 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 10:49:23 +0200 Subject: [PATCH 1/5] Created and wired an implementation for the throttle operation on Observables. --- rxjava-core/src/main/java/rx/Observable.java | 37 ++- .../java/rx/concurrency/TestScheduler.java | 27 +- .../java/rx/operators/OperationThrottle.java | 295 ++++++++++++++++++ 3 files changed, 351 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottle.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a7d8ffe55f..d52dcb5710 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,9 +15,16 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +72,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottle; import rx.operators.OperationTimestamp; import rx.operators.OperationToFuture; import rx.operators.OperationToIterator; @@ -2095,6 +2103,29 @@ public Boolean call(T t, Integer integer) })); } + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit The {@link TimeUnit} for the timeout. + * @return An {@link Observable} which filters out values which are too quickly followed up with never values. + */ + public Observable throttle(long timeout, TimeUnit unit) { + return create(OperationThrottle.throttle(this, timeout, unit)); + } + + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit The {@link TimeUnit} for the timeout. + * @param scheduler The {@link Scheduler} to use when timing incoming values. + * @return An {@link Observable} which filters out values which are too quickly followed up with never values. + */ + public Observable throttle(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottle.throttle(this, timeout, unit, scheduler)); + } + /** * Adds a timestamp to each item emitted by this observable. * @return An observable sequence of timestamped items. diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index a10ab90e21..1e56dbfcf6 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -19,20 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); private static class TimedAction { + private final long time; private final Func2 action; private final T state; private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { this.time = time; @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2) current.action).call(current.scheduler, current.state); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } } } @@ -97,7 +107,14 @@ public Subscription schedule(T state, Func2 acti @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); - return Subscriptions.empty(); + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java new file mode 100644 index 0000000000..05286f0cb4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -0,0 +1,295 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + + +/** + * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too + * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published + * as soon as the timeout expires. + */ +public final class OperationThrottle { + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items The {@link Observable} which is publishing events. + * @param timeout How long each event has to be the 'last event' before it gets published. + * @param unit The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static Func1, Subscription> throttle(final Observable items, long timeout, TimeUnit unit) { + return throttle(items, timeout, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items The {@link Observable} which is publishing events. + * @param timeout How long each event has to be the 'last event' before it gets published. + * @param unit The unit of time for the specified timeout. + * @param scheduler The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static Func1, Subscription> throttle(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { + return new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + return new Throttle(items, timeout, unit, scheduler).call(observer); + } + }; + } + + private static class Throttle implements Func1, Subscription> { + + private final Observable items; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { + this.items = items; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription call(Observer observer) { + return items.subscribe(new ThrottledObserver(observer, timeout, unit, scheduler)); + } + } + + private static class ThrottledObserver implements Observer { + + private final Observer observer; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + private final AtomicLong waitUntil = new AtomicLong(); + private final AtomicReference subscription = new AtomicReference(Subscriptions.empty()); + + public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { + this.observer = observer; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + throttle(new Notification()); + } + + @Override + public void onError(Exception e) { + throttle(new Notification(e)); + } + + @Override + public void onNext(final T args) { + throttle(new Notification(args)); + } + + private void throttle(final Notification args) { + synchronized (subscription) { + if (!timerHasExpired()) { + subscription.get().unsubscribe(); + } + subscription.set(scheduler.schedule(new ThrottleAction(observer, args), timeout, unit)); + } + } + + private boolean timerHasExpired() { + long now = scheduler.now(); + long nextTimeout = now + unit.toMillis(timeout); + long previousTimeout = waitUntil.getAndSet(nextTimeout); + return previousTimeout <= now; + } + } + + private static final class ThrottleAction implements Action0 { + + private final Observer observer; + private final Notification notification; + + public ThrottleAction(Observer observer, Notification notification) { + this.observer = observer; + this.notification = notification; + } + + @Override + public void call() { + if (notification.isOnNext()) { + observer.onNext(notification.getValue()); + } + else if (notification.isOnError()) { + observer.onError(notification.getException()); + } + else if (notification.isOnCompleted()) { + observer.onCompleted(); + } + } + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testThrottlingWithCompleted() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observser) { + publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observser, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(700, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(900, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1600, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + + @Test + public void testThrottlingWithError() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observser) { + Exception error = new TestException(); + publishNext(observser, 100, "one"); // Should be published since "two" will arrive after the timeout expires. + publishNext(observser, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. + publishError(observser, 700, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(600, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1200, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, long delay, final String value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { } + + } + +} From 622d861efea555e98600f165f6521afd7f4084c0 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 10:54:33 +0200 Subject: [PATCH 2/5] Ensure static star imports are used for test cases. --- rxjava-core/src/main/java/rx/Observable.java | 13 +++---------- .../main/java/rx/operators/OperationThrottle.java | 9 ++------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d52dcb5710..35512dcf7a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,16 +15,9 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 05286f0cb4..c53fc14864 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,13 +15,8 @@ */ package rx.operators; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; From 77ac15bb78fd9bc4f698edd44bbd88a55907a2b8 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 11:23:24 +0200 Subject: [PATCH 3/5] No longer using Notification for scheduling throttled events. --- .../java/rx/operators/OperationThrottle.java | 68 +++++++++++++------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index c53fc14864..5badf0858c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,8 +15,13 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -27,7 +32,6 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.Notification; import rx.Observable; import rx.Observer; import rx.Scheduler; @@ -119,25 +123,25 @@ public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Sche @Override public void onCompleted() { - throttle(new Notification()); + throttle(new ThrottledOnComplete(observer)); } @Override public void onError(Exception e) { - throttle(new Notification(e)); + throttle(new ThrottledOnError(observer, e)); } @Override public void onNext(final T args) { - throttle(new Notification(args)); + throttle(new ThrottledOnNext(observer, args)); } - private void throttle(final Notification args) { + private void throttle(Action0 action) { synchronized (subscription) { if (!timerHasExpired()) { subscription.get().unsubscribe(); } - subscription.set(scheduler.schedule(new ThrottleAction(observer, args), timeout, unit)); + subscription.set(scheduler.schedule(action, timeout, unit)); } } @@ -149,27 +153,49 @@ private boolean timerHasExpired() { } } - private static final class ThrottleAction implements Action0 { + private static final class ThrottledOnNext implements Action0 { private final Observer observer; - private final Notification notification; + private final T value; - public ThrottleAction(Observer observer, Notification notification) { + public ThrottledOnNext(Observer observer, T value) { this.observer = observer; - this.notification = notification; + this.value = value; } @Override public void call() { - if (notification.isOnNext()) { - observer.onNext(notification.getValue()); - } - else if (notification.isOnError()) { - observer.onError(notification.getException()); - } - else if (notification.isOnCompleted()) { - observer.onCompleted(); - } + observer.onNext(value); + } + } + + private static final class ThrottledOnError implements Action0 { + + private final Observer observer; + private final Exception error; + + public ThrottledOnError(Observer observer, Exception error) { + this.observer = observer; + this.error = error; + } + + @Override + public void call() { + observer.onError(error); + } + } + + private static final class ThrottledOnComplete implements Action0 { + + private final Observer observer; + + public ThrottledOnComplete(Observer observer) { + this.observer = observer; + } + + @Override + public void call() { + observer.onCompleted(); } } From 02ee6fae45e5b6c2c90af88f6479fb130c87cbe7 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 00:48:53 +0200 Subject: [PATCH 4/5] Cleaned up imports and removed unnecessary final keywords in the OperationThrottle class. --- .../java/rx/operators/OperationThrottle.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 5badf0858c..282347a38e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,13 +15,8 @@ */ package rx.operators; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -60,7 +55,7 @@ public final class OperationThrottle { * @param unit The unit of time for the specified timeout. * @return A {@link Func1} which performs the throttle operation. */ - public static Func1, Subscription> throttle(final Observable items, long timeout, TimeUnit unit) { + public static Func1, Subscription> throttle(Observable items, long timeout, TimeUnit unit) { return throttle(items, timeout, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } @@ -132,7 +127,7 @@ public void onError(Exception e) { } @Override - public void onNext(final T args) { + public void onNext(T args) { throttle(new ThrottledOnNext(observer, args)); } @@ -215,7 +210,7 @@ public void before() { public void testThrottlingWithCompleted() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(final Observer observser) { + public Subscription call(Observer observser) { publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. @@ -250,7 +245,7 @@ public Subscription call(final Observer observser) { public void testThrottlingWithError() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(final Observer observser) { + public Subscription call(Observer observser) { Exception error = new TestException(); publishNext(observser, 100, "one"); // Should be published since "two" will arrive after the timeout expires. publishNext(observser, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. @@ -281,7 +276,7 @@ public Subscription call(final Observer observser) { verify(observer, times(1)).onError(any(TestException.class)); } - private void publishCompleted(final Observer observer, long delay) { + private void publishCompleted(final Observer observer, long delay) { scheduler.schedule(new Action0() { @Override public void call() { @@ -290,7 +285,7 @@ public void call() { }, delay, TimeUnit.MILLISECONDS); } - private void publishError(final Observer observer, long delay, final Exception error) { + private void publishError(final Observer observer, long delay, final Exception error) { scheduler.schedule(new Action0() { @Override public void call() { @@ -299,7 +294,7 @@ public void call() { }, delay, TimeUnit.MILLISECONDS); } - private void publishNext(final Observer observer, long delay, final String value) { + private void publishNext(final Observer observer, long delay, final T value) { scheduler.schedule(new Action0() { @Override public void call() { From 2519ef8164d6e3405c40c3f187b5477188d142aa Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 01:29:33 +0200 Subject: [PATCH 5/5] Fixed a typo the UnitTest class of OperationThrottle. --- .../src/main/java/rx/operators/OperationThrottle.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 282347a38e..ebef4e7286 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -210,11 +210,11 @@ public void before() { public void testThrottlingWithCompleted() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(Observer observser) { - publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. - publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. - publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. - publishCompleted(observser, 1000); // Should be published as soon as the timeout expires. + public Subscription call(Observer observer) { + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. return Subscriptions.empty(); }