Skip to content

Commit

Permalink
2.x: fix Obs.combineLatestDelayError sync initial error not emitting (#…
Browse files Browse the repository at this point in the history
…5560)

* 2.x: fix Obs.combineLatestDelayError sync initial error not emitting

* Remove unused method.
  • Loading branch information
akarnokd authored Aug 22, 2017
1 parent 357fac2 commit ea7ca2c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.reactivex.internal.operators.observable;

import java.util.Arrays;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
Expand Down Expand Up @@ -79,8 +78,8 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp
final Observer<? super R> actual;
final Function<? super Object[], ? extends R> combiner;
final CombinerObserver<T, R>[] observers;
final T[] latest;
final SpscLinkedArrayQueue<Object> queue;
Object[] latest;
final SpscLinkedArrayQueue<Object[]> queue;
final boolean delayError;

volatile boolean cancelled;
Expand All @@ -99,18 +98,18 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp
this.actual = actual;
this.combiner = combiner;
this.delayError = delayError;
this.latest = (T[])new Object[count];
this.observers = new CombinerObserver[count];
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
this.latest = new Object[count];
CombinerObserver<T, R>[] as = new CombinerObserver[count];
for (int i = 0; i < count; i++) {
as[i] = new CombinerObserver<T, R>(this, i);
}
this.observers = as;
this.queue = new SpscLinkedArrayQueue<Object[]>(bufferSize);
}

public void subscribe(ObservableSource<? extends T>[] sources) {
Observer<T>[] as = observers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new CombinerObserver<T, R>(this, i);
}
lazySet(0); // release array contents
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (done || cancelled) {
Expand All @@ -136,11 +135,6 @@ public boolean isDisposed() {
return cancelled;
}

void cancel(SpscLinkedArrayQueue<?> q) {
clear(q);
cancelSources();
}

void cancelSources() {
for (CombinerObserver<T, R> s : observers) {
s.dispose();
Expand All @@ -149,96 +143,65 @@ void cancelSources() {

void clear(SpscLinkedArrayQueue<?> q) {
synchronized (this) {
Arrays.fill(latest, null);
latest = null;
}
q.clear();
}

void combine(T value, int index) {
CombinerObserver<T, R> cs = observers[index];

int a;
int c;
int len;
boolean empty;
boolean f;
synchronized (this) {
if (cancelled) {
return;
}
len = latest.length;
T o = latest[index];
a = active;
if (o == null) {
active = ++a;
}
c = complete;
if (value == null) {
complete = ++c;
} else {
latest[index] = value;
}
f = a == len;
// see if either all sources completed
empty = c == len
|| (value == null && o == null); // or this source completed without any value
if (!empty) {
if (value != null && f) {
queue.offer(cs, latest.clone());
} else
if (value == null && errors.get() != null) {
done = true; // if this source completed without a value
}
} else {
done = true;
}
}
if (!f && value != null) {
return;
}
drain();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}

final SpscLinkedArrayQueue<Object> q = queue;
final SpscLinkedArrayQueue<Object[]> q = queue;
final Observer<? super R> a = actual;
final boolean delayError = this.delayError;

int missed = 1;
for (;;) {

if (checkTerminated(done, q.isEmpty(), a, q, delayError)) {
return;
}

for (;;) {
if (cancelled) {
clear(q);
return;
}

if (!delayError && errors.get() != null) {
cancelSources();
clear(q);
a.onError(errors.terminate());
return;
}

boolean d = done;
@SuppressWarnings("unchecked")
CombinerObserver<T, R> cs = (CombinerObserver<T, R>)q.poll();
boolean empty = cs == null;
Object[] s = q.poll();
boolean empty = s == null;

if (checkTerminated(d, empty, a, q, delayError)) {
if (d && empty) {
clear(q);
Throwable ex = errors.terminate();
if (ex == null) {
a.onComplete();
} else {
a.onError(ex);
}
return;
}

if (empty) {
break;
}

@SuppressWarnings("unchecked")
T[] array = (T[])q.poll();

R v;

try {
v = ObjectHelper.requireNonNull(combiner.apply(array), "The combiner returned a null");
v = ObjectHelper.requireNonNull(combiner.apply(s), "The combiner returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
cancel(q);
errors.addThrowable(ex);
cancelSources();
clear(q);
ex = errors.terminate();
a.onError(ex);
return;
}
Expand All @@ -253,53 +216,81 @@ void drain() {
}
}


boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArrayQueue<?> q, boolean delayError) {
if (cancelled) {
cancel(q);
return true;
void innerNext(int index, T item) {
boolean shouldDrain = false;
synchronized (this) {
Object[] latest = this.latest;
if (latest == null) {
return;
}
Object o = latest[index];
int a = active;
if (o == null) {
active = ++a;
}
latest[index] = item;
if (a == latest.length) {
queue.offer(latest.clone());
shouldDrain = true;
}
}
if (shouldDrain) {
drain();
}
if (d) {
}

void innerError(int index, Throwable ex) {
if (errors.addThrowable(ex)) {
boolean cancelOthers = true;
if (delayError) {
if (empty) {
cancel(q);
Throwable e = errors.terminate();
if (e != null) {
a.onError(e);
} else {
a.onComplete();
synchronized (this) {
Object[] latest = this.latest;
if (latest == null) {
return;
}

cancelOthers = latest[index] == null;
if (cancelOthers || ++complete == latest.length) {
done = true;
}
return true;
}
} else {
Throwable e = errors.get();
if (e != null) {
cancel(q);
a.onError(errors.terminate());
return true;
} else
if (empty) {
clear(queue);
a.onComplete();
return true;
}
}
if (cancelOthers) {
cancelSources();
}
drain();
} else {
RxJavaPlugins.onError(ex);
}
return false;
}

void onError(Throwable e) {
if (!errors.addThrowable(e)) {
RxJavaPlugins.onError(e);
void innerComplete(int index) {
boolean cancelOthers = false;
synchronized (this) {
Object[] latest = this.latest;
if (latest == null) {
return;
}

cancelOthers = latest[index] == null;
if (cancelOthers || ++complete == latest.length) {
done = true;
}
}
if (cancelOthers) {
cancelSources();
}
drain();
}

}

static final class CombinerObserver<T, R> implements Observer<T> {
static final class CombinerObserver<T, R> extends AtomicReference<Disposable> implements Observer<T> {
private static final long serialVersionUID = -4823716997131257941L;

final LatestCoordinator<T, R> parent;
final int index;

final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
final int index;

CombinerObserver(LatestCoordinator<T, R> parent, int index) {
this.parent = parent;
Expand All @@ -308,27 +299,26 @@ static final class CombinerObserver<T, R> implements Observer<T> {

@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
DisposableHelper.setOnce(this, s);
}

@Override
public void onNext(T t) {
parent.combine(t, index);
parent.innerNext(index, t);
}

@Override
public void onError(Throwable t) {
parent.onError(t);
parent.combine(null, index);
parent.innerError(index, t);
}

@Override
public void onComplete() {
parent.combine(null, index);
parent.innerComplete(index);
}

public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1569,4 +1569,23 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
.assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
}

@Test
@SuppressWarnings("unchecked")
public void syncFirstErrorsAfterItemDelayError() {
Flowable.combineLatestDelayError(Arrays.asList(
Flowable.just(21).concatWith(Flowable.<Integer>error(new TestException())),
Flowable.just(21).delay(100, TimeUnit.MILLISECONDS)
),
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return (Integer)a[0] + (Integer)a[1];
}
}
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class, 42);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1198,4 +1198,23 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
ps2.onNext(2);
ts.assertResult(3);
}

@Test
@SuppressWarnings("unchecked")
public void syncFirstErrorsAfterItemDelayError() {
Observable.combineLatestDelayError(Arrays.asList(
Observable.just(21).concatWith(Observable.<Integer>error(new TestException())),
Observable.just(21).delay(100, TimeUnit.MILLISECONDS)
),
new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return (Integer)a[0] + (Integer)a[1];
}
}
)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class, 42);
}
}

0 comments on commit ea7ca2c

Please sign in to comment.