Skip to content

Commit

Permalink
2.x: coverage, fixes, cleanup 10/20-2 (#4738)
Browse files Browse the repository at this point in the history
* 2.x: coverage, fixes, cleanup 10/20-2

* Fix Generate not saving the state
  • Loading branch information
akarnokd authored Oct 20, 2016
1 parent 42ca8c4 commit d2c9c34
Show file tree
Hide file tree
Showing 22 changed files with 977 additions and 290 deletions.
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.fuseable;

/**
* Override of the SimpleQueue interface with no throws Exception on poll.
*
* @param <T> the value type to enqueue and dequeue, not null
*/
public interface SimplePlainQueue<T> extends SimpleQueue<T> {

@Override
T poll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.plugins.RxJavaPlugins;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

Expand All @@ -25,10 +21,12 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableFlatMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Expand Down Expand Up @@ -64,13 +62,11 @@ static final class MergeSubscriber<T, U> extends AtomicInteger implements Subscr
final int maxConcurrency;
final int bufferSize;

volatile SimpleQueue<U> queue;
volatile SimplePlainQueue<U> queue;

volatile boolean done;

final AtomicReference<SimpleQueue<Throwable>> errors = new AtomicReference<SimpleQueue<Throwable>>();

static final SimpleQueue<Throwable> ERRORS_CLOSED = new RejectingQueue<Throwable>();
final AtomicThrowable errs = new AtomicThrowable();

volatile boolean cancelled;

Expand Down Expand Up @@ -126,7 +122,7 @@ public void onNext(T t) {
}
Publisher<? extends U> p;
try {
p = mapper.apply(t);
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
Expand All @@ -140,7 +136,7 @@ public void onNext(T t) {
u = ((Callable<U>)p).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
getErrorQueue().offer(ex);
errs.addThrowable(ex);
drain();
return;
}
Expand Down Expand Up @@ -210,7 +206,7 @@ void removeInner(InnerSubscriber<T, U> inner) {
}

SimpleQueue<U> getMainQueue() {
SimpleQueue<U> q = queue;
SimplePlainQueue<U> q = queue;
if (q == null) {
if (maxConcurrency == Integer.MAX_VALUE) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
Expand Down Expand Up @@ -316,9 +312,12 @@ public void onError(Throwable t) {
RxJavaPlugins.onError(t);
return;
}
getErrorQueue().offer(t);
done = true;
drain();
if (errs.addThrowable(t)) {
done = true;
drain();
} else {
RxJavaPlugins.onError(t);
}
}

@Override
Expand All @@ -343,9 +342,13 @@ public void request(long n) {
public void cancel() {
if (!cancelled) {
cancelled = true;
s.cancel();
disposeAll();
if (getAndIncrement() == 0) {
s.cancel();
disposeAll();
SimpleQueue<U> q = queue;
if (q != null) {
q.clear();
}
}
}
}
Expand All @@ -363,7 +366,7 @@ void drainLoop() {
if (checkTerminate()) {
return;
}
SimpleQueue<U> svq = queue;
SimplePlainQueue<U> svq = queue;

long r = requested.get();
boolean unbounded = r == Long.MAX_VALUE;
Expand All @@ -375,12 +378,8 @@ void drainLoop() {
long scalarEmission = 0;
U o = null;
while (r != 0L) {
try {
o = svq.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
getErrorQueue().offer(ex);
}
o = svq.poll();

if (checkTerminate()) {
return;
}
Expand Down Expand Up @@ -413,11 +412,11 @@ void drainLoop() {
int n = inner.length;

if (d && (svq == null || svq.isEmpty()) && n == 0) {
SimpleQueue<Throwable> e = errors.get();
if (e == null || e.isEmpty()) {
Throwable ex = errs.terminate();
if (ex == null) {
child.onComplete();
} else {
reportError(e);
child.onError(ex);
}
return;
}
Expand Down Expand Up @@ -447,6 +446,7 @@ void drainLoop() {
}

int j = index;
sourceLoop:
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
Expand All @@ -456,33 +456,40 @@ void drainLoop() {

U o = null;
for (;;) {
if (checkTerminate()) {
return;
}
SimpleQueue<U> q = is.queue;
if (q == null) {
break;
}
long produced = 0;
while (r != 0L) {
if (checkTerminate()) {
return;
}
SimpleQueue<U> q = is.queue;
if (q == null) {
break;
}

try {
o = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

s.cancel();
disposeAll();

child.onError(ex);
return;
is.dispose();
errs.addThrowable(ex);
if (checkTerminate()) {
return;
}
removeInner(is);
innerCompleted = true;
i++;
continue sourceLoop;
}
if (o == null) {
break;
}

child.onNext(o);

if (checkTerminate()) {
return;
}

r--;
produced++;
}
Expand Down Expand Up @@ -536,86 +543,31 @@ void drainLoop() {

boolean checkTerminate() {
if (cancelled) {
s.cancel();
disposeAll();
SimpleQueue<U> q = queue;
if (q != null) {
q.clear();
}
return true;
}
SimpleQueue<Throwable> e = errors.get();
if (!delayErrors && (e != null && !e.isEmpty())) {
try {
reportError(e);
} finally {
disposeAll();
}
if (!delayErrors && errs.get() != null) {
actual.onError(errs.terminate());
return true;
}
return false;
}

void reportError(SimpleQueue<Throwable> q) {
List<Throwable> composite = null;
Throwable ex = null;

for (;;) {
Throwable t;
try {
t = q.poll();
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
if (ex == null) {
ex = exc;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(exc);
}
break;
}

if (t == null) {
break;
}
if (ex == null) {
ex = t;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(t);
}
}
if (composite != null) {
actual.onError(new CompositeException(composite));
} else {
actual.onError(ex);
}
}

void disposeAll() {
InnerSubscriber<?, ?>[] a = subscribers.get();
if (a != CANCELLED) {
a = subscribers.getAndSet(CANCELLED);
if (a != CANCELLED) {
errors.getAndSet(ERRORS_CLOSED);
for (InnerSubscriber<?, ?> inner : a) {
inner.dispose();
}
}
}
}

SimpleQueue<Throwable> getErrorQueue() {
for (;;) {
SimpleQueue<Throwable> q = errors.get();
if (q != null) {
return q;
}
q = new MpscLinkedQueue<Throwable>();
if (errors.compareAndSet(null, q)) {
return q;
Throwable ex = errs.terminate();
if (ex != null && ex != ExceptionHelper.TERMINATED) {
RxJavaPlugins.onError(ex);
}
}
}
}
Expand Down Expand Up @@ -676,9 +628,12 @@ public void onNext(U t) {
}
@Override
public void onError(Throwable t) {
parent.getErrorQueue().offer(t);
done = true;
parent.drain();
if (parent.errs.addThrowable(t)) {
done = true;
parent.drain();
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
Expand Down Expand Up @@ -708,31 +663,4 @@ public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}
}

static final class RejectingQueue<T> implements SimpleQueue<T> {
@Override
public boolean offer(T e) {
return false;
}

@Override
public boolean offer(T v1, T v2) {
return false;
}

@Override
public T poll() {
return null;
}

@Override
public void clear() {

}

@Override
public boolean isEmpty() {
return true;
}
}
}
Loading

0 comments on commit d2c9c34

Please sign in to comment.