Skip to content

Commit

Permalink
fix(reactive): do not deliver exception when the consumer is disposed (
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Mar 22, 2022
1 parent 8f2c513 commit 1d5d8ec
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 5.1.0 [unreleased]

### Bug Fixes
1. [#313](https://github.com/influxdata/influxdb-client-java/pull/313): Do not deliver `exception` when the consumer is already disposed [influxdb-client-reactive]

## 5.0.0 [2022-03-18]

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
*/
public abstract class AbstractQueryApi extends AbstractRestClient {

private static final Logger LOG = Logger.getLogger(AbstractQueryApi.class.getName());
protected static final Logger LOG = Logger.getLogger(AbstractQueryApi.class.getName());

protected final FluxCsvParser fluxCsvParser = new FluxCsvParser();
protected final FluxResultMapper resultMapper = new FluxResultMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
package com.influxdb.client.reactive.internal;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -41,6 +43,8 @@
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;

/**
Expand Down Expand Up @@ -187,7 +191,7 @@ public void accept(final int index,
}
};

query(queryCall, consumer, subscriber::onError, subscriber::onComplete, false);
query(queryCall, consumer, onError(subscriber), subscriber::onComplete, false);
});

return observable.toFlowable(BackpressureStrategy.BUFFER);
Expand Down Expand Up @@ -331,7 +335,7 @@ public Publisher<String> queryRawQuery(@Nonnull final Publisher<Query> queryStre
}
};

queryRaw(queryCall, consumer, subscriber::onError, subscriber::onComplete, false);
queryRaw(queryCall, consumer, onError(subscriber), subscriber::onComplete, false);
});

return observable.toFlowable(BackpressureStrategy.BUFFER);
Expand All @@ -350,4 +354,16 @@ public Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
return queryRawQuery(Flowable.fromPublisher(queryStream)
.map(q -> new Query().query(q).dialect(dialect)), dialect, org);
}

@NotNull
private Consumer<Throwable> onError(final ObservableEmitter<?> subscriber) {
return throwable -> {
if (!subscriber.isDisposed()) {
subscriber.onError(throwable);
} else {
LOG.log(Level.FINEST, "The exception could not be delivered to the consumer "
+ "because it has already canceled/disposed.", throwable);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.client.reactive;

import java.io.InterruptedIOException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.influxdb.test.AbstractMockServerTest;

import io.reactivex.Flowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

/**
* @author Jakub Bednar (03/16/2022 08:28)
*/
@RunWith(JUnitPlatform.class)
class QueryReactiveApiTest extends AbstractMockServerTest {

private InfluxDBClientReactive influxDBClient;

@BeforeEach
void setUp() {

influxDBClient = InfluxDBClientReactiveFactory.create(startMockServer());
}

@AfterEach
void tearDown() {

influxDBClient.close();
}

@Test
public void doNotPropagateErrorOnCanceledConsumer() throws InterruptedException {

mockServer.enqueue(createErrorResponse("Request Timeout", true, 408)
.setBodyDelay(3, TimeUnit.SECONDS));

QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();

final Throwable[] throwable = new Throwable[1];
Future<?> future = Executors
.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(() -> Flowable
.fromPublisher(queryApi.queryRaw("from()", "my-org"))
.subscribe(
s -> {

},
t -> throwable[0] = t),
0, 1, TimeUnit.SECONDS);

Thread.sleep(2_000);
future.cancel(true);
Thread.sleep(2_000);

Assertions.assertThat(throwable[0]).isNotNull();
Assertions.assertThat(throwable[0]).isInstanceOf(InterruptedIOException.class);
}
}

0 comments on commit 1d5d8ec

Please sign in to comment.