Expected behavior:
The request would be canceled silently
Actual behavior:
Huge pile of stack traces:
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.io.InterruptedIOException: interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
at com.influxdb.internal.AbstractRestClient.catchOrPropagateException(AbstractRestClient.java:158)
at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:140)
at com.influxdb.internal.AbstractQueryApi.lambda$query$5(AbstractQueryApi.java:174)
at com.influxdb.internal.AbstractQueryApi$1.onResponse(AbstractQueryApi.java:220)
at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:238)
at com.influxdb.internal.AbstractQueryApi.query(AbstractQueryApi.java:190)
at com.influxdb.internal.AbstractQueryApi.queryRaw(AbstractQueryApi.java:144)
at com.influxdb.client.reactive.internal.QueryReactiveApiImpl.lambda$null$8(QueryReactiveApiImpl.java:334)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.Flowable.subscribe(Flowable.java:14882)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:55)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:153)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:67)
at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.Flowable.subscribe(Flowable.java:14872)
at io.reactivex.Flowable.subscribe(Flowable.java:14730)
at main.Main.main(Main.java:33)
Caused by: java.io.InterruptedIOException: interrupted
at okio.Timeout.throwIfReached(Timeout.kt:98)
at okio.InputStreamSource.read(JvmOkio.kt:87)
at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:129)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okhttp3.internal.http1.Http1ExchangeCodec$AbstractSource.read(Http1ExchangeCodec.java:389)
at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSource.read(Http1ExchangeCodec.java:475)
at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.java:286)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okio.RealBufferedSource.exhausted(RealBufferedSource.kt:198)
at okio.InflaterSource.refill(InflaterSource.kt:112)
at okio.InflaterSource.readOrInflate(InflaterSource.kt:76)
at okio.InflaterSource.read(InflaterSource.kt:49)
at okio.GzipSource.read(GzipSource.kt:69)
at okio.RealBufferedSource.read(RealBufferedSource.kt:188)
at okio.ForwardingSource.read(ForwardingSource.kt:29)
at retrofit2.OkHttpCall$ExceptionCatchingResponseBody$1.read(OkHttpCall.java:314)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:449)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:117)
at okio.RealBufferedSource.readUtf8Line(RealBufferedSource.kt:319)
at com.influxdb.internal.AbstractQueryApi.parseFluxResponseToLines(AbstractQueryApi.java:249)
at com.influxdb.internal.AbstractQueryApi.lambda$queryRaw$4(AbstractQueryApi.java:138)
... 47 more
Shouldn't AbstractRestClient check if the onError-handler is disposed before calling accept(exception):
    void catchOrPropagateException(@Nonnull final Exception exception,
                                   @Nonnull final Consumer<? super Throwable> onError) {

        Arguments.checkNotNull(exception, "exception");
        Arguments.checkNotNull(onError, "onError");

        //
        // Socket closed by remote server or end of data
        //
        if (isCloseException(exception)) {
            LOG.log(Level.FINEST, "Socket closed by remote server or end of data", exception);
        } else {
            // here check if onError is disposed already ?
            onError.accept(exception);
        }
    }
Or alternatively add a case to the if-clause and check for interruptions?
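To make the second option concrete, here is a minimal sketch of what such an interruption check might look like. `InterruptionCheck` and `isInterruption` are hypothetical names, not part of the client; the sketch only assumes that an interruption surfaces as `InterruptedIOException` or `InterruptedException` somewhere in the cause chain, as in the trace above. Note that `java.net.SocketTimeoutException` is a subclass of `InterruptedIOException`, so it is excluded here to avoid silently swallowing real timeouts.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

public final class InterruptionCheck {

    private InterruptionCheck() {
    }

    // Hypothetical helper (not part of the client): returns true when the throwable,
    // or one of its causes, signals that the calling thread was interrupted.
    public static boolean isInterruption(final Throwable throwable) {
        Throwable current = throwable;
        // Bound the walk so a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            // SocketTimeoutException extends InterruptedIOException but is a timeout,
            // not a cancellation, so it is deliberately not treated as an interruption.
            boolean interruptedIo = current instanceof InterruptedIOException
                    && !(current instanceof SocketTimeoutException);
            if (interruptedIo || current instanceof InterruptedException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(final String[] args) {
        Exception interrupted = new InterruptedIOException("interrupted");
        Exception wrapped = new RuntimeException(interrupted);
        Exception timeout = new SocketTimeoutException("timeout");
        Exception other = new IOException("connection reset");

        System.out.println(isInterruption(interrupted)); // true
        System.out.println(isInterruption(wrapped));     // true
        System.out.println(isInterruption(timeout));     // false
        System.out.println(isInterruption(other));       // false
    }
}
```

With a helper like this, the if-clause could log interruptions at FINEST in the same way as close exceptions instead of forwarding them to onError. Whether that is preferable to a disposed check is a design question for the maintainers.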
Specifications:
Client Version: 4.3.0
InfluxDB Version: 2.1.1
JDK Version: 11
Platform: Linux
    Consumer<Throwable> onError = throwable -> {
        if (!subscriber.isDisposed()) {
            subscriber.onError(throwable);
        } else {
            LOG.log(Level.FINEST, "The exception could not be delivered to the consumer "
                    + "because it has already canceled/disposed.", throwable);
        }
    };

    queryRaw(queryCall, consumer, onError, subscriber::onComplete, false);
Steps to reproduce:
1. Run a query through QueryReactiveApi and schedule it as a Future
2. Cancel it (future.cancel(true);)
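For context on why cancelling produces an "interrupted" exception at all: `Future.cancel(true)` interrupts the thread that is running the task, and a blocking call on that thread then fails with an interruption. The following is a stdlib-only sketch of that mechanism, not a reproduction against InfluxDB. `CancelInterruptDemo` is a hypothetical name, and `Thread.sleep` stands in for the blocking read of the query response.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class CancelInterruptDemo {

    private CancelInterruptDemo() {
    }

    // Submits a blocking task, cancels it with cancel(true), and reports whether
    // the worker thread observed the interrupt.
    public static boolean workerSeesInterrupt() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    // Stand-in for a blocking read of the query response.
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            // Wait until the task is actually running, then cancel with interruption.
            started.await();
            future.cancel(true);
            // Give the worker a bounded amount of time to react to the interrupt.
            finished.await(5, TimeUnit.SECONDS);
            return interrupted.get();
        } catch (InterruptedException e) {
            // The calling thread itself was interrupted; restore the flag and give up.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("worker interrupted: " + workerSeesInterrupt());
    }
}
```

In the reported case the subscriber is presumably already disposed by the time the interrupted read fails, which would explain why the resulting exception has nowhere to go and ends up as an UndeliverableException.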