Skip to content

Commit

Permalink
Implemented Observable.x(ConversionFunc) to allow external extensions…
Browse files Browse the repository at this point in the history
… to Observables.
  • Loading branch information
Aaron Tull committed Jul 16, 2015
1 parent 96786bb commit 62345af
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 7 deletions.
19 changes: 19 additions & 0 deletions src/main/java/rx/ConversionFunc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package rx;

import rx.Observable.OnSubscribe;

/**
* Converts the values emitted by an Observable's OnSubscribe function to a value.
*
* @param <T> the type of values to be consumed
* @param <R> the return type
*/
public interface ConversionFunc<T, R> {
/**
* Converts the data produced by the provided {@code OnSubscribe function} to a value.
*
* @param onSubscribe a function that produces data to a Subscriber, usually wrapped by an Observable.
* @return an instance of {@code R}
*/
public R convert(OnSubscribe<T> onSubscribe);
}
46 changes: 39 additions & 7 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
// cover for generics insanity
}

/**
* Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be
* collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to
* return an Observable (enabling chaining).
*
* @param conversion a function that converts from this {@code Observable<T>} to an {@code R}
* @return an instance of R created by the provided Conversion
*/
public <R> R x(ConversionFunc<T, R> conversion) {
final Observable<T> self = this;
return conversion.convert(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.add(Observable.subscribe(subscriber, self));
}});
}

/**
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
* the values of the current Observable through the Operator function.
Expand All @@ -133,11 +150,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
return new Observable<R>(new OnSubscribe<R>() {
return new Observable<R>(lift(lift, onSubscribe));
}

/**
* Wraps the given OnSubscribe in an OnSubscribe that applies the provided operator implementation. The subscription
* of the provided {@code onSubscribe} will be deferred until the returned OnSubscribe is subscribed.
*
* @param operator
* @param onSubscribe the source OnSubscribe function
* @return an OnSubscribe that delegates the emitted values from {@code onSubscribe} to the {@code operator}.
*/
/*package*/ static <R, T> OnSubscribe<R> lift(final Operator<? extends R, ? super T> operator, final OnSubscribe<T> onSubscribe) {
return new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
Expand All @@ -160,10 +189,9 @@ public void call(Subscriber<? super R> o) {
o.onError(e);
}
}
});
};
}


/**
* Transform an Observable by applying a particular Transformer function to it.
* <p>
Expand Down Expand Up @@ -7737,11 +7765,15 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// validate and proceed
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribe == null) {
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
Expand All @@ -7765,7 +7797,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/rx/OperatorConversion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rx;

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;

public class OperatorConversion<T, R> implements ConversionFunc<T, Observable<R>> {

private Operator<? extends R, ? super T> operator;

public OperatorConversion(Operator<? extends R, ? super T> operator) {
this.operator = operator;
}

@Override
public Observable<R> convert(OnSubscribe<T> onSubscribe) {
return Observable.create(wrapSubscriber(onSubscribe));
}

protected OnSubscribe<R> wrapSubscriber(final OnSubscribe<T> onSubscribe) {
return Observable.lift(operator, onSubscribe);
}

}
53 changes: 53 additions & 0 deletions src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
Expand All @@ -46,13 +47,16 @@
import org.mockito.MockitoAnnotations;

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observable.Transformer;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OperatorAll;
import rx.internal.operators.OperatorMap;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
Expand Down Expand Up @@ -1138,4 +1142,53 @@ public void testSubscribingSubscriberAsObserverMaintainsSubscriptionChain() {

subscriber.assertUnsubscribed();
}

@Test
public void testExtend() {
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
final Object value = new Object();
Observable.just(value).x(new ConversionFunc<Object,Object>(){
@Override
public Object convert(OnSubscribe<Object> onSubscribe) {
onSubscribe.call(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value);
return subscriber.getOnNextEvents().get(0);
}});
}

@Test
public void testOperatorConversion() {
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
final Object value = new Object();
Observable.just(value).x(new OperatorConversion<Object,Object>(new Operator<Object, Object>(){
@Override
public Subscriber<? super Object> call(Subscriber<? super Object> t) {
return t;
}})).subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value);
}

@Test
public void testOperatorConversionWithMap() {
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
final AtomicLong value = new AtomicLong();
OperatorConversion<Object, Long> toTimeReceived = new OperatorConversion<Object,Long>(
new OperatorMap<Object, Long>(new Func1<Object, Long>(){
@Override
public Long call(Object t) {
long time = System.currentTimeMillis();
value.set(time);
return time;
}}));
Observable.just(new Object())
.x(toTimeReceived)
.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value.get());
}
}

0 comments on commit 62345af

Please sign in to comment.