From c74f9550b6cf668ec36c8bd5ac3a00eec89e1349 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 2 Jun 2014 14:24:50 -0700 Subject: [PATCH 1/2] Hooked RxJavaPlugins errorHandler up within all operators that swallow onErrors. * Otherwise, the only onErrors visible to plugin are the ones which propagate all the way to SafeSubscriber. --- .../main/java/rx/internal/operators/OperatorMaterialize.java | 2 ++ .../java/rx/internal/operators/OperatorMergeDelayError.java | 4 ++++ .../java/rx/internal/operators/OperatorOnErrorFlatMap.java | 2 ++ .../operators/OperatorOnErrorResumeNextViaFunction.java | 2 ++ .../operators/OperatorOnErrorResumeNextViaObservable.java | 2 ++ .../java/rx/internal/operators/OperatorOnErrorReturn.java | 2 ++ .../operators/OperatorOnExceptionResumeNextViaObservable.java | 2 ++ 7 files changed, 16 insertions(+) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java index 98f5c4ae6a..9ee684a589 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -18,6 +18,7 @@ import rx.Notification; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Turns all of the notifications from an Observable into onNext emissions, and marks @@ -42,6 +43,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); child.onNext(Notification. createOnError(e)); child.onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java index 3985e4b083..bfd47a1cd6 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java @@ -22,6 +22,7 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.observers.SerializedSubscriber; +import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; /** @@ -125,10 +126,13 @@ public void onError(Throwable e) { public void onCompleted() { complete(); } + void error(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); exceptions.add(e); complete(); } + void complete() { if (WIP_UPDATER.decrementAndGet(this) == 0) { if (exceptions.isEmpty()) { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java index e7750785c3..f8e56971f8 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorFlatMap.java @@ -20,6 +20,7 @@ import rx.Subscriber; import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Allows inserting onNext events into a stream when onError events are received @@ -46,6 +47,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); Observable resume = resumeFunction.call(OnErrorThrowable.from(e)); resume.unsafeSubscribe(new Subscriber() { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index c1a5c8bb73..a2b71258f0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -19,6 +19,7 @@ import rx.Observable.Operator; import rx.Subscriber; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable (the return value of a function) @@ -59,6 +60,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); Observable resume = resumeFunction.call(e); resume.unsafeSubscribe(child); } catch (Throwable e2) { diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java index efee5c8c1b..56091cb005 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -58,6 +59,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); resumeSequence.unsafeSubscribe(child); } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java index f96441a76f..bd1536ff75 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java @@ -20,6 +20,7 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to emit a particular item to its Observer's onNext method @@ -59,6 +60,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); T result = resultFunction.call(e); child.onNext(result); diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java index db3a344cce..78008e56d0 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.plugins.RxJavaPlugins; /** * Instruct an Observable to pass control to another Observable rather than invoking @@ -63,6 +64,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { if (e instanceof Exception) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); resumeSequence.unsafeSubscribe(child); } else { From 3cf18a1a05a06d8f154439f176202402171aeb57 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 2 Jun 2014 16:16:28 -0700 Subject: [PATCH 2/2] Remove call to RxJavaPlugins ErrorHander in mergeDelayError operator - CompositeException is already visible --- .../java/rx/internal/operators/OperatorMergeDelayError.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java index bfd47a1cd6..fe57ac8354 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMergeDelayError.java @@ -22,7 +22,6 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.observers.SerializedSubscriber; -import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; /** @@ -128,7 +127,6 @@ public void onCompleted() { } void error(Throwable e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); exceptions.add(e); complete(); }