Skip to content

Commit

Permalink
Added synchronous blocking API to Write time-series data into InfluxD…
Browse files Browse the repository at this point in the history
…B 2.0 (#42)

* #41: Added synchronous blocking API to Write time-series data into InfluxDB 2.0
  • Loading branch information
bednar authored Jul 16, 2019
1 parent e8ad529 commit ea99660
Show file tree
Hide file tree
Showing 15 changed files with 858 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
1. [#34](https://github.com/bonitoo-io/influxdb-client-java/issues/34): Auto-configure client from configuration file
1. [#35](https://github.com/bonitoo-io/influxdb-client-java/issues/35): Possibility to specify default tags
1. [#41](https://github.com/bonitoo-io/influxdb-client-java/issues/41): Synchronous blocking API to Write time-series data into InfluxDB 2.0

### CI
1. [#37](https://github.com/bonitoo-io/influxdb-client-java/issues/37): Switch CI from oraclejdk to openjdk
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ dependencies {
package example;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

import org.influxdata.annotations.Column;
Expand All @@ -139,6 +138,7 @@ import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.InfluxDBClientFactory;
import org.influxdata.client.QueryApi;
import org.influxdata.client.WriteApi;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.write.Point;
import org.influxdata.query.FluxRecord;
import org.influxdata.query.FluxTable;
Expand All @@ -162,14 +162,14 @@ public class InfluxDB2Example {
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(), ChronoUnit.NANOS);
.time(Instant.now().toEpochMilli(), WritePrecision.NS);

writeApi.writePoint("bucket_name", "org_id", point);

//
// Write by LineProtocol
//
writeApi.writeRecord("bucket_name", "org_id", ChronoUnit.NANOS, "temperature,location=north value=60.0");
writeApi.writeRecord("bucket_name", "org_id", WritePrecision.NS, "temperature,location=north value=60.0");

//
// Write by POJO
Expand All @@ -179,7 +179,7 @@ public class InfluxDB2Example {
temperature.value = 62D;
temperature.time = Instant.now();

writeApi.writeMeasurement("bucket_name", "org_id", ChronoUnit.NANOS, temperature);
writeApi.writeMeasurement("bucket_name", "org_id", WritePrecision.NS, temperature);
}

//
Expand Down
4 changes: 2 additions & 2 deletions client-reactive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ The following example demonstrates how to write measurements every 10 seconds:
package example;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import org.influxdata.annotations.Column;
import org.influxdata.annotations.Measurement;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.reactive.InfluxDBClientReactive;
import org.influxdata.client.reactive.InfluxDBClientReactiveFactory;
import org.influxdata.client.reactive.WriteReactiveApi;
Expand Down Expand Up @@ -226,7 +226,7 @@ public class WriteEvery10Seconds {
return temperature;
});

writeApi.writeMeasurements("bucket_name", "org_id", ChronoUnit.NANOS, measurements);
writeApi.writeMeasurements("bucket_name", "org_id", WritePrecision.NS, measurements);

writeApi.close();
influxDBClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void writePoints(@Nonnull final String bucket,
Arguments.checkNonEmpty(orgID, "organization");
Arguments.checkNotNull(points, "points");

Flowable<BatchWriteDataPoint> stream = points.filter(Objects::nonNull).map(BatchWriteDataPoint::new);
Flowable<BatchWriteDataPoint> stream = points.filter(Objects::nonNull)
.map(point -> new BatchWriteDataPoint(point, options));

write(bucket, orgID, stream);
}
Expand Down Expand Up @@ -158,7 +159,8 @@ public <M> void writeMeasurements(@Nonnull final String bucket,
Arguments.checkNotNull(precision, "precision");
Arguments.checkNotNull(measurements, "measurements");

Flowable<BatchWriteData> stream = measurements.map(it -> new BatchWriteDataMeasurement(it, precision));
Flowable<BatchWriteData> stream = measurements
.map(it -> new BatchWriteDataMeasurement(it, precision, options, measurementMapper));

write(bucket, orgID, precision, stream);
}
Expand Down
98 changes: 90 additions & 8 deletions client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The reference Java client that allows query, write and management (bucket, organ
- authorizations
- health check
- [Advanced Usage](#advanced-usage)
- [Writing data using synchronous blocking API](#writing-data-using-synchronous-blocking-api)
- [Client configuration file](#client-configuration-file)
- [Client connection string](#client-connection-string)
- [Gzip support](#gzip-support)
Expand Down Expand Up @@ -319,7 +320,7 @@ public class RawQueryAsynchronous {

## Writes

For writing data we use [WriteApi](https://bonitoo-io.github.io/influxdb-client-java/influxdb-client-java/apidocs/org/influxdata/client/WriteApi.html) that supports:
For writing data we use [WriteApi](https://bonitoo-io.github.io/influxdb-client-java/influxdb-client-java/apidocs/org/influxdata/client/WriteApi.html) that is an asynchronous non-blocking API and supports:

1. writing data using [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/), Data Point, POJO
2. use batching for writes
Expand Down Expand Up @@ -370,6 +371,8 @@ writeApi.listenEvents(BackpressureEvent.class, value -> {
});
```

There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).

### Writing data

#### By POJO
Expand All @@ -380,13 +383,13 @@ Write Measurement into specified bucket:
package example;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.influxdata.annotations.Column;
import org.influxdata.annotations.Measurement;
import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.InfluxDBClientFactory;
import org.influxdata.client.WriteApi;
import org.influxdata.client.domain.WritePrecision;

public class WritePOJO {

Expand All @@ -409,7 +412,7 @@ public class WritePOJO {
temperature.value = 62D;
temperature.time = Instant.now();

writeApi.writeMeasurement("bucket_name", "org_id", ChronoUnit.NANOS, temperature);
writeApi.writeMeasurement("bucket_name", "org_id", WritePrecision.NS, temperature);
}

influxDBClient.close();
Expand Down Expand Up @@ -438,11 +441,11 @@ Write Data point into specified bucket:
package example;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.InfluxDBClientFactory;
import org.influxdata.client.WriteApi;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.write.Point;

public class WriteDataPoint {
Expand All @@ -464,7 +467,7 @@ public class WriteDataPoint {
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(), ChronoUnit.NANOS);
.time(Instant.now().toEpochMilli(), WritePrecision.NS);

writeApi.writePoint("bucket_name", "org_id", point);
}
Expand All @@ -481,11 +484,10 @@ Write Line Protocol record into specified bucket:
```java
package example;

import java.time.temporal.ChronoUnit;

import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.InfluxDBClientFactory;
import org.influxdata.client.WriteApi;
import org.influxdata.client.domain.WritePrecision;

public class WriteLineProtocol {

Expand All @@ -505,7 +507,7 @@ public class WriteLineProtocol {
//
String record = "temperature,location=north value=60.0";

writeApi.writeRecord("bucket_name", "org_id", ChronoUnit.NANOS, record);
writeApi.writeRecord("bucket_name", "org_id", WritePrecision.NS, record);
}

influxDBClient.close();
Expand Down Expand Up @@ -667,6 +669,86 @@ public class InfluxDB2ManagementExample {

## Advanced Usage

### Writing data using synchronous blocking API

The [WriteApiBlocking](https://bonitoo-io.github.io/influxdb-client-java/influxdb-client-java/apidocs/org/influxdata/client/WriteApiBlocking.html) provides a synchronous blocking API to writing data using [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/), Data Point and POJO.

_It's up to user to handle a server or a http exception._

```java
package example;

import java.time.Instant;

import org.influxdata.annotations.Column;
import org.influxdata.annotations.Measurement;
import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.InfluxDBClientFactory;
import org.influxdata.client.WriteApiBlocking;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.write.Point;
import org.influxdata.exceptions.InfluxException;

public class WriteDataPoint {

private static char[] token = "my_token".toCharArray();

public static void main(final String[] args) throws Exception {

InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:9999", token);

WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

try {
//
// Write by LineProtocol
//
String record = "temperature,location=north value=60.0";

writeApi.writeRecord("bucket_name", "org_id", WritePrecision.NS, record);

//
// Write by Data Point
//
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(), WritePrecision.NS);

writeApi.writePoint("bucket_name", "org_id", point);

//
// Write by POJO
//
Temperature temperature = new Temperature();
temperature.location = "south";
temperature.value = 62D;
temperature.time = Instant.now();

writeApi.writeMeasurement("bucket_name", "org_id", WritePrecision.NS, temperature);

} catch (InfluxException ie) {
System.out.println("InfluxException: " + ie);
}

influxDBClient.close();
}

@Measurement(name = "temperature")
private static class Temperature {

@Column(tag = true)
String location;

@Column
Double value;

@Column(timestamp = true)
Instant time;
}
}
```

### Client configuration file

A client can be configured via configuration file. The configuration file has to be named as `influx2.properties` and has to be in root of classpath.
Expand Down
11 changes: 9 additions & 2 deletions client/src/main/java/org/influxdata/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,29 @@ public interface InfluxDBClient extends AutoCloseable {
QueryApi getQueryApi();

/**
* Get the Write client.
* Get the asynchronous non-blocking Write client.
*
* @return the new client instance for the Write API
*/
@Nonnull
WriteApi getWriteApi();

/**
* Get the Write client.
* Get the asynchronous non-blocking Write client.
*
* @param writeOptions the writes configuration
* @return the new client instance for the Write API
*/
@Nonnull
WriteApi getWriteApi(@Nonnull final WriteOptions writeOptions);

/**
* Get the synchronous blocking Write client.
*
* @return the new client instance for the Write API
*/
@Nonnull
WriteApiBlocking getWriteApiBlocking();

/**
* Get the {@link Authorization} client.
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/org/influxdata/client/WriteApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.influxdata.client.write.events.WriteSuccessEvent;

/**
* Write time-series data into InfluxDB 2.0.
* The asynchronous non-blocking API to Write time-series data into InfluxDB 2.0.
* <p>
* The data are formatted in <a href="https://bit.ly/2QL99fu">Line Protocol</a>.
* <p>
Expand Down
Loading

0 comments on commit ea99660

Please sign in to comment.