Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Porting the Scheduler.when operator from 1.x to 2.x #4827

Merged
merged 1 commit into from
Nov 10, 2016

Conversation

abersnaze
Copy link
Contributor

@abersnaze abersnaze commented Nov 9, 2016

In fixing the Scheduler.when in 1.x I noticed that it hadn't been ported to 2.x. This PR tries to fix that translating Observable to Flowable and Subscription to Disposable. This also includes the fix from 1.x

final Worker actualWorker = actualScheduler.createWorker();
// a queue for the actions submitted while worker is waiting to get to
// the subscribe to off the workerQueue.
ReplaySubject<ScheduledAction> actionSubject = ReplaySubject.create();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use a ReplayProcessor instead.

Flowable<Completable> actions = actionSubject.toFlowable(BackpressureStrategy.BUFFER).map(new Function<ScheduledAction, Completable>() {
@Override
public Completable apply(final ScheduledAction action) {
return Completable.unsafeCreate(new CompletableSource() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for such indirection, just extend Completable:

return new Completable() {
  @Override
  public void subscribeActual(CompletableObserver actionCompletable) {
    actionCompletable.onSubscribe(action);
    action.call(actualWorker, actionCompletable);
   }
};

@akarnokd akarnokd added this to the 2.0 backlog milestone Nov 9, 2016
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.openjdk.jmh.runner.RunnerException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not available in the main build.
https://travis-ci.org/ReactiveX/RxJava/builds/174626634#L169

@codecov-io
Copy link

Current coverage is 95.66% (diff: 76.31%)

Merging #4827 into 2.x will decrease coverage by 0.03%

@@                2.x      #4827   diff @@
==========================================
  Files           570        571     +1   
  Lines         36723      36799    +76   
  Methods           0          0          
  Messages          0          0          
  Branches       5556       5563     +7   
==========================================
+ Hits          35143      35202    +59   
- Misses          651        663    +12   
- Partials        929        934     +5   

Powered by Codecov. Last update ba6f392...ad5ab3a

@akarnokd akarnokd merged commit c8303e1 into ReactiveX:2.x Nov 10, 2016
@abersnaze abersnaze deleted the schedule-when-2x branch November 10, 2016 16:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants