Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: WIP removes anonymous inner classes. #5174

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,44 +78,46 @@ public void onNext(T args) {
* thread expect {@link Iterator#next()} called from a different thread to work.
* @return the Iterator
*/
public Iterator<T> getIterable() {
return new Iterator<T>() {
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
private Object buf;

@Override
public boolean hasNext() {
buf = value;
return !NotificationLite.isComplete(buf);
}
public Iterator getIterable() {
return new Iterator();
}

@Override
public T next() {
try {
// if hasNext wasn't called before calling next.
if (buf == null) {
buf = value;
}
if (NotificationLite.isComplete(buf)) {
throw new NoSuchElementException();
}
if (NotificationLite.isError(buf)) {
throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf));
}
return NotificationLite.getValue(buf);
final class Iterator implements java.util.Iterator<T> {
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
private Object buf;

@Override
public boolean hasNext() {
buf = value;
return !NotificationLite.isComplete(buf);
}

@Override
public T next() {
try {
// if hasNext wasn't called before calling next.
if (buf == null) {
buf = value;
}
if (NotificationLite.isComplete(buf)) {
throw new NoSuchElementException();
}
finally {
buf = null;
if (NotificationLite.isError(buf)) {
throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf));
}
return NotificationLite.getValue(buf);
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
finally {
buf = null;
}
};
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,7 @@ public void onSubscribe(Subscription s) {

w.schedulePeriodically(this, timeskip, timeskip, unit);

w.schedule(new Runnable() {
@Override
public void run() {
synchronized (BufferSkipBoundedSubscriber.this) {
buffers.remove(b);
}

fastPathOrderedEmitMax(b, false, w);
}
}, timespan, unit);
w.schedule(new RemoveFromBuffer(b), timespan, unit);
}

@Override
Expand Down Expand Up @@ -367,23 +358,31 @@ public void run() {
buffers.add(b);
}

w.schedule(new Runnable() {
@Override
public void run() {
synchronized (BufferSkipBoundedSubscriber.this) {
buffers.remove(b);
}

fastPathOrderedEmitMax(b, false, w);
}
}, timespan, unit);
w.schedule(new RemoveFromBuffer(b), timespan, unit);
}

@Override
public boolean accept(Subscriber<? super U> a, U v) {
a.onNext(v);
return true;
}

final class RemoveFromBuffer implements Runnable {
private final U buffer;

RemoveFromBuffer(U buffer) {
this.buffer = buffer;
}

@Override
public void run() {
synchronized (BufferSkipBoundedSubscriber.this) {
buffers.remove(buffer);
}

fastPathOrderedEmitMax(buffer, false, w);
}
}
}

static final class BufferExactBoundedSubscriber<T, U extends Collection<? super T>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,7 @@ public void subscribeActual(Subscriber<? super R> s) {
return;
}
if (n == 1) {
((Publisher<T>)a[0]).subscribe(new MapSubscriber<T, R>(s, new Function<T, R>() {
@Override
public R apply(T t) throws Exception {
return combiner.apply(new Object[] { t });
}
}));
((Publisher<T>)a[0]).subscribe(new MapSubscriber<T, R>(s, new SingletonArrayFunc()));
return;
}

Expand Down Expand Up @@ -557,4 +552,11 @@ public void requestOne() {

}
}

final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return combiner.apply(new Object[] { t });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,40 +78,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(final T t) {
w.schedule(new Runnable() {
@Override
public void run() {
actual.onNext(t);
}
}, delay, unit);
w.schedule(new OnNext(t), delay, unit);
}

@Override
public void onError(final Throwable t) {
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
}
}, delayError ? delay : 0, unit);
w.schedule(new OnError(t), delayError ? delay : 0, unit);
}

@Override
public void onComplete() {
w.schedule(new Runnable() {
@Override
public void run() {
try {
actual.onComplete();
} finally {
w.dispose();
}
}
}, delay, unit);
w.schedule(new OnComplete(), delay, unit);
}

@Override
Expand All @@ -125,5 +102,45 @@ public void cancel() {
w.dispose();
}

final class OnNext implements Runnable {
private final T t;

OnNext(T t) {
this.t = t;
}

@Override
public void run() {
actual.onNext(t);
}
}

private class OnError implements Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No private but final

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too.

private final Throwable t;

public OnError(Throwable t) {
this.t = t;
}

@Override
public void run() {
try {
actual.onError(t);
} finally {
w.dispose();
}
}
}

private class OnComplete implements Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no private but final

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too.

@Override
public void run() {
try {
actual.onComplete();
} finally {
w.dispose();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,71 +38,90 @@ public void subscribeActual(final Subscriber<? super T> child) {
final SubscriptionArbiter serial = new SubscriptionArbiter();
child.onSubscribe(serial);

FlowableSubscriber<U> otherSubscriber = new FlowableSubscriber<U>() {
boolean done;
FlowableSubscriber<U> otherSubscriber = new DelaySubscriber(serial, child);

other.subscribe(otherSubscriber);
}

private final class DelaySubscriber implements FlowableSubscriber<U> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no private

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still private

private final SubscriptionArbiter serial;
private final Subscriber<? super T> child;
boolean done;

DelaySubscriber(SubscriptionArbiter serial, Subscriber<? super T> child) {
this.serial = serial;
this.child = child;
}

@Override
public void onSubscribe(final Subscription s) {
serial.setSubscription(new DelaySubscription(s));
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(U t) {
onComplete();
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.onError(e);
return;
}
done = true;
child.onError(e);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;

main.subscribe(new OnCompleteSubscriber());
}

private class DelaySubscription implements Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no private but final

private final Subscription s;

public DelaySubscription(Subscription s) {
this.s = s;
}

@Override
public void onSubscribe(final Subscription s) {
serial.setSubscription(new Subscription() {
@Override
public void request(long n) {
// ignored
}

@Override
public void cancel() {
s.cancel();
}
});
s.request(Long.MAX_VALUE);
public void request(long n) {
// ignored
}

@Override
public void onNext(U t) {
onComplete();
public void cancel() {
s.cancel();
}
}

final class OnCompleteSubscriber implements FlowableSubscriber<T> {
@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.onError(e);
return;
}
done = true;
child.onError(e);
public void onSubscribe(Subscription s) {
serial.setSubscription(s);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;

main.subscribe(new FlowableSubscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
serial.setSubscription(s);
}

@Override
public void onNext(T t) {
child.onNext(t);
}

@Override
public void onError(Throwable t) {
child.onError(t);
}

@Override
public void onComplete() {
child.onComplete();
}
});
public void onNext(T t) {
child.onNext(t);
}
};

other.subscribe(otherSubscriber);
@Override
public void onError(Throwable t) {
child.onError(t);
}

@Override
public void onComplete() {
child.onComplete();
}
}
}
}
Loading