Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Make CompositeExcpetion thread-safe like 1.x and also fix some issues #4619

Merged
merged 1 commit into from
Sep 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 18 additions & 49 deletions src/main/java/io/reactivex/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,24 @@ public final class CompositeException extends RuntimeException {
private final String message;
private Throwable cause;

/**
* Constructs an empty CompositeException.
*/
public CompositeException() {
this.exceptions = new ArrayList<Throwable>();
this.message = null;
}

/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
* @param exceptions the Throwables to have as initially suppressed exceptions
*
* @throws IllegalArgumentException if <code>exceptions</code> is empty.
*/
public CompositeException(Throwable... exceptions) {
this.exceptions = new ArrayList<Throwable>();
if (exceptions == null) {
this.message = "1 exception occurred. ";
this.exceptions.add(new NullPointerException("exceptions is null"));
} else {
this.message = exceptions.length + " exceptions occurred. ";
for (Throwable t : exceptions) {
this.exceptions.add(t != null ? t : new NullPointerException("One of the exceptions is null"));
}
}
this(exceptions == null ?
Arrays.asList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
}


/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
* @param errors the Throwables to have as initially suppressed exceptions
*
* @throws IllegalArgumentException if <code>errors</code> is empty.
*/
public CompositeException(Iterable<? extends Throwable> errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
Expand All @@ -83,13 +70,15 @@ public CompositeException(Iterable<? extends Throwable> errors) {
if (ex != null) {
deDupedExceptions.add(ex);
} else {
deDupedExceptions.add(new NullPointerException());
deDupedExceptions.add(new NullPointerException("Throwable was null!"));
}
}
} else {
deDupedExceptions.add(new NullPointerException());
deDupedExceptions.add(new NullPointerException("errors was null"));
}
if (deDupedExceptions.isEmpty()) {
throw new IllegalArgumentException("errors is empty");
}

localExceptions.addAll(deDupedExceptions);
this.exceptions = Collections.unmodifiableList(localExceptions);
this.message = exceptions.size() + " exceptions occurred. ";
Expand All @@ -109,17 +98,6 @@ public String getMessage() {
return message;
}

/**
* Adds a suppressed exception to this composite.
* <p>The method is named this way to avoid conflicts with Java 7 environments
* and its addSuppressed() method.
* @param e the exception to suppress, nulls are converted to NullPointerExceptions
*/
public void suppress(Throwable e) {
exceptions.add(e != null ? e : new NullPointerException("null exception"));
}


@Override
public synchronized Throwable getCause() { // NOPMD
if (cause == null) {
Expand Down Expand Up @@ -266,15 +244,16 @@ public String getMessage() {
private List<Throwable> getListOfCauses(Throwable ex) {
List<Throwable> list = new ArrayList<Throwable>();
Throwable root = ex.getCause();
if (root == null) {
if (root == null || root == ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a test for this? I removed it because code coverage showed it as never taken.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return list;
} else {
while (true) {
list.add(root);
if (root.getCause() == null) {
Throwable cause = root.getCause();
if (cause == null || cause == root) {
return list;
} else {
root = root.getCause();
root = cause;
}
}
}
Expand All @@ -288,16 +267,6 @@ public int size() {
return exceptions.size();
}

/**
* Returns true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
* @return true if this CompositeException doesn't have a cause or
* any suppressed exceptions.
*/
public boolean isEmpty() {
return exceptions.isEmpty();
}

/**
* Returns the root cause of {@code e}. If {@code e.getCause()} returns {@code null} or {@code e}, just return {@code e} itself.
*
Expand All @@ -306,15 +275,15 @@ public boolean isEmpty() {
*/
private Throwable getRootCause(Throwable e) {
Throwable root = e.getCause();
if (root == null /* || cause == root */) { // case might not be possible
if (root == null || cause == root) {
return e;
}
while (true) {
Throwable cause = root.getCause();
if (cause == null /* || cause == root */) { // case might not be possible
if (cause == null || cause == root) {
return root;
}
root = root.getCause();
root = cause;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.reactivex.internal.operators.flowable;

import io.reactivex.plugins.RxJavaPlugins;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

Expand Down Expand Up @@ -550,39 +553,42 @@ boolean checkTerminate() {
}

void reportError(SimpleQueue<Throwable> q) {
CompositeException composite = null;
List<Throwable> composite = null;
Throwable ex = null;

Throwable t;
int count = 0;
for (;;) {
Throwable t;
try {
t = q.poll();
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
if (composite == null) {
composite = new CompositeException(ex);
if (ex == null) {
ex = exc;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(exc);
}
composite.suppress(exc);
break;
}

if (t == null) {
break;
}
if (count == 0) {
if (ex == null) {
ex = t;
} else {
if (composite == null) {
composite = new CompositeException(ex);
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.suppress(t);
composite.add(t);
}

count++;
}
if (composite != null) {
actual.onError(composite);
actual.onError(new CompositeException(composite));
} else {
actual.onError(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,8 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
void onError(Throwable e) {
for (;;) {
Throwable curr = error.get();
if (curr instanceof CompositeException) {
CompositeException ce = new CompositeException((CompositeException)curr);
ce.suppress(e);
if (curr != null) {
CompositeException ce = new CompositeException(curr, e);
e = ce;
}
Throwable next = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,39 +486,42 @@ boolean checkTerminate() {
}

void reportError(SimpleQueue<Throwable> q) {
CompositeException composite = null;
List<Throwable> composite = null;
Throwable ex = null;

Throwable t;
int count = 0;
for (;;) {
Throwable t;
try {
t = q.poll();
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
if (composite == null) {
composite = new CompositeException(exc);
if (ex == null) {
ex = exc;
} else {
if (composite == null) {
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.add(exc);
}
composite.suppress(exc);
break;
}

if (t == null) {
break;
}
if (count == 0) {
if (ex == null) {
ex = t;
} else {
if (composite == null) {
composite = new CompositeException(ex);
composite = new ArrayList<Throwable>();
composite.add(ex);
}
composite.suppress(t);
composite.add(t);
}

count++;
}
if (composite != null) {
actual.onError(composite);
actual.onError(new CompositeException(composite));
} else {
actual.onError(ex);
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/reactivex/observers/BaseTestConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,8 @@ protected final AssertionError fail(String message) {
;

AssertionError ae = new AssertionError(b.toString());
CompositeException ce = new CompositeException();
for (Throwable e : errors) {
ce.suppress(e);
}
if (!ce.isEmpty()) {
if (!errors.isEmpty()) {
CompositeException ce = new CompositeException(errors);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth changing this so when there is only one exception, it doesn't wrap and lenghten the stacktrace unnecessarily.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #4631

ae.initCause(ce);
}
return ae;
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/observers/SafeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,22 @@ public void onError(Throwable t) {
done = true;

if (s == null) {
CompositeException t2 = new CompositeException(t, new NullPointerException("Subscription not set!"));
Throwable npe = new NullPointerException("Subscription not set!");

try {
actual.onSubscribe(EmptyDisposable.INSTANCE);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because the actual's state may be corrupt at this point
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
return;
}
try {
actual.onError(t2);
actual.onError(new CompositeException(t, npe));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if onError failed, all that's left is to report the error to plugins
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
}
return;
}
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/subscribers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,26 +130,22 @@ public void onError(Throwable t) {
done = true;

if (s == null) {
CompositeException t2 = new CompositeException(t, new NullPointerException("Subscription not set!"));
Throwable npe = new NullPointerException("Subscription not set!");

try {
actual.onSubscribe(EmptySubscription.INSTANCE);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because the actual's state may be corrupt at this point
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
return;
}
try {
actual.onError(t2);
actual.onError(new CompositeException(t, npe));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if onError failed, all that's left is to report the error to plugins
t2.suppress(e);

RxJavaPlugins.onError(t2);
RxJavaPlugins.onError(new CompositeException(t, npe, e));
}
return;
}
Expand Down
Loading