Skip to content

Commit

Permalink
Merge pull request #3614 from akarnokd/JustBackpressure1xV2
Browse files Browse the repository at this point in the history
1.x: just() now supports backpressure (+ related fixes/changes)
  • Loading branch information
akarnokd committed Jan 26, 2016
2 parents 5ab00f9 + c925e86 commit 503d369
Show file tree
Hide file tree
Showing 8 changed files with 641 additions and 197 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8330,7 +8330,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
return create(new OperatorSubscribeOn<T>(this, scheduler));
}

/**
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1736,8 +1736,43 @@ public void onNext(T t) {
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
public final Single<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
public final Single<T> subscribeOn(final Scheduler scheduler) {
return create(new OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> t) {
final Scheduler.Worker w = scheduler.createWorker();
t.add(w);

w.schedule(new Action0() {
@Override
public void call() {
SingleSubscriber<T> ssub = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
try {
t.onSuccess(value);
} finally {
w.unsubscribe();
}
}

@Override
public void onError(Throwable error) {
try {
t.onError(error);
} finally {
w.unsubscribe();
}
}
};

t.add(ssub);

Single.this.subscribe(ssub);
}
});
}
});
}

/**
Expand Down
124 changes: 56 additions & 68 deletions src/main/java/rx/internal/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,96 +15,84 @@
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;

/**
* Subscribes Observers on the specified {@code Scheduler}.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
*
* @param <T> the value type of the actual source
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

private final Scheduler scheduler;
final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Scheduler scheduler) {
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}


inner.schedule(new Action0() {
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {

@Override
public void call() {
producer.request(n);
}
});
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
}

});
});
}
}

});
}
});
};

source.unsafeSubscribe(s);
}

};
});
}
}
}
Loading

0 comments on commit 503d369

Please sign in to comment.