-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: WIP removes anonymous inner classes. #5174
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,40 +78,17 @@ public void onSubscribe(Subscription s) { | |
|
||
@Override | ||
public void onNext(final T t) { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
actual.onNext(t); | ||
} | ||
}, delay, unit); | ||
w.schedule(new OnNext(t), delay, unit); | ||
} | ||
|
||
@Override | ||
public void onError(final Throwable t) { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
actual.onError(t); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
}, delayError ? delay : 0, unit); | ||
w.schedule(new OnError(t), delayError ? delay : 0, unit); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
actual.onComplete(); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
}, delay, unit); | ||
w.schedule(new OnComplete(), delay, unit); | ||
} | ||
|
||
@Override | ||
|
@@ -125,5 +102,45 @@ public void cancel() { | |
w.dispose(); | ||
} | ||
|
||
final class OnNext implements Runnable { | ||
private final T t; | ||
|
||
OnNext(T t) { | ||
this.t = t; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
actual.onNext(t); | ||
} | ||
} | ||
|
||
private class OnError implements Runnable { | ||
private final Throwable t; | ||
|
||
public OnError(Throwable t) { | ||
this.t = t; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
actual.onError(t); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
} | ||
|
||
private class OnComplete implements Runnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private but final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here too. |
||
@Override | ||
public void run() { | ||
try { | ||
actual.onComplete(); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,71 +38,90 @@ public void subscribeActual(final Subscriber<? super T> child) { | |
final SubscriptionArbiter serial = new SubscriptionArbiter(); | ||
child.onSubscribe(serial); | ||
|
||
FlowableSubscriber<U> otherSubscriber = new FlowableSubscriber<U>() { | ||
boolean done; | ||
FlowableSubscriber<U> otherSubscriber = new DelaySubscriber(serial, child); | ||
|
||
other.subscribe(otherSubscriber); | ||
} | ||
|
||
private final class DelaySubscriber implements FlowableSubscriber<U> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still private |
||
private final SubscriptionArbiter serial; | ||
private final Subscriber<? super T> child; | ||
boolean done; | ||
|
||
DelaySubscriber(SubscriptionArbiter serial, Subscriber<? super T> child) { | ||
this.serial = serial; | ||
this.child = child; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
serial.setSubscription(new DelaySubscription(s)); | ||
s.request(Long.MAX_VALUE); | ||
} | ||
|
||
@Override | ||
public void onNext(U t) { | ||
onComplete(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
if (done) { | ||
RxJavaPlugins.onError(e); | ||
return; | ||
} | ||
done = true; | ||
child.onError(e); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
if (done) { | ||
return; | ||
} | ||
done = true; | ||
|
||
main.subscribe(new OnCompleteSubscriber()); | ||
} | ||
|
||
private class DelaySubscription implements Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private but final |
||
private final Subscription s; | ||
|
||
public DelaySubscription(Subscription s) { | ||
this.s = s; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
serial.setSubscription(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
// ignored | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
s.cancel(); | ||
} | ||
}); | ||
s.request(Long.MAX_VALUE); | ||
public void request(long n) { | ||
// ignored | ||
} | ||
|
||
@Override | ||
public void onNext(U t) { | ||
onComplete(); | ||
public void cancel() { | ||
s.cancel(); | ||
} | ||
} | ||
|
||
final class OnCompleteSubscriber implements FlowableSubscriber<T> { | ||
@Override | ||
public void onError(Throwable e) { | ||
if (done) { | ||
RxJavaPlugins.onError(e); | ||
return; | ||
} | ||
done = true; | ||
child.onError(e); | ||
public void onSubscribe(Subscription s) { | ||
serial.setSubscription(s); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
if (done) { | ||
return; | ||
} | ||
done = true; | ||
|
||
main.subscribe(new FlowableSubscriber<T>() { | ||
@Override | ||
public void onSubscribe(Subscription s) { | ||
serial.setSubscription(s); | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
child.onNext(t); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
child.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
child.onComplete(); | ||
} | ||
}); | ||
public void onNext(T t) { | ||
child.onNext(t); | ||
} | ||
}; | ||
|
||
other.subscribe(otherSubscriber); | ||
@Override | ||
public void onError(Throwable t) { | ||
child.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
child.onComplete(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No private but final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too.