Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): add possibility to specify WriteConsistency parameter #329

Merged
merged 16 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ This release also uses new version of InfluxDB OSS API definitions - [oss.yml](h
1. [#315](https://github.com/influxdata/influxdb-client-java/pull/315): Add support for timezones [FluxDSL]
1. [#317](https://github.com/influxdata/influxdb-client-java/pull/317): Gets HTTP headers from the unsuccessful HTTP request
1. [#334](https://github.com/influxdata/influxdb-client-java/pull/334): Supports not operator [FluxDSL]
1. [#329](https://github.com/influxdata/influxdb-client-java/pull/329): Add support for write `consistency` parameter [InfluxDB Enterprise]

Configure `consistency` via `Write API`:
```diff
- writeApi.writeRecord(WritePrecision.NS, "cpu_load_short,host=server02 value=0.67");
+ WriteParameters parameters = new WriteParameters(WritePrecision.NS, WriteConsistency.ALL);
+
+ writeApi.writeRecord("cpu_load_short,host=server02 value=0.67", parameters);
```

Configure `consistency` via client options:
```diff
- InfluxDBClient client = InfluxDBClientFactory.createV1("http://influxdb_enterpriser:8086",
- "my-username",
- "my-password".toCharArray(),
- "my-db",
- "autogen");
+ InfluxDBClient client = InfluxDBClientFactory.createV1("http://influxdb_enterpriser:8086",
+ "my-username",
+ "my-password".toCharArray(),
+ "my-db",
+ "autogen",
+ WriteConsistency.ALL);
```

### Bug Fixes
1. [#313](https://github.com/influxdata/influxdb-client-java/pull/313): Do not deliver `exception` when the consumer is already disposed [influxdb-client-reactive]
Expand Down
40 changes: 21 additions & 19 deletions client-kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,17 @@ A client can be configured via configuration file. The configuration file has to

The following options are supported:

| Property name | default | description |
| --------------------------|-----------|-------------|
| influx2.url | - | the url to connect to InfluxDB |
| influx2.org | - | default destination organization for writes and queries |
| influx2.bucket | - | default destination bucket for writes |
| influx2.token | - | the token to use for the authorization |
| influx2.logLevel | NONE | rest client verbosity level |
| influx2.readTimeout | 10000 ms | read timeout |
| influx2.writeTimeout | 10000 ms | write timeout |
| influx2.connectTimeout | 10000 ms | socket timeout |
| Property name | default | description |
|--------------------------|------------|------------------------------------------------------------|
| influx2.url | - | the url to connect to InfluxDB |
| influx2.org | - | default destination organization for writes and queries |
| influx2.bucket | - | default destination bucket for writes |
| influx2.token | - | the token to use for the authorization |
| influx2.logLevel | NONE | rest client verbosity level |
| influx2.readTimeout | 10000 ms | read timeout |
| influx2.writeTimeout | 10000 ms | write timeout |
| influx2.connectTimeout | 10000 ms | socket timeout |
| influx2.precision | NS | default precision for unix timestamps in the line protocol |

The `influx2.readTimeout`, `influx2.writeTimeout` and `influx2.connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.

Expand Down Expand Up @@ -222,15 +223,16 @@ val influxDBClient = InfluxDBClientKotlinFactory
```
The following options are supported:

| Property name | default | description |
| ------------------|-----------|-------------|
| org | - | default destination organization for writes and queries |
| bucket | - | default destination bucket for writes |
| token | - | the token to use for the authorization |
| logLevel | NONE | rest client verbosity level |
| readTimeout | 10000 ms | read timeout |
| writeTimeout | 10000 ms | write timeout |
| connectTimeout | 10000 ms | socket timeout |
| Property name | default | description |
|------------------|------------|------------------------------------------------------------|
| org | - | default destination organization for writes and queries |
| bucket | - | default destination bucket for writes |
| token | - | the token to use for the authorization |
| logLevel | NONE | rest client verbosity level |
| readTimeout | 10000 ms | read timeout |
| writeTimeout | 10000 ms | write timeout |
| connectTimeout | 10000 ms | socket timeout |
| precision | NS | default precision for unix timestamps in the line protocol |

The `readTimeout`, `writeTimeout` and `connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.influxdb.client.kotlin

import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.WriteParameters
import com.influxdb.client.write.Point
import kotlinx.coroutines.flow.Flow

Expand Down Expand Up @@ -82,6 +83,17 @@ interface WriteKotlinApi {
*/
suspend fun writeRecords(records: Flow<String>, precision: WritePrecision, bucket: String? = null, org: String? = null)

/**
* Write Line Protocol records into InfluxDB.
*
* If any exception occurs during write, this exception is rethrown from this method.
*
* @param records specified in [LineProtocol](http://bit.ly/line-protocol).
* The `records` are considered as one batch unit.
* @param parameters specify InfluxDB Write endpoint parameters
*/
suspend fun writeRecords(records: Flow<String>, parameters: WriteParameters)

/**
* Write Data Point into InfluxDB.
*
Expand Down Expand Up @@ -127,6 +139,16 @@ interface WriteKotlinApi {
*/
suspend fun writePoints(points: Flow<Point>, bucket: String? = null, org: String? = null)

/**
* Write Data Points into InfluxDB.
*
* If any exception occurs during write, this exception is rethrown from this method.
*
* @param points specified data points. The `points` are considered as one batch unit.
* @param parameters specify InfluxDB Write endpoint parameters
*/
suspend fun writePoints(points: Flow<Point>, parameters: WriteParameters)

/**
* Write Measurement into InfluxDB.
*
Expand Down Expand Up @@ -177,4 +199,14 @@ interface WriteKotlinApi {
* @param <M> measurement type
*/
suspend fun <M> writeMeasurements(measurements: Flow<M>, precision: WritePrecision, bucket: String? = null, org: String? = null)

/**
* Write Measurements into InfluxDB.
*
* If any exception occurs during write, this exception is rethrown from this method.
*
* @param measurements specified Measurements. The `measurements` are considered as one batch unit.
* @param parameters specify InfluxDB Write endpoint parameters
*/
suspend fun <M> writeMeasurements(measurements: Flow<M>, parameters: WriteParameters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import com.influxdb.client.internal.AbstractWriteClient
import com.influxdb.client.kotlin.WriteKotlinApi
import com.influxdb.client.service.WriteService
import com.influxdb.client.write.Point
import com.influxdb.client.write.WriteParameters
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import java.util.*

/**
* @author Jakub Bednar (20/04/2021 9:27)
Expand All @@ -58,6 +58,10 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
write(records.map { AbstractWriteClient.BatchWriteDataRecord(it) }, precision, bucket, org)
}

override suspend fun writeRecords(records: Flow<String>, parameters: WriteParameters) {
write(records.map { AbstractWriteClient.BatchWriteDataRecord(it) }, parameters)
}

override suspend fun writePoint(point: Point, bucket: String?, org: String?) {
writePoints(listOf(point), bucket, org)
}
Expand All @@ -67,15 +71,17 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
}

override suspend fun writePoints(points: Flow<Point>, bucket: String?, org: String?) {
writePoints(points, WriteParameters(bucket, org, options.precision, options.consistency))
}

override suspend fun writePoints(points: Flow<Point>, parameters: WriteParameters) {
points
.toList()
.groupByTo(LinkedHashMap(), { it.precision }, { it })
.forEach { group ->
write(
group.value.asFlow().map { AbstractWriteClient.BatchWriteDataPoint(it, options) },
group.key,
bucket,
org
parameters.copy(group.key, options)
)
}
}
Expand Down Expand Up @@ -107,6 +113,10 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
write(measurements.map { toMeasurementBatch(it, precision) }, precision, bucket, org)
}

override suspend fun <M> writeMeasurements(measurements: Flow<M>, parameters: WriteParameters) {
write(measurements.map { toMeasurementBatch(it, parameters.precisionSafe(options)) }, parameters)
}

private suspend fun write(
records: Flow<AbstractWriteClient.BatchWriteData>,
precision: WritePrecision,
Expand All @@ -117,7 +127,15 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
val bucketOrOption = bucket ?: options.bucket.orEmpty()
val orgOrOption = org ?: options.org.orEmpty()

write(bucketOrOption, orgOrOption, precision, records.toList().stream())
write(records, WriteParameters(bucketOrOption, orgOrOption, precision))
}

private suspend fun write(
records: Flow<AbstractWriteClient.BatchWriteData>,
parameters: WriteParameters
) {

write(parameters, records.toList().stream())
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
*/
package com.influxdb.client.kotlin

import com.influxdb.client.domain.WriteConsistency
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import com.influxdb.client.write.WriteParameters
import com.influxdb.exceptions.UnauthorizedException
import com.influxdb.test.AbstractMockServerTest
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import okhttp3.mockwebserver.RecordedRequest
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
Expand Down Expand Up @@ -230,6 +234,57 @@ class WriteKotlinApiTest : AbstractMockServerTest() {
Assertions.assertThat(mockServer.requestCount).isEqualTo(5)
}

@Test
fun writeParameters(): Unit = runBlocking {
val assertParameters = Runnable {
val request: RecordedRequest = try {
takeRequest()
} catch (e: InterruptedException) {
throw RuntimeException(e)
}
Assertions.assertThat(request.requestUrl).isNotNull
Assertions.assertThat("s")
.isEqualTo(request.requestUrl!!.queryParameter("precision"))
Assertions.assertThat("c").isEqualTo(request.requestUrl!!.queryParameter("bucket"))
Assertions.assertThat("d").isEqualTo(request.requestUrl!!.queryParameter("org"))
Assertions.assertThat("quorum")
.isEqualTo(request.requestUrl!!.queryParameter("consistency"))
}

val parameters = WriteParameters("c", "d", WritePrecision.S, WriteConsistency.QUORUM)

// records
val lineProtocols = flow {
for (i in 1..49) {
emit("h2o,location=coyote_creek level=${i}.0 $i")
}
}
enqueuedResponse()
writeApi.writeRecords(lineProtocols, parameters)
assertParameters.run()

// points
val point = Point
.measurement("h2o")
.addField("level", 1)
.time(1, WritePrecision.S)

enqueuedResponse()
writeApi.writePoints(listOf(point, point).asFlow(), parameters)
assertParameters.run()

// measurements
val mem = ITQueryKotlinApi.Mem()
mem.host = "192.168.1.100"
mem.region = "europe"
mem.free = 40
mem.time = Instant.ofEpochSecond(10)

enqueuedResponse()
writeApi.writeMeasurements(listOf(mem, mem).asFlow(), parameters)
assertParameters.run()
}

private suspend fun <T> Flow<T>.chunks(size: Int): Flow<List<T>> = flow {
val chunk = ArrayList<T>(size)
collect {
Expand Down
12 changes: 6 additions & 6 deletions client-legacy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ FluxClient fluxClient = FluxClientFactory.create("http://localhost:8086?readTime
```
The following options are supported:

| Property name | default | description |
| --------------|-------------|-------------|
| readTimeout | 10000 ms| read timeout |
| writeTimeout | 10000 ms| write timeout |
| connectTimeout | 10000 ms| socket timeout |
| logLevel | NONE | rest client verbosity level |
| Property name | default | description |
|----------------|----------|-----------------------------|
| readTimeout | 10000 ms | read timeout |
| writeTimeout | 10000 ms | write timeout |
| connectTimeout | 10000 ms | socket timeout |
| logLevel | NONE | rest client verbosity level |

## Query using the Flux language

Expand Down
40 changes: 21 additions & 19 deletions client-reactive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,17 @@ A client can be configured via configuration file. The configuration file has to

The following options are supported:

| Property name | default | description |
| --------------------------|-----------|-------------|
| influx2.url | - | the url to connect to InfluxDB |
| influx2.org | - | default destination organization for writes and queries |
| influx2.bucket | - | default destination bucket for writes |
| influx2.token | - | the token to use for the authorization |
| influx2.logLevel | NONE | rest client verbosity level |
| influx2.readTimeout | 10000 ms | read timeout |
| influx2.writeTimeout | 10000 ms | write timeout |
| influx2.connectTimeout | 10000 ms | socket timeout |
| Property name | default | description |
|--------------------------|------------|------------------------------------------------------------|
| influx2.url | - | the url to connect to InfluxDB |
| influx2.org | - | default destination organization for writes and queries |
| influx2.bucket | - | default destination bucket for writes |
| influx2.token | - | the token to use for the authorization |
| influx2.logLevel | NONE | rest client verbosity level |
| influx2.readTimeout | 10000 ms | read timeout |
| influx2.writeTimeout | 10000 ms | write timeout |
| influx2.connectTimeout | 10000 ms | socket timeout |
| influx2.precision | NS | default precision for unix timestamps in the line protocol |

The `influx2.readTimeout`, `influx2.writeTimeout` and `influx2.connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.

Expand Down Expand Up @@ -345,15 +346,16 @@ InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory
```
The following options are supported:

| Property name | default | description |
| ------------------|-----------|-------------|
| org | - | default destination organization for writes and queries |
| bucket | - | default destination bucket for writes |
| token | - | the token to use for the authorization |
| logLevel | NONE | rest client verbosity level |
| readTimeout | 10000 ms | read timeout |
| writeTimeout | 10000 ms | write timeout |
| connectTimeout | 10000 ms | socket timeout |
| Property name | default | description |
|------------------|------------|------------------------------------------------------------|
| org | - | default destination organization for writes and queries |
| bucket | - | default destination bucket for writes |
| token | - | the token to use for the authorization |
| logLevel | NONE | rest client verbosity level |
| readTimeout | 10000 ms | read timeout |
| writeTimeout | 10000 ms | write timeout |
| connectTimeout | 10000 ms | socket timeout |
| precision | NS | default precision for unix timestamps in the line protocol |

The `readTimeout`, `writeTimeout` and `connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.

Expand Down
Loading