Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented SerialSubscription and Timeout operator #434

Merged
merged 4 commits into from
Oct 22, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeout;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1855,8 +1856,6 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
*
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
Expand All @@ -1876,8 +1875,6 @@ public Observable<T> synchronize() {
*
* @param lock
* The lock object to synchronize each observer call on
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
Expand Down Expand Up @@ -3194,7 +3191,7 @@ public Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
/**
* Determines whether an observable sequence contains a specified element.
*
* @param value
* @param element
* The element to search in the sequence.
* @return an Observable that emits if the element is in the source sequence.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228965(v=vs.103).aspx">MSDN: Observable.Contains</a>
Expand Down Expand Up @@ -4507,6 +4504,32 @@ public Observable<T> ignoreElements() {
return filter(alwaysFalse());
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
* @param timeout
* The timeout duration
* @param timeUnit
* The time unit of the timeout
* @param scheduler
* The scheduler to run the timeout timers on.
* @return The source sequence with a TimeoutException in case of a timeout.
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
* @param timeout
* The timeout duration
* @param timeUnit
* The time unit of the timeout
* @return The source sequence with a TimeoutException in case of a timeout.
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
128 changes: 128 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public final class OperationTimeout {
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, scheduler);
}

private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final long timeout;
private final TimeUnit timeUnit;
private final Scheduler scheduler;

private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
this.source = source;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final AtomicBoolean terminated = new AtomicBoolean(false);
final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout
final SerialSubscription serial = new SerialSubscription();
final Object gate = new Object();
CompositeSubscription composite = new CompositeSubscription();
final Func0<Subscription> schedule = new Func0<Subscription>() {
@Override
public Subscription call() {
final long expected = actual.get();
return scheduler.schedule(new Action0() {
@Override
public void call() {
boolean timeoutWins = false;
synchronized (gate) {
if (expected == actual.get() && !terminated.getAndSet(true)) {
timeoutWins = true;
}
}
if (timeoutWins) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the use of compareAndSet patterns simplify this code to not need these win/loss checks?

The nice thing about compareAndSet is it gives that in a very idiomatic manner.

Reason I suggest it is that this code flow is non-trivial to read and understand with the synchronization gates and conditional flows that all need to be just right in order to work.

Here is an example where I use compareAndSet for state changes to achieve timeout in Hystrix: https://github.com/Netflix/Hystrix/blob/master/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java#L1015

An AtomicReference is used to atomically switch between the 3 possible states:

private static enum TimedOutStatus {NOT_EXECUTED, COMPLETED, TIMED_OUT};
private AtomicReference<TimedOutStatus> isTimedOut = new AtomicReference<TimedOutStatus>();

The successful flow would attempt this when receiving onNext:

// if we can go from NOT_EXECUTED to COMPLETED then we did not timeout
if (isTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {

} else {
    // this means we lost the race and the timeout logic has or is being executed

}

The timeout flow would attempt this if the scheduler triggers it:

// if we can go from NOT_EXECUTED to TIMED_OUT then we have not yet completed and should perform timeout logic
if (isTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {

} else {
    // this means we lost the race and the work completed before timeout

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that you have to handle the case where the timeout timer fires but onNext beats it into the synchronized section. As onNext does not mutate the timeout status, there would be nothing to stop the timeout thread from onErroring. The effect of this is you would get an onNext immediately followed by an onError.

The way to mitigate this is to have onNext increment a counter. As you can't cover two condition checks in a single, atomic compareAndSet you have to fall back to explicit synchronization.

The Rx team use a similar mechanism to handle this case.

observer.onError(new TimeoutException());
}

}
}, timeout, timeUnit);
}
};
SafeObservableSubscription subscription = new SafeObservableSubscription();
composite.add(subscription.wrap(source.subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
boolean onNextWins = false;
synchronized (gate) {
if (!terminated.get()) {
actual.incrementAndGet();
onNextWins = true;
}
}
if (onNextWins) {
serial.setSubscription(schedule.call());
observer.onNext(value);
}
}

@Override
public void onError(Throwable error) {
boolean onErrorWins = false;
synchronized (gate) {
if (!terminated.getAndSet(true)) {
onErrorWins = true;
}
}
if (onErrorWins) {
serial.unsubscribe();
observer.onError(error);
}
}

@Override
public void onCompleted() {
boolean onCompletedWins = false;
synchronized (gate) {
if (!terminated.getAndSet(true)) {
onCompletedWins = true;
}
}
if (onCompletedWins) {
serial.unsubscribe();
observer.onCompleted();
}
}
})));
composite.add(serial);
serial.setSubscription(schedule.call());
return composite;
}
}
}
70 changes: 70 additions & 0 deletions rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import rx.Subscription;

/**
* Represents a subscription whose underlying subscription can be swapped for another subscription
* which causes the previous underlying subscription to be unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
*/
public class SerialSubscription implements Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class is achieving the same as MultipleAssingmentSubscription. Can you take a look at it?

https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java

No idea why I wrote such a horrible Javadoc on it that doesn't explain it well. From the MSDN doc it states: "Represents a disposable whose underlying disposable can be swapped for another disposable."

Your Javadoc description should replace the MultipleAssingmentSubscription one if I'm reading the code correctly and they do the same things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ben,

SerialSubscription and MultipleAssignmentSubscription are different things. MultipleAssignment simply enables you to swap out the underlying subscription in a thread-safe manner. Whereas, Serial unsubscribes from the previous underlying as you replace.

See:
http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx
Vs:
http://msdn.microsoft.com/en-us/library/system.reactive.disposables.multipleassignmentdisposable(v=vs.103).aspx

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, cool.

private boolean unsubscribed;
private Subscription subscription;
private final Object gate = new Object();

@Override
public void unsubscribe() {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (subscription != null) {
toUnsubscribe = subscription;
subscription = null;
}
unsubscribed = true;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}

public Subscription getSubscription() {
synchronized (gate) {
return subscription;
}
}

public void setSubscription(Subscription subscription) {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (this.subscription != null) {
toUnsubscribe = this.subscription;
}
this.subscription = subscription;
} else {
toUnsubscribe = subscription;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}
}
117 changes: 117 additions & 0 deletions rxjava-core/src/test/java/rx/TimeoutTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TimeoutTests {
private PublishSubject<String> underlyingSubject;
private TestScheduler testScheduler;
private Observable<String> withTimeout;
private static final long TIMEOUT = 3;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

underlyingSubject = PublishSubject.create();
testScheduler = new TestScheduler();
withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler);
}

@Test
public void shouldNotTimeoutIfOnNextWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
verify(observer).onNext("One");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");
verify(observer).onNext("Two");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldTimeoutIfOnNextNotWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
verify(observer).onError(any(TimeoutException.class));
subscription.unsubscribe();
}

@Test
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
verify(observer).onNext("One");
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
verify(observer).onError(any(TimeoutException.class));
subscription.unsubscribe();
}

@Test
public void shouldCompleteIfUnderlyingComletes() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onCompleted();
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldErrorIfUnderlyingErrors() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onError(new UnsupportedOperationException());
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer).onError(any(UnsupportedOperationException.class));
subscription.unsubscribe();
}
}
Loading