Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x Exception unhandled after dispose() #4991

Closed
loongee opened this issue Jan 13, 2017 · 15 comments
Closed

2.x Exception unhandled after dispose() #4991

loongee opened this issue Jan 13, 2017 · 15 comments

Comments

@loongee
Copy link

loongee commented Jan 13, 2017

Exception unhandled if dispose() called.
Did I used it in a wrong way ?

error stack:

java.lang.IllegalStateException: example
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxCachedThreadScheduler-1" java.lang.IllegalStateException: illegal
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

sample code:


public class MyClass {

    static Disposable disposable = null;

    public static void main(String[] args) {

        Observable.just(1)
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        long endTime = System.currentTimeMillis() + 500;
                        while (System.currentTimeMillis() < endTime) {
                        }
                        return Observable.error(new IllegalStateException("example"));
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onComplete() {}

                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {}
                });


        try {
            Thread.sleep(50);
            disposable.dispose();
            Thread.sleep(500000);
        } catch (InterruptedException e) {
        }
    }
}
@akarnokd
Copy link
Member

flatMap is not expecting the Function to block or sleep thus doesn't check immediately if the user has disposed the sequence. When the error() emits, it finds a terminated sequence and relays the exception into the global error handler. Do you really have to wait/block inside the flatMap's function?

@akarnokd
Copy link
Member

This is a grey area and can be considered as bug. Fix in #4992.

@loongee
Copy link
Author

loongee commented Jan 15, 2017

In my situation, I used a socket.read() instead of the while (System.currentTimeMillis() < endTime) {}, if the socket returns the unexpecting data, Observable.error(...) will return to indicate an server error. Under my circumstance, the task will be cancelled when it's no longer needed. That's how this happened.

@akarnokd
Copy link
Member

Closing via #4992

@xudshen
Copy link

xudshen commented Mar 8, 2017

I also encountered this problem when using the Observable.fromCallable and the 2.0.7version does not seem to solve the problem.
@akarnokd @loongee would you pls help me solve it?

public class ExceptionAfterDispose {
    public static void test() {
        final Disposable disposable = Observable.fromCallable(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                long endTime = System.currentTimeMillis() + 2000;
                while (System.currentTimeMillis() < endTime) {
                }
                //network spends 2000ms, then throw an exception
                throw new IllegalStateException("error on network");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<Object>() {
                    @Override
                    public void onNext(Object o) {
                    }

                    @Override
                    public void onComplete() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                });
        //dispose it after 500ms
        Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                disposable.dispose();
            }
        });
    }
}

and the exception is

java.lang.IllegalStateException: error on network
   at info.xudshen.android.playground.recyclerview.fragment.ExceptionAfterDispose$2.call(ExceptionAfterDispose.java:27)
   at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:42)
   at io.reactivex.Observable.subscribe(Observable.java:10700)
   at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
   at io.reactivex.Scheduler$1.run(Scheduler.java:138)
   at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
   at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
   at java.util.concurrent.FutureTask.run(FutureTask.java:237)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
   at java.lang.Thread.run(Thread.java:761)

@akarnokd
Copy link
Member

akarnokd commented Mar 8, 2017

@xudshen Indeed the dispose() call should have prevented the error to reach Observer.onError, but since fromCallable can't determine if the thrown exception was due to the dispose() call, it has to route the error to RxJavaPlugins.onError which by default crashes Android apps.

@xudshen
Copy link

xudshen commented Mar 8, 2017

@akarnokd I test the same logic with Flowable and it does not crash anymore. lol

@httpdispatch
Copy link

httpdispatch commented Jul 4, 2018

This still crashes at RxJava 2.1.16

@Test public void testDispose() throws InterruptedException {
        Disposable disposable = Flowable.fromCallable(() -> {
            System.out.println("Operation started");
            Thread.sleep(1000);
            System.out.println("Operation stopped");
            return true;
        })
                .subscribeOn(Schedulers.computation())
                .doOnNext(__ -> System.out.println("doOnNext"))
                .subscribe(value -> System.out.println("Value received " + value));
        Thread.sleep(30);
        System.out.println("Disposing");
        disposable.dispose();
        System.out.println("Disposed");
        Thread.sleep(1000);
    }

With custom break operator it also behaves weird, keeps deliver value to the doOnNext method

@Test public void testDispose() throws InterruptedException {
        Disposable disposable = Flowable.fromCallable(() -> {
            System.out.println("Operation started");
            Thread.sleep(1000);
            System.out.println("Operation stopped");
            return true;
        })
                .subscribeOn(Schedulers.computation())
                .lift(new BreakIfUnsubscribedFlowable<>())
                .doOnNext(__ -> System.out.println("doOnNext"))
                .subscribe(value -> System.out.println("Value received " + value));
        Thread.sleep(30);
        System.out.println("Disposing");
        disposable.dispose();
        System.out.println("Disposed");
        Thread.sleep(1000);
    }

 public class BreakIfUnsubscribedFlowable<T> implements FlowableOperator<T, T> {

        @Override public Subscriber<? super T> apply(Subscriber<? super T> observer) throws Exception {
            return new DisposableSubscriber<T>() {

                @Override public void onNext(T o) {
                    if (!isDisposed()) {
                        observer.onNext(o);
                    }
                }

                @Override public void onError(Throwable e) {
                    if (!isDisposed()) {
                        observer.onError(e);
                    }
                }

                @Override public void onComplete() {
                    if (!isDisposed()) {
                        observer.onComplete();
                    }
                }
            };
        }
    }

Output is

Operation started
Disposing
Disposed
Operation stopped
doOnNext

@akarnokd
Copy link
Member

akarnokd commented Jul 4, 2018

This is by design, your Callable crashes and the downstream has disposed so the operator routes the error to the global error handler as it can't know if the exception is relevant or not. Don't crash into RxJava and consider using create() with emitter.tryOnError() instead.

You implemented BreakIfUnsubscribedFlowable incorrectly as it has no connection to the downstream's dispose management. Btw you can achieve a similar disconnect effect via onTerminateDetach.

@httpdispatch
Copy link

@akarnokd with onTerminateDetach the error still slips and i have UndeliverableException. Can you please guide how to disable error routing in case when the flowable was created from callable? Or how to write "BreakIfUnsubscribedFlowable" operator properly?

@akarnokd
Copy link
Member

akarnokd commented Jul 4, 2018

@httpdispatch
Copy link

@akarnokd Thanks. Looks like such operator work as expected (even error is ignored). Do you see any issues in such implementation? May i rely on cancel() call?

public static class BreakIfUnsubscribedFlowable<T> implements FlowableOperator<T, T> {

        @Override public Subscriber<? super T> apply(Subscriber<? super T> observer) throws Exception {
            return new Op<>(observer);
        }

        static final class Op<T> implements FlowableSubscriber<T>, Subscription {
            final Subscriber<? super T> child;

            Subscription s;
            boolean cancelled;

            public Op(Subscriber<? super T> child) {
                this.child = child;
            }

            @Override public void onSubscribe(Subscription s) {
                this.s = s;
                child.onSubscribe(this);
            }

            @Override
            public void onNext(T v) {
                if (!cancelled) {
                    child.onNext(v);
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!cancelled) {
                    child.onError(e);
                }
            }

            @Override
            public void onComplete() {
                if (!cancelled) {
                    child.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
                s.cancel();
            }

            @Override
            public void request(long n) {
                s.request(n);
            }
        }
    }

@akarnokd
Copy link
Member

akarnokd commented Jul 4, 2018

Have cancelled volatile, otherwise it is fine for your own use.

@httpdispatch
Copy link

@akarnokd thank you very much for your help.

@ysyyork
Copy link

ysyyork commented Mar 9, 2022

I built a simple wrapper to resolve this issue

class SafeRxCallWrapper<T>(private val callback: () -> T) {
    fun runSingle(): Single<T> {
        return Single.create<T> { emitter ->
            try {
                val res = callback()
                emitter.onSuccess(res)
            } catch (th: Throwable) {
                emitter.tryOnError(th)
            }
        }
    }

    fun runObservable(): Observable<T> {
        return Observable.create<T> { emitter ->
            try {
                val res = callback()
                emitter.onNext(res)
            } catch (th: Throwable) {
                emitter.tryOnError(th)
            }
        }
    }

    fun runMaybe(): Maybe<T> {
        return Maybe.create<T> { emitter ->
            try {
                val res = callback()
                emitter.onSuccess(res)
            } catch (th: Throwable) {
                emitter.tryOnError(th)
            }
        }
    }

    fun runCompletable(): Completable {
        return Completable.create { emitter ->
            try {
                callback()
                emitter.onComplete()
            } catch (th: Throwable) {
                emitter.tryOnError(th)
            }
        }
    }
}

Then you can just do

val singleReturn = SafeRxCallWrapper<ClassA> {
    ClassA()
}.runSingle().subscribe({}, {})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants