QueryReactiveApi The exception could not be delivered to the consumer because it has already canceled/disposed #311

Closed
jsimomaa opened this issue Mar 10, 2022 · 1 comment · Fixed by #313
Labels
bug Something isn't working
@jsimomaa
Contributor

Steps to reproduce:

  1. Create a long running request with QueryReactiveApi and schedule it as a Future
  2. Cancel the future forcibly (future.cancel(true);)
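QueryReactiveApi queryApi = client.getQueryReactiveApi();

// A lambda cannot assign to a local variable, so the Disposable is held in an AtomicReference.
AtomicReference<Disposable> disposable = new AtomicReference<>();
Future<?> future = executor.scheduleWithFixedDelay(() -> {
    disposable.set(queryApi.queryRaw(query)
        .doOnNext(onNext -> {
            System.out.println("onNext");
        }).subscribe());
}, 0, 60, TimeUnit.SECONDS);

// Cancel forcibly: this interrupts the thread that is running the query.
future.cancel(true);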
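
For context, `future.cancel(true)` interrupts the worker thread while the task is running, which is why a blocking read can fail with an interrupt-related exception. The following is a minimal, stdlib-only sketch of that mechanism. It does not use the InfluxDB client; `Thread.sleep` is only an assumed stand-in for the blocking read of the query response, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelInterruptDemo {

    /** Returns true if the running task observed an interrupt after future.cancel(true). */
    static boolean taskWasInterrupted() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.scheduleWithFixedDelay(() -> {
                started.countDown();
                try {
                    // Stand-in for a long-running blocking read (assumption, not the real client).
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            }, 0, 60, TimeUnit.SECONDS);

            started.await();       // wait until the task is actually running
            future.cancel(true);   // true = interrupt the thread running the task
            return finished.await(5, TimeUnit.SECONDS) && interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + taskWasInterrupted());
    }
}
```

In the real reproduction the interrupt surfaces inside the HTTP response read rather than in `Thread.sleep`, as the `Caused by` part of the stack trace below shows.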

Expected behavior:
The request is canceled silently.

Actual behavior:
Huge pile of stack traces:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.io.InterruptedIOException: interrupted
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at com.influxdb.internal.AbstractRestClient.catchOrPropagateException(AbstractRestClient.java:158)
        at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:140)
        at com.influxdb.internal.AbstractQueryApi.lambda$query$5(AbstractQueryApi.java:174)
        at com.influxdb.internal.AbstractQueryApi$1.onResponse(AbstractQueryApi.java:220)
        at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:238)
        at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:190)
        at com.influxdb.internal.AbstractQueryApi.queryRaw(AbstractQueryApi.java:144)
        at com.influxdb.client.reactive.internal.QueryReactiveApiImpl.lambda$null$8(QueryReactiveApiImpl.java:334)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:12284)
        at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.Flowable.subscribe(Flowable.java:14882)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
        at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.Flowable.subscribe(Flowable.java:14872)
        at io.reactivex.Flowable.subscribe(Flowable.java:14730)
        at main.Main.main(Main.java:33)
Caused by: java.io.InterruptedIOException: interrupted
        at okio.Timeout.throwIfReached(Timeout.kt:98)
        at okio.InputStreamSource.read(JvmOkio.kt:87)
        at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:129)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okhttp3.internal.http1.Http1ExchangeCodec$AbstractSource.read(Http1ExchangeCodec.java:389)
        at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:475)
        at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.java:286)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okio.RealBufferedSource.exhausted(RealBufferedSource.kt:198)
        at okio.InflaterSource.refill(InflaterSource.kt:112)
        at okio.InflaterSource.readOrInflate(InflaterSource.kt:76)
        at okio.InflaterSource.read(InflaterSource.kt:49)
        at okio.GzipSource.read(GzipSource.kt:69)
        at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
        at okio.ForwardingSource.read(ForwardingSource.kt:29)
        at retrofit2.OkHttpCall$ExceptionCatchingResponseBody$1.read(OkHttpCall.java:314)
        at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:449)
        at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:117)
        at okio.RealBufferedSource.readUtf8Line(RealBufferedSource.kt:319)
        at com.influxdb.internal.AbstractQueryApi.parseFluxResponseToLines(AbstractQueryApi.java:249)
        at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:138)
        ... 47 more

Shouldn't AbstractRestClient check whether the onError handler is already disposed before calling accept(exception)?

    void catchOrPropagateException(@Nonnull final Exception exception,
                                   @Nonnull final Consumer<? super Throwable> onError) {

        Arguments.checkNotNull(exception, "exception");
        Arguments.checkNotNull(onError, "onError");

        //
        // Socket closed by remote server or end of data
        //
        if (isCloseException(exception)) {
            LOG.log(Level.FINEST, "Socket closed by remote server or end of data", exception);
        } else {
            // here check if onError is disposed already ?
            onError.accept(exception);
        }
    }

Or, alternatively, add a case to the if-clause that checks for interruptions?
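
A rough sketch of what such an interruption check could look like, using only the JDK. The helper name `isInterruption` is hypothetical and not part of the client. One caveat built into the sketch: `java.net.SocketTimeoutException` is a subclass of `InterruptedIOException`, so it is excluded here to avoid silently swallowing real timeouts.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

class InterruptionCheck {

    /**
     * Hypothetical helper: returns true if the exception, or any exception in its
     * cause chain, indicates that the thread was interrupted.
     */
    static boolean isInterruption(Throwable exception) {
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof InterruptedException) {
                return true;
            }
            // SocketTimeoutException extends InterruptedIOException but is a timeout,
            // not a cancellation, so it is deliberately not treated as an interruption.
            if (t instanceof InterruptedIOException && !(t instanceof SocketTimeoutException)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new InterruptedIOException("interrupted"));
        System.out.println(isInterruption(wrapped));
        System.out.println(isInterruption(new SocketTimeoutException("timeout")));
    }
}
```

This is only a heuristic: it classifies by exception type, so whether it matches every interruption the HTTP stack can raise is an assumption that would need checking against the real client.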

Specifications:

  • Client Version: 4.3.0
  • InfluxDB Version: 2.1.1
  • JDK Version: 11
  • Platform: Linux
@bednar
Contributor

bednar commented Mar 11, 2022

Hi @jsimomaa,

Thanks for using our client and for your detailed description.

I think the correct solution is to change how subscriber::onError is passed to AbstractRestClient.

It should be changed to something like:

Consumer<Throwable> onError = throwable -> {
    if (!subscriber.isDisposed()) {
        subscriber.onError(throwable);
    } else {
        LOG.log(Level.FINEST, "The exception could not be delivered to the consumer "
                + "because it has already canceled/disposed.", throwable);
    }
};

queryRaw(queryCall, consumer, onError, subscriber::onComplete, false);
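
To illustrate the behavior of this guard in isolation, here is a small stdlib-only model. `FakeSubscriber` is a hypothetical stand-in for the RxJava emitter and only mimics the two methods used above; a list of dropped errors takes the place of the FINEST log call. It is a sketch of the pattern, not the client's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GuardedOnErrorDemo {

    /** Hypothetical stand-in for the emitter: rejects errors once disposed. */
    static final class FakeSubscriber {
        private boolean disposed;
        final List<Throwable> delivered = new ArrayList<>();

        boolean isDisposed() { return disposed; }

        void dispose() { disposed = true; }

        void onError(Throwable t) {
            if (disposed) {
                // Models the "could not be delivered" situation from the stack trace.
                throw new IllegalStateException("undeliverable: " + t);
            }
            delivered.add(t);
        }
    }

    /** Wraps onError so that errors arriving after disposal are dropped instead of delivered. */
    static Consumer<Throwable> guarded(FakeSubscriber subscriber, List<Throwable> dropped) {
        return throwable -> {
            if (!subscriber.isDisposed()) {
                subscriber.onError(throwable);
            } else {
                dropped.add(throwable); // the proposed fix logs at FINEST here
            }
        };
    }

    public static void main(String[] args) {
        FakeSubscriber subscriber = new FakeSubscriber();
        List<Throwable> dropped = new ArrayList<>();
        Consumer<Throwable> onError = guarded(subscriber, dropped);

        onError.accept(new RuntimeException("before dispose"));
        subscriber.dispose();
        onError.accept(new RuntimeException("after dispose"));

        System.out.println("delivered=" + subscriber.delivered.size() + " dropped=" + dropped.size());
    }
}
```

Note that a check followed by a call is not atomic: a dispose that lands between `isDisposed()` and `onError(...)` could still slip through. RxJava 2 emitters also offer `tryOnError(Throwable)`, which returns false instead of routing the error to the global handler, and may be worth considering as an alternative.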

Regards

@bednar bednar added the bug Something isn't working label Mar 11, 2022
@bednar bednar added this to the 5.1.0 milestone Mar 22, 2022