Skip to content

Commit

Permalink
#43: The data point without field should be ignored
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Jul 18, 2019
1 parent ea99660 commit fe33618
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
1. [#35](https://github.com/bonitoo-io/influxdb-client-java/issues/35): Possibility to specify default tags
1. [#41](https://github.com/bonitoo-io/influxdb-client-java/issues/41): Synchronous blocking API to Write time-series data into InfluxDB 2.0

### Bugs
1. [#43](https://github.com/bonitoo-io/influxdb-client-java/issues/43): The data point without field should be ignored

### CI
1. [#37](https://github.com/bonitoo-io/influxdb-client-java/issues/37): Switch CI from oraclejdk to openjdk

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ public String toLineProtocol() {

public static final class BatchWriteDataPoint implements BatchWriteData {

private static final Logger LOG = Logger.getLogger(BatchWriteDataPoint.class.getName());

private final Point point;
private final InfluxDBClientOptions options;

Expand All @@ -284,10 +286,17 @@ public BatchWriteDataPoint(@Nonnull final Point point,
this.options = options;
}

@Nonnull
@Nullable
@Override
public String toLineProtocol() {

if (!point.hasFields()) {

LOG.warning("The point: " + point + "doesn't contains any fields, skipping");

return null;
}

return point.toLineProtocol(options.getPointSettings());
}
}
Expand Down Expand Up @@ -317,7 +326,15 @@ public String toLineProtocol() {
return null;
}

return measurementMapper.toPoint(measurement, precision).toLineProtocol(options.getPointSettings());
Point point = measurementMapper.toPoint(measurement, precision);
if (!point.hasFields()) {

LOG.warning("The measurement: " + measurement + "doesn't contains any fields, skipping");

return null;
}

return point.toLineProtocol(options.getPointSettings());
}
}

Expand Down
9 changes: 9 additions & 0 deletions client/src/main/java/org/influxdata/client/write/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ public WritePrecision getPrecision() {
return precision;
}

/**
* Has point any fields?
*
* @return true, if the point contains any fields, false otherwise.
*/
public boolean hasFields() {
return !fields.isEmpty();
}

/**
* @return Line Protocol
*/
Expand Down
42 changes: 42 additions & 0 deletions client/src/test/java/org/influxdata/client/ITWriteApiBlocking.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,48 @@ void write() {
}
}

@Test
void writePointsWithoutFields() {

WriteApiBlocking api = influxDBClient.getWriteApiBlocking();

api.writePoints(Arrays.asList(
Point.measurement("h2o").addTag("location", "coyote_creek").time(9L, WritePrecision.NS),
Point.measurement("h2o").addTag("location", "coyote_creek").addField("water_level", 10.0D).time(10L, WritePrecision.NS)));

List<FluxTable> query = influxDBClient.getQueryApi().query("from(bucket:\"" + bucket.getName() + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", organization.getId());

Assertions.assertThat(query).hasSize(1);
Assertions.assertThat(query.get(0).getRecords()).hasSize(1);

FluxRecord record = query.get(0).getRecords().get(0);
Assertions.assertThat(record.getMeasurement()).isEqualTo("h2o");
Assertions.assertThat(record.getValue()).isEqualTo(10D);
Assertions.assertThat(record.getField()).isEqualTo("water_level");
Assertions.assertThat(record.getTime()).isEqualTo(Instant.ofEpochSecond(0, 10));
}

@Test
void writeMeasurementWithoutFields() {

WriteApiBlocking api = influxDBClient.getWriteApiBlocking();

api.writeMeasurements(WritePrecision.NS, Arrays.asList(
new H2OFeetMeasurement("coyote_creek", null, null, Instant.ofEpochSecond(0, 15)),
new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16))));

List<FluxTable> query = influxDBClient.getQueryApi().query("from(bucket:\"" + bucket.getName() + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", organization.getId());

Assertions.assertThat(query).hasSize(1);
Assertions.assertThat(query.get(0).getRecords()).hasSize(1);

FluxRecord record = query.get(0).getRecords().get(0);
Assertions.assertThat(record.getMeasurement()).isEqualTo("h2o");
Assertions.assertThat(record.getValue()).isEqualTo(16D);
Assertions.assertThat(record.getField()).isEqualTo("water_level");
Assertions.assertThat(record.getTime()).isEqualTo(Instant.ofEpochSecond(0, 16));
}

@Test
void propagateServerException() {

Expand Down
61 changes: 61 additions & 0 deletions client/src/test/java/org/influxdata/client/ITWriteQueryApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,39 @@ void writePoints() {
Assertions.assertThat(fluxRecords.get(1).getValue()).isEqualTo(2L);
}

@Test
void writePointsWithoutFields() {

String bucketName = bucket.getName();

writeApi = influxDBClient.getWriteApi();
WriteEventListener<WriteSuccessEvent> listener = new WriteEventListener<>();
writeApi.listenEvents(WriteSuccessEvent.class, listener);

Instant time = Instant.now();

Point point1 = Point.measurement("h2o_feet").addTag("location", "west").time(time, WritePrecision.MS);
Point point2 = Point.measurement("h2o_feet").addTag("location", "west").addField("water_level", 2).time(time.plusMillis(10), WritePrecision.MS);

writeApi.writePoints(bucketName, organization.getId(), Arrays.asList(point1, point2, point2));

waitToCallback(listener.countDownLatch, 10);

List<FluxRecord> fluxRecords = new ArrayList<>();

CountDownLatch queryCountDown = new CountDownLatch(1);
queryApi.query("from(bucket:\"" + bucketName + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", organization.getId(), (cancellable, fluxRecord) -> {
fluxRecords.add(fluxRecord);
queryCountDown.countDown();

});

waitToCallback(queryCountDown, 10);

Assertions.assertThat(fluxRecords).hasSize(1);
Assertions.assertThat(fluxRecords.get(0).getValue()).isEqualTo(2L);
}

@Test
void writeMeasurement() {

Expand All @@ -293,6 +326,34 @@ void writeMeasurement() {
Assertions.assertThat(measurements.get(0).time).isEqualTo(measurement.time);
}

@Test
void writeMeasurementWithoutFields() {

String bucketName = bucket.getName();

writeApi = influxDBClient.getWriteApi();
WriteEventListener<WriteSuccessEvent> listener = new WriteEventListener<>();
writeApi.listenEvents(WriteSuccessEvent.class, listener);

long millis = Instant.now().toEpochMilli();
H2OFeetMeasurement measurement1 = new H2OFeetMeasurement(
"coyote_creek", 2.927, null, millis);
H2OFeetMeasurement measurement2 = new H2OFeetMeasurement(
"coyote_creek", null, null, millis);

writeApi.writeMeasurements(bucketName, organization.getId(), WritePrecision.NS, Arrays.asList(measurement1, measurement2));

waitToCallback(listener.countDownLatch, 10);

List<H2OFeetMeasurement> measurements = queryApi.query("from(bucket:\"" + bucketName + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last() |> rename(columns:{_value: \"water_level\"})", organization.getId(), H2OFeetMeasurement.class);

Assertions.assertThat(measurements).hasSize(1);
Assertions.assertThat(measurements.get(0).location).isEqualTo("coyote_creek");
Assertions.assertThat(measurements.get(0).description).isNull();
Assertions.assertThat(measurements.get(0).level).isEqualTo(2.927);
Assertions.assertThat(measurements.get(0).time).isEqualTo(measurement1.time);
}

@Test
void queryDataFromNewOrganization() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,13 @@ void defaultTagsSorted() {

Assertions.assertThat(point.toLineProtocol(defaults)).isEqualTo("h2o,a-expensive=true,location=europe,z-expensive=false level=2i");
}

@Test
void hasFields() {

Assertions.assertThat(Point.measurement("h2o").hasFields()).isFalse();
Assertions.assertThat(Point.measurement("h2o").addTag("location", "europe").hasFields()).isFalse();
Assertions.assertThat(Point.measurement("h2o").addField("level", 2).hasFields()).isTrue();
Assertions.assertThat(Point.measurement("h2o").addTag("location", "europe").addField("level", 3).hasFields()).isTrue();
}
}

0 comments on commit fe33618

Please sign in to comment.