From 6cec935ec9284466f4aea95c735c70e95671f80d Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 10 May 2022 14:19:10 +0200 Subject: [PATCH 01/10] feat(scala): Add WriteAPI for Scala --- .../client/scala/InfluxDBClientScala.scala | 7 ++ .../influxdb/client/scala/WriteScalaApi.scala | 65 +++++++++++++ .../internal/InfluxDBClientScalaImpl.scala | 12 ++- .../scala/internal/WriteScalaApiImpl.scala | 94 +++++++++++++++++++ .../client/scala/ITInfluxDBClientScala.scala | 18 ++++ .../client/scala/ITQueryScalaApiQuery.scala | 11 ++- .../scala/InfluxDBClientScalaTest.scala | 2 +- 7 files changed, 201 insertions(+), 8 deletions(-) create mode 100644 client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala create mode 100644 client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala b/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala index 33af1e0414d..6df0f8f6d75 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala @@ -39,6 +39,13 @@ trait InfluxDBClientScala { */ @Nonnull def getQueryScalaApi(): QueryScalaApi + /** + * Create a new WriteApi client. + * + * @return the new client instance for the Write API + */ + @Nonnull def getWriteScalaApi: WriteScalaApi + /** * Get the health of an instance. * diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala new file mode 100644 index 00000000000..dcd48b8b37c --- /dev/null +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -0,0 +1,65 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala + +import akka.Done +import akka.stream.scaladsl.Sink +import com.influxdb.client.domain.WritePrecision + +import scala.concurrent.Future + +/** + * The Scala API to write time-series data into InfluxDB 2.x. + * + * @author Jakub Bednar (bednar@github) (05/09/2022 09:48) + */ +trait WriteScalaApi { + /** + * Write Line Protocol record into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit. + */ + def writeRecord(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[String, Future[Done]] + + /** + * Write Line Protocol records into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the records specified in InfluxDB Line Protocol. + */ + def writeRecords(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[String], Future[Done]] +} diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala index 73f59a20e57..4cba7937e1c 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala @@ -25,8 +25,8 @@ import com.influxdb.LogLevel import com.influxdb.client.InfluxDBClientOptions import com.influxdb.client.domain.HealthCheck import com.influxdb.client.internal.AbstractInfluxDBClient -import com.influxdb.client.scala.{InfluxDBClientScala, QueryScalaApi} -import com.influxdb.client.service.QueryService +import com.influxdb.client.scala.{InfluxDBClientScala, QueryScalaApi, WriteScalaApi} +import com.influxdb.client.service.{QueryService, WriteService} import javax.annotation.Nonnull @@ -41,6 +41,14 @@ class InfluxDBClientScalaImpl(@Nonnull options: InfluxDBClientOptions) extends A */ override def getQueryScalaApi(): QueryScalaApi = new QueryScalaApiImpl(retrofit.create(classOf[QueryService]), options) + + /** + * Create a new WriteApi client. + * + * @return the new client instance for the Write API + */ + override def getWriteScalaApi: WriteScalaApi = new WriteScalaApiImpl(retrofit.create(classOf[WriteService]), options) + /** * Get the health of an instance. * diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala new file mode 100644 index 00000000000..a3f82a43242 --- /dev/null +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -0,0 +1,94 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala.internal + +import akka.Done +import akka.stream.scaladsl.{Flow, Keep, Sink} +import com.influxdb.client.InfluxDBClientOptions +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient} +import com.influxdb.client.scala.WriteScalaApi +import com.influxdb.client.service.WriteService +import com.influxdb.client.write.WriteParameters + +import javax.annotation.Nonnull +import scala.concurrent.Future +import scala.jdk.CollectionConverters._ + +/** + * @author Jakub Bednar (bednar@github) (05/09/2022 09:48) + */ +class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: InfluxDBClientOptions) + + extends AbstractWriteBlockingClient(service, options) with WriteScalaApi { + + /** + * Write Line Protocol record into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit. + */ + override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = { + + Flow[String] + .map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record))) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Line Protocol records into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the records specified in InfluxDB Line Protocol. + */ + override def writeRecords(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[String], Future[Done]] = { + Flow[Seq[String]] + .map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record))) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) + } + + private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + + //TODO test check exception + //TODO add variant with WriteParameters + //TODO not working for records + write(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch.toList.asJava.stream()) + + Done.done() + } +} diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala b/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala index 2aae0ac320f..5d806a84744 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala @@ -21,15 +21,22 @@ */ package com.influxdb.client.scala +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Source} import com.influxdb.LogLevel import com.influxdb.client.domain.HealthCheck import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration.Duration + /** * @author Jakub Bednar (bednar@github) (06/11/2018 09:52) */ class ITInfluxDBClientScala extends AbstractITQueryScalaApi with Matchers { + implicit val system: ActorSystem = ActorSystem("unit-tests") + before { setUp() } @@ -112,4 +119,15 @@ class ITInfluxDBClientScala extends AbstractITQueryScalaApi with Matchers { influxDBClient.disableGzip() influxDBClient.isGzipEnabled should be(false) } + + test("prototype write api") { + influxDBClient.close() + influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, "my-token".toCharArray) + influxDBClient.setLogLevel(LogLevel.BODY) + val source = Source.single("m2m,tag=a value=1i") + val sink = influxDBClient.getWriteScalaApi.writeRecord() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + } } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala index f5dfc2400a1..288f8e70a60 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala @@ -22,7 +22,7 @@ package com.influxdb.client.scala import akka.actor.ActorSystem -import akka.stream.scaladsl.FileIO +import akka.stream.scaladsl.{FileIO, Keep, Source} import akka.stream.testkit.scaladsl.TestSink import akka.util.ByteString import com.influxdb.annotations.Column @@ -98,13 +98,14 @@ class ITQueryScalaApiQuery extends AbstractITQueryScalaApi with Matchers { "cpu,host=A,hyper-threading=true,region=west usage_system=55i,user_usage=65i 20000000000") .mkString("\n") - val writeApi = client.getWriteApiBlocking - writeApi.writeRecord(bucket.getName, organization.getId, WritePrecision.NS, records) - client.close() influxDBClient.close() - influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, token.toCharArray) + influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, token.toCharArray, organization.getId, bucket.getName) + val sink = influxDBClient.getWriteScalaApi.writeRecord() + val future = Source.single(records).toMat(sink)(Keep.right) + Await.ready(future.run(), Duration.Inf) + queryScalaApi = influxDBClient.getQueryScalaApi() } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala index 369becbfb17..483262f5c08 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala @@ -24,9 +24,9 @@ package com.influxdb.client.scala import akka.actor.ActorSystem import akka.stream.testkit.scaladsl.TestSink import com.influxdb.query.FluxRecord +import org.scalatest.BeforeAndAfter import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.BeforeAndAfter /** * @author Jakub Bednar (09/06/2020 07:19) From 3c6b5f42e6f3102e2d4c782abb30b5f72d726265 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 10 May 2022 14:53:20 +0200 Subject: [PATCH 02/10] chore: add tests for record/records --- .../influxdb/client/scala/WriteScalaApi.scala | 2 +- .../scala/internal/WriteScalaApiImpl.scala | 1 - .../client/scala/ITInfluxDBClientScala.scala | 18 ----- .../scala/InfluxDBClientScalaTest.scala | 4 +- .../influxdb/client/scala/InfluxDBUtils.scala | 2 + .../client/scala/WriteScalaApiTest.scala | 76 +++++++++++++++++++ 6 files changed, 81 insertions(+), 22 deletions(-) create mode 100644 client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala index dcd48b8b37c..a204ba43cb7 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -59,7 +59,7 @@ trait WriteScalaApi { * @param org Specifies the destination organization for writes. * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` * if the `org` is not specified. - * @return the sink that accept the records specified in InfluxDB Line Protocol. + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. */ def writeRecords(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[String], Future[Done]] } diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala index a3f82a43242..545c0a1df81 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -86,7 +86,6 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx //TODO test check exception //TODO add variant with WriteParameters - //TODO not working for records write(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch.toList.asJava.stream()) Done.done() diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala b/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala index 5d806a84744..2aae0ac320f 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/ITInfluxDBClientScala.scala @@ -21,22 +21,15 @@ */ package com.influxdb.client.scala -import akka.actor.ActorSystem -import akka.stream.scaladsl.{Keep, Source} import com.influxdb.LogLevel import com.influxdb.client.domain.HealthCheck import org.scalatest.matchers.should.Matchers -import scala.concurrent.Await -import scala.concurrent.duration.Duration - /** * @author Jakub Bednar (bednar@github) (06/11/2018 09:52) */ class ITInfluxDBClientScala extends AbstractITQueryScalaApi with Matchers { - implicit val system: ActorSystem = ActorSystem("unit-tests") - before { setUp() } @@ -119,15 +112,4 @@ class ITInfluxDBClientScala extends AbstractITQueryScalaApi with Matchers { influxDBClient.disableGzip() influxDBClient.isGzipEnabled should be(false) } - - test("prototype write api") { - influxDBClient.close() - influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, "my-token".toCharArray) - influxDBClient.setLogLevel(LogLevel.BODY) - val source = Source.single("m2m,tag=a value=1i") - val sink = influxDBClient.getWriteScalaApi.writeRecord() - val materialized = source.toMat(sink)(Keep.right) - - Await.ready(materialized.run(), Duration.Inf) - } } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala index 483262f5c08..7192924a6d3 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala @@ -31,10 +31,10 @@ import org.scalatest.matchers.should.Matchers /** * @author Jakub Bednar (09/06/2020 07:19) */ -class InfluxDBClientScalaTest extends AnyFunSuite with Matchers with BeforeAndAfter{ +class InfluxDBClientScalaTest extends AnyFunSuite with Matchers with BeforeAndAfter { implicit val system: ActorSystem = ActorSystem("unit-tests") - + var utils: InfluxDBUtils = _ before { diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala index 1fe743f3b2b..0771b5d3982 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala @@ -39,6 +39,8 @@ class InfluxDBUtils extends AbstractMockServerTest { def serverTakeRequest(): RecordedRequest = super.takeRequest() + def getRequestCount: Int = mockServer.getRequestCount + override def generateName(prefix: String): String = super.generateName(prefix) override def getDeclaredField[V](obj: Any, field: String, `type`: Class[_]): V = super.getDeclaredField(obj, field, `type`) diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala new file mode 100644 index 00000000000..6cde410cedd --- /dev/null +++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala @@ -0,0 +1,76 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Source} +import org.scalatest.BeforeAndAfter +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter { + + implicit val system: ActorSystem = ActorSystem("unit-tests") + + var utils: InfluxDBUtils = _ + var client: InfluxDBClientScala = _ + + before { + utils = new InfluxDBUtils {} + client = InfluxDBClientScalaFactory.create(utils.serverStart) + } + + after { + utils.serverStop() + } + + test("write record") { + + utils.serverMockResponse() + + val source = Source.single("m2m,tag=a value=1i") + val sink = client.getWriteScalaApi.writeRecord() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("m2m,tag=a value=1i") + } + + test("write records") { + + utils.serverMockResponse() + + val source = Source.single(Seq("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2")) + val sink = client.getWriteScalaApi.writeRecords() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("m2m,tag=a value=1i 1\nm2m,tag=a value=2i 2") + } +} From 2d74581d047d5068856b42d38c089e87a4f90be1 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 10 May 2022 14:55:15 +0200 Subject: [PATCH 03/10] chore: optimize import data for test --- .../scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala index 288f8e70a60..54723b0318c 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala @@ -96,13 +96,12 @@ class ITQueryScalaApiQuery extends AbstractITQueryScalaApi with Matchers { "cpu,host=A,region=west usage_system=35i,user_usage=45i 10000000000", "cpu,host=A,region=west usage_system=38i,user_usage=49i 20000000000", "cpu,host=A,hyper-threading=true,region=west usage_system=55i,user_usage=65i 20000000000") - .mkString("\n") client.close() influxDBClient.close() influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, token.toCharArray, organization.getId, bucket.getName) - val sink = influxDBClient.getWriteScalaApi.writeRecord() + val sink = influxDBClient.getWriteScalaApi.writeRecords() val future = Source.single(records).toMat(sink)(Keep.right) Await.ready(future.run(), Duration.Inf) From 6573198a51f8fcca834a048650de82242af20784 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 11 May 2022 08:31:13 +0200 Subject: [PATCH 04/10] chore: check error propagation, add variant with WriteParameters --- .../influxdb/client/scala/WriteScalaApi.scala | 10 ++++ .../scala/internal/WriteScalaApiImpl.scala | 22 +++++++-- .../influxdb/client/scala/InfluxDBUtils.scala | 2 + .../client/scala/WriteScalaApiTest.scala | 46 +++++++++++++++++-- 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala index a204ba43cb7..f86c0a4e47d 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -24,7 +24,9 @@ package com.influxdb.client.scala import akka.Done import akka.stream.scaladsl.Sink import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.write.WriteParameters +import javax.annotation.Nonnull import scala.concurrent.Future /** @@ -62,4 +64,12 @@ trait WriteScalaApi { * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. */ def writeRecords(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[String], Future[Done]] + + /** + * Write Line Protocol records into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. + */ + def writeRecords(@Nonnull parameters: WriteParameters): Sink[Seq[String], Future[Done]] } diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala index 545c0a1df81..218854acbea 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -76,17 +76,31 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx * @return the sink that accept the records specified in InfluxDB Line Protocol. */ override def writeRecords(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[String], Future[Done]] = { + writeRecords(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null)) + } + + /** + * Write Line Protocol records into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. + */ + override def writeRecords(parameters: WriteParameters): Sink[Seq[String], Future[Done]] = { Flow[Seq[String]] .map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record))) - .map(batch => writeHttp(precision, bucket, org, batch)) + .map(batch => writeHttp(parameters, batch)) .toMat(Sink.head)(Keep.right) } private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + writeHttp(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch) + } + + private def writeHttp(parameters: WriteParameters, batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + + parameters.check(options) - //TODO test check exception - //TODO add variant with WriteParameters - write(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch.toList.asJava.stream()) + write(parameters, batch.toList.asJava.stream()) Done.done() } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala index 0771b5d3982..bda83e4d0b7 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala @@ -37,6 +37,8 @@ class InfluxDBUtils extends AbstractMockServerTest { def serverMockResponse(): Unit = super.enqueuedResponse() + def serverMockErrorResponse(influxError: String): Unit = mockServer.enqueue(super.createErrorResponse(influxError)) + def serverTakeRequest(): RecordedRequest = super.takeRequest() def getRequestCount: Int = mockServer.getRequestCount diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala index 6cde410cedd..a41519c8794 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala @@ -23,14 +23,18 @@ package com.influxdb.client.scala import akka.actor.ActorSystem import akka.stream.scaladsl.{Keep, Source} +import com.influxdb.client.write.WriteParameters +import com.influxdb.exceptions.InternalServerErrorException import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.ScalaFutures import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import scala.concurrent.Await import scala.concurrent.duration.Duration +import scala.language.postfixOps -class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter { +class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter with ScalaFutures { implicit val system: ActorSystem = ActorSystem("unit-tests") @@ -39,7 +43,7 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter { before { utils = new InfluxDBUtils {} - client = InfluxDBClientScalaFactory.create(utils.serverStart) + client = InfluxDBClientScalaFactory.create(utils.serverStart, "my-token".toCharArray, "my-org", "my-bucket") } after { @@ -57,7 +61,12 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter { Await.ready(materialized.run(), Duration.Inf) utils.getRequestCount should be(1) - utils.serverTakeRequest().getBody.readUtf8() should be("m2m,tag=a value=1i") + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("m2m,tag=a value=1i") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") } test("write records") { @@ -73,4 +82,35 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter { utils.getRequestCount should be(1) utils.serverTakeRequest().getBody.readUtf8() should be("m2m,tag=a value=1i 1\nm2m,tag=a value=2i 2") } + + test("write records custom params") { + + utils.serverMockResponse() + + val source = Source.single("m2m,tag=a value=1i 1").map(it => Seq(it)) + val sink = client.getWriteScalaApi.writeRecords(new WriteParameters("my-bucket-2", null, null, null)) + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + request.getBody.readUtf8() should be("m2m,tag=a value=1i 1") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket-2") + } + + test("write records error propagation") { + + utils.serverMockErrorResponse("line protocol poorly formed and no points were written") + + val source = Source.single(Seq("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2")) + val sink = client.getWriteScalaApi.writeRecords() + val materialized = source.toMat(sink)(Keep.right) + + whenReady(materialized.run().failed) { exc => { + exc.getMessage should be("line protocol poorly formed and no points were written") + exc.getClass should be(classOf[InternalServerErrorException]) + } + } + } } From 75d4743666c7030eb3b928e7b57dd0d6dddf3559 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 May 2022 09:24:04 +0200 Subject: [PATCH 05/10] feat: add write Points --- .../influxdb/client/scala/WriteScalaApi.scala | 40 ++++++++- .../scala/internal/WriteScalaApiImpl.scala | 58 ++++++++++++- .../client/scala/WriteScalaApiTest.scala | 82 ++++++++++++++++++- 3 files changed, 177 insertions(+), 3 deletions(-) diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala index f86c0a4e47d..39d895448ca 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -24,7 +24,7 @@ package com.influxdb.client.scala import akka.Done import akka.stream.scaladsl.Sink import com.influxdb.client.domain.WritePrecision -import com.influxdb.client.write.WriteParameters +import com.influxdb.client.write.{Point, WriteParameters} import javax.annotation.Nonnull import scala.concurrent.Future @@ -72,4 +72,42 @@ trait WriteScalaApi { * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. */ def writeRecords(@Nonnull parameters: WriteParameters): Sink[Seq[String], Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `point` is considered as one batch unit. + */ + def writePoint(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Point, Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + def writePoints(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[Point], Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + def writePoints(@Nonnull parameters: WriteParameters): Sink[Seq[Point], Future[Done]] } diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala index 218854acbea..a0733a03d5e 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -28,9 +28,10 @@ import com.influxdb.client.domain.WritePrecision import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient} import com.influxdb.client.scala.WriteScalaApi import com.influxdb.client.service.WriteService -import com.influxdb.client.write.WriteParameters +import com.influxdb.client.write.{Point, WriteParameters} import javax.annotation.Nonnull +import scala.collection.immutable.ListMap import scala.concurrent.Future import scala.jdk.CollectionConverters._ @@ -92,6 +93,61 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx .toMat(Sink.head)(Keep.right) } + /** + * Write Data points into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `point` is considered as one batch unit. + */ + override def writePoint(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = { + Flow[Point] + .map(point => Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options))) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Data points into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + override def writePoints(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = { + writePoints(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null)) + } + + /** + * Write Data points into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + override def writePoints(parameters: WriteParameters): Sink[Seq[Point], Future[Done]] = { + Flow[Seq[Point]] + // create ordered Map + .map(records => records.foldRight(ListMap.empty[WritePrecision, Seq[Point]]) { + case (point, map) => map.updated(point.getPrecision, point +: map.getOrElse(point.getPrecision, Seq())) + }.toList.reverse) + .map(grouped => grouped.map(group => (group._1, group._2.map(point => new AbstractWriteClient.BatchWriteDataPoint(point, options))))) + .map(batches => batches.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2))) + .map(_ => Done.done()) + .toMat(Sink.head)(Keep.right) + } + private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { writeHttp(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch) } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala index a41519c8794..43a237de748 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala @@ -23,7 +23,8 @@ package com.influxdb.client.scala import akka.actor.ActorSystem import akka.stream.scaladsl.{Keep, Source} -import com.influxdb.client.write.WriteParameters +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.write.{Point, WriteParameters} import com.influxdb.exceptions.InternalServerErrorException import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.ScalaFutures @@ -113,4 +114,83 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi } } } + + test("write point") { + + utils.serverMockResponse() + + val point = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val source = Source.single(point) + val sink = client.getWriteScalaApi.writePoint() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("h2o,location=europe level=1i 1") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") + } + + test("write points") { + + utils.serverMockResponse() + + val point1 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val point2 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .time(2L, WritePrecision.NS) + + val source = Source.single(Seq(point1, point2)) + val sink = client.getWriteScalaApi.writePoints() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=1i 1\nh2o,location=europe level=2i 2") + } + + test("write points different precision") { + + utils.serverMockResponse() + utils.serverMockResponse() + + val point1 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val point2 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .time(2L, WritePrecision.S) + + val source = Source.single(Seq(point1, point2)) + val sink = client.getWriteScalaApi.writePoints() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(2) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=1i 1") + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=2i 2") + } } From 738635e61df28f1d24f71384047e1bd38c4fe658 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 May 2022 09:33:06 +0200 Subject: [PATCH 06/10] feat: add write Points --- .../com/influxdb/client/scala/WriteScalaApi.scala | 8 ++------ .../client/scala/internal/WriteScalaApiImpl.scala | 14 +++++--------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala index 39d895448ca..03355783da3 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -76,8 +76,6 @@ trait WriteScalaApi { /** * Write Data points into specified bucket. * - * @param precision Precision for the unix timestamps within the body line-protocol. - * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. * @param bucket Specifies the destination bucket for writes. * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination * `bucket` if the `bucket` is not specified. @@ -86,13 +84,11 @@ trait WriteScalaApi { * if the `org` is not specified. * @return the sink that accept the Data points. The `point` is considered as one batch unit. */ - def writePoint(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Point, Future[Done]] + def writePoint(bucket: Option[String] = None, org: Option[String] = None): Sink[Point, Future[Done]] /** * Write Data points into specified bucket. * - * @param precision Precision for the unix timestamps within the body line-protocol. - * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. * @param bucket Specifies the destination bucket for writes. * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination * `bucket` if the `bucket` is not specified. @@ -101,7 +97,7 @@ trait WriteScalaApi { * if the `org` is not specified. * @return the sink that accept the Data points. The `points` are considered as one batch unit. */ - def writePoints(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[Point], Future[Done]] + def writePoints(bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[Point], Future[Done]] /** * Write Data points into specified bucket. diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala index a0733a03d5e..206fde70d82 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -96,8 +96,6 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx /** * Write Data points into specified bucket. * - * @param precision Precision for the unix timestamps within the body line-protocol. - * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. * @param bucket Specifies the destination bucket for writes. * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination * `bucket` if the `bucket` is not specified. @@ -106,18 +104,16 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx * if the `org` is not specified. * @return the sink that accept the Data points. The `point` is considered as one batch unit. */ - override def writePoint(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = { + override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = { Flow[Point] - .map(point => Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options))) - .map(batch => writeHttp(precision, bucket, org, batch)) + .map(point => (point.getPrecision, Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options)))) + .map(batch => writeHttp(Some(batch._1), bucket, org, batch._2)) .toMat(Sink.head)(Keep.right) } /** * Write Data points into specified bucket. * - * @param precision Precision for the unix timestamps within the body line-protocol. - * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. * @param bucket Specifies the destination bucket for writes. * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination * `bucket` if the `bucket` is not specified. @@ -126,8 +122,8 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx * if the `org` is not specified. * @return the sink that accept the Data points. The `points` are considered as one batch unit. */ - override def writePoints(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = { - writePoints(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null)) + override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = { + writePoints(new WriteParameters(bucket.orNull, org.orNull, null, null)) } /** From 27354316298649e91d18597da4243ae2d914f8d3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 May 2022 20:22:19 +0200 Subject: [PATCH 07/10] feat: add write Measurements --- .../influxdb/client/scala/WriteScalaApi.scala | 41 ++++++++ .../scala/internal/WriteScalaApiImpl.scala | 95 +++++++++++++++---- .../client/scala/WriteScalaApiTest.scala | 65 +++++++++++++ 3 files changed, 182 insertions(+), 19 deletions(-) diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala index 03355783da3..daebec45853 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -106,4 +106,45 @@ trait WriteScalaApi { * @return the sink that accept the Data points. The `points` are considered as one batch unit. */ def writePoints(@Nonnull parameters: WriteParameters): Sink[Seq[Point], Future[Done]] + + /** + * Write Measurement into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurement. The `measurement` is considered as one batch unit. + */ + def writeMeasurement[M](precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[M, Future[Done]] + + /** + * Write Measurements into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + def writeMeasurements[M](precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[M], Future[Done]] + + /** + * Write Measurements into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + def writeMeasurements[M](@Nonnull parameters: WriteParameters): Sink[Seq[M], Future[Done]] } diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala index 206fde70d82..cd21bfb434b 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -56,7 +56,6 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit. */ override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = { - Flow[String] .map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record))) .map(batch => writeHttp(precision, bucket, org, batch)) @@ -77,7 +76,7 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx * @return the sink that accept the records specified in InfluxDB Line Protocol. */ override def writeRecords(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[String], Future[Done]] = { - writeRecords(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null)) + writeRecords(toWriteParameters(precision, bucket, org)) } /** @@ -96,12 +95,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx /** * Write Data points into specified bucket. * - * @param bucket Specifies the destination bucket for writes. - * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination - * `bucket` if the `bucket` is not specified. - * @param org Specifies the destination organization for writes. - * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` - * if the `org` is not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. * @return the sink that accept the Data points. The `point` is considered as one batch unit. */ override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = { @@ -114,12 +113,12 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx /** * Write Data points into specified bucket. * - * @param bucket Specifies the destination bucket for writes. - * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination - * `bucket` if the `bucket` is not specified. - * @param org Specifies the destination organization for writes. - * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` - * if the `org` is not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. * @return the sink that accept the Data points. The `points` are considered as one batch unit. */ override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = { @@ -144,16 +143,74 @@ class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: Influx .toMat(Sink.head)(Keep.right) } - private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { - writeHttp(new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null), batch) + /** + * Write Measurement into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurement. The `measurement` is considered as one batch unit. + */ + override def writeMeasurement[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[M, Future[Done]] = { + Flow[M] + .map(measurement => { + val parameters = toWriteParameters(precision, bucket, org) + Seq(toMeasurementBatch(measurement, parameters.precisionSafe(options))) + }) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) } - private def writeHttp(parameters: WriteParameters, batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + /** + * Write Measurements into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + override def writeMeasurements[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[M], Future[Done]] = { + writeMeasurements(toWriteParameters(precision, bucket, org)) + } - parameters.check(options) + /** + * Write Measurements into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + override def writeMeasurements[M](parameters: WriteParameters): Sink[Seq[M], Future[Done]] = { + Flow[Seq[M]] + .map(records => records.map(record => toMeasurementBatch(record, parameters.precisionSafe(options)))) + .map(batch => writeHttp(parameters, batch)) + .toMat(Sink.head)(Keep.right) + } - write(parameters, batch.toList.asJava.stream()) + private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + writeHttp(toWriteParameters(precision, bucket, org), batch) + } + private def writeHttp(parameters: WriteParameters, batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + write(parameters, batch.toList.asJava.stream()) Done.done() } + + private def toWriteParameters(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): WriteParameters = { + val parameters = new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null) + parameters.check(options) + parameters + } } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala index 43a237de748..2b91b914095 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala @@ -23,6 +23,7 @@ package com.influxdb.client.scala import akka.actor.ActorSystem import akka.stream.scaladsl.{Keep, Source} +import com.influxdb.annotations.{Column, Measurement} import com.influxdb.client.domain.WritePrecision import com.influxdb.client.write.{Point, WriteParameters} import com.influxdb.exceptions.InternalServerErrorException @@ -31,6 +32,7 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import java.time.Instant import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.language.postfixOps @@ -193,4 +195,67 @@ class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter wi utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=1i 1") utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=2i 2") } + + test("write measurement") { + + utils.serverMockResponse() + + val measurement = new H2O() + measurement.location = "coyote_creek" + measurement.level = 2.927 + measurement.description = "below 3 feet" + measurement.time = Instant.ofEpochMilli(1440046800L) + + val source = Source.single(measurement) + val sink = client.getWriteScalaApi.writeMeasurement() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("h2o,location=coyote_creek level\\ description=\"below 3 feet\",water_level=2.927 1440046800000000") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") + } + + test("write measurements") { + + utils.serverMockResponse() + + val measurement1 = new H2O() + measurement1.location = "coyote_creek" + measurement1.level = 2.927 + measurement1.description = "below 3 feet" + measurement1.time = Instant.ofEpochMilli(1440046800L) + + val measurement2 = new H2O() + measurement2.location = "europe" + measurement2.level = 10 + measurement2.description = "below 3 feet" + measurement2.time = Instant.ofEpochMilli(1440046800L) + + val source = Source.single(Seq(measurement1, measurement2)) + val sink = client.getWriteScalaApi.writeMeasurements() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=coyote_creek level\\ description=\"below 3 feet\",water_level=2.927 1440046800000000\nh2o,location=europe level\\ description=\"below 3 feet\",water_level=10.0 1440046800000000") + } + + @Measurement(name = "h2o") + class H2O() { + @Column(name = "location", tag = true) + var location: String = _ + @Column(name = "water_level") + var level: Double = _ + @Column(name = "level description") + var description: String = _ + @Column(name = "time", timestamp = true) + var time: Instant = _ + } } From 1c4fe58d16c528c294d61de63eafb2a7e2a9a1d3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 12 May 2022 20:26:30 +0200 Subject: [PATCH 08/10] feat: add write Measurements --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdfbcf33175..df0ec1c2d02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features 1. [#337](https://github.com/influxdata/influxdb-client-java/pull/337): Supports `columns` function [FluxDSL] +1. [#347](https://github.com/influxdata/influxdb-client-java/pull/347): Add `Scala` WriteApi ### Bug Fixes 1. [#339](https://github.com/influxdata/influxdb-client-java/pull/339): Evaluation of connection string From 026ba19af8ab65b206dbf635009dac2616e5b418 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 13 May 2022 08:28:08 +0200 Subject: [PATCH 09/10] docs: update PULL_REQUEST_TEMPLATE --- .github/PULL_REQUEST_TEMPLATE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index f47f60b5696..726d7a94ac4 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -12,5 +12,5 @@ _Briefly describe your proposed changes:_ - [ ] Rebased/mergeable - [ ] A test has been added if appropriate - [ ] `mvn test` completes successfully -- [ ] Commit messages are in [semantic format](https://seesparkbox.com/foundry/semantic_commit_messages) +- [ ] Commit messages are [conventional](https://www.conventionalcommits.org/en/v1.0.0/) - [ ] Sign [CLA](https://www.influxdata.com/legal/cla/) (if not already signed) From d2f948a7ee285c9c66c1ea01b9700601f756cb97 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 13 May 2022 09:15:55 +0200 Subject: [PATCH 10/10] docs(examples): add Scala Write Example --- examples/README.md | 11 ++- ...DB2ScalaExample.scala => ScalaQuery.scala} | 4 +- ...laExampleDSL.scala => ScalaQueryDSL.scala} | 6 +- ...laExampleRaw.scala => ScalaQueryRaw.scala} | 4 +- .../src/main/java/example/ScalaWriteApi.scala | 94 +++++++++++++++++++ 5 files changed, 111 insertions(+), 8 deletions(-) rename examples/src/main/java/example/{InfluxDB2ScalaExample.scala => ScalaQuery.scala} (96%) rename examples/src/main/java/example/{InfluxDB2ScalaExampleDSL.scala => ScalaQueryDSL.scala} (94%) rename examples/src/main/java/example/{InfluxDB2ScalaExampleRaw.scala => ScalaQueryRaw.scala} (95%) create mode 100644 examples/src/main/java/example/ScalaWriteApi.scala diff --git a/examples/README.md b/examples/README.md index 59de800d58a..04cbbd91672 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,4 +20,13 @@ This directory contains Java, Kotlin and Scala examples. ### Writes - [KotlinWriteApi.kt](src/main/java/example/KotlinWriteApi.kt) - How to ingest data by `DataPoint`, `LineProtocol` or `Data class` - [KotlinWriteBatchingByFlow.kt](src/main/java/example/KotlinWriteBatchingByFlow.kt) - How to use [Flow](https://kotlinlang.org/docs/flow.html) operators to prepare batches for synchronous write into InfluxDB - \ No newline at end of file + +## Scala + +### Query +- [ScalaQuery.scala](src/main/java/example/ScalaQuery.scala) - How to query data into a stream of `FluxRecord` and filter them by `Flow` operators +- [ScalaQueryRaw.scala](src/main/java/example/ScalaQueryRaw.scala) - How to query data into a stream of `String` +- [ScalaQueryDSL.scala](src/main/java/example/ScalaQueryDSL.scala) - How to use the [FluxDSL](../flux-dsl) to query data + +### Writes +- [ScalaWriteApi.scala](src/main/java/example/ScalaWriteApi.scala) - How to ingest data by `DataPoint`, `LineProtocol` or `POJO` \ No newline at end of file diff --git a/examples/src/main/java/example/InfluxDB2ScalaExample.scala b/examples/src/main/java/example/ScalaQuery.scala similarity index 96% rename from examples/src/main/java/example/InfluxDB2ScalaExample.scala rename to examples/src/main/java/example/ScalaQuery.scala index 079d56cb2b8..db25594a648 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExample.scala +++ b/examples/src/main/java/example/ScalaQuery.scala @@ -29,9 +29,9 @@ import com.influxdb.query.FluxRecord import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExample { +object ScalaQuery { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") def main(args: Array[String]): Unit = { diff --git a/examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala b/examples/src/main/java/example/ScalaQueryDSL.scala similarity index 94% rename from examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala rename to examples/src/main/java/example/ScalaQueryDSL.scala index c744002ef71..7f1dcfa54ca 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala +++ b/examples/src/main/java/example/ScalaQueryDSL.scala @@ -33,11 +33,11 @@ import com.influxdb.query.dsl.functions.restriction.Restrictions import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExampleDSL { +object ScalaQueryDSL { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") - def main(args: Array[String]) { + def main(args: Array[String]): Unit = { val influxDBClient = InfluxDBClientScalaFactory .create("http://localhost:8086", "my-token".toCharArray, "my-org") diff --git a/examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala b/examples/src/main/java/example/ScalaQueryRaw.scala similarity index 95% rename from examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala rename to examples/src/main/java/example/ScalaQueryRaw.scala index db6edd61971..c13ab87bde9 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala +++ b/examples/src/main/java/example/ScalaQueryRaw.scala @@ -28,9 +28,9 @@ import com.influxdb.client.scala.InfluxDBClientScalaFactory import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExampleRaw { +object ScalaQueryRaw { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") def main(args: Array[String]): Unit = { val influxDBClient = InfluxDBClientScalaFactory diff --git a/examples/src/main/java/example/ScalaWriteApi.scala b/examples/src/main/java/example/ScalaWriteApi.scala new file mode 100644 index 00000000000..7b8bbda0e55 --- /dev/null +++ b/examples/src/main/java/example/ScalaWriteApi.scala @@ -0,0 +1,94 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package example + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Source} +import com.influxdb.annotations.{Column, Measurement} +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.scala.InfluxDBClientScalaFactory +import com.influxdb.client.write.Point + +import java.time.Instant +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +object ScalaWriteApi { + + implicit val system: ActorSystem = ActorSystem("examples") + + def main(args: Array[String]): Unit = { + + val client = InfluxDBClientScalaFactory.create( + "http://localhost:8086", "my-token".toCharArray, "my-org", "my-bucket") + + // + // Use InfluxDB Line Protocol to write data + // + val record = "mem,host=host1 used_percent=23.43234543" + + val source = Source.single(record) + val sink = client.getWriteScalaApi.writeRecord() + val materialized = source.toMat(sink)(Keep.right) + Await.result(materialized.run(), Duration.Inf) + + // + // Use a Data Point to write data + // + val point = Point + .measurement("mem") + .addTag("host", "host1") + .addField("used_percent", 23.43234543) + .time(Instant.now(), WritePrecision.NS) + + val sourcePoint = Source.single(point) + val sinkPoint = client.getWriteScalaApi.writePoint() + val materializedPoint = sourcePoint.toMat(sinkPoint)(Keep.right) + Await.result(materializedPoint.run(), Duration.Inf) + + // + // Use POJO and corresponding class to write data + // + val mem = new Mem() + mem.host = "host1" + mem.used_percent = 23.43234543 + mem.time = Instant.now + + val sourcePOJO = Source.single(mem) + val sinkPOJO = client.getWriteScalaApi.writeMeasurement() + val materializedPOJO = sourcePOJO.toMat(sinkPOJO)(Keep.right) + Await.result(materializedPOJO.run(), Duration.Inf) + + client.close() + system.terminate() + } + + @Measurement(name = "mem") + class Mem() { + @Column(tag = true) + var host: String = _ + @Column + var used_percent: Double = _ + @Column(timestamp = true) + var time: Instant = _ + } +}