
Any file operation stops after one buffer #198

Closed
KoenR3 opened this issue Feb 4, 2021 · 2 comments · Fixed by #199
Labels
bug Something isn't working
Milestone

Comments


KoenR3 commented Feb 4, 2021

Steps to reproduce:
The behaviour is the same whether the output goes through FileIO or the Alpakka Parquet writer, and with both the queryRaw and query APIs.

The query below is just an example; any query that returns more points than the buffer size should produce the same result.

  val influxDBClient: InfluxDBClientScala = InfluxDBClientScalaFactory
    .create(influxOptions, 10000, OverflowStrategy.backpressure)

  val solar_inverter: Flux = Flux
    .from("electricity")
    .range(
      ZonedDateTime
        .of(
          LocalDate.of(2020, 1, 1).atStartOfDay(),
          ZoneId.of("Europe/Brussels")
        )
        .toInstant,
      ZonedDateTime
        .now(
          ZoneId.of("Europe/Brussels")
        )
        .toInstant
    )
    .filter(
      Restrictions.and(
        Restrictions.measurement().equal("solar_inverter_total_power"),
        Restrictions.value().exists()
      )
    )

  val querySource: Source[String, NotUsed] =
    influxDBClient.getQueryScalaApi().queryRaw(solar_inverter.toString())

  val fp = "./test.txt"

  val filePath = Paths.get(fp)

  val count = querySource
    .log("Source")
    .withAttributes(akkaLoggerAttributes)
    .map(s => ByteString(s + "\n"))
    .runWith(FileIO.toPath(filePath))
    .recover(e => {
      logger.error(e.getMessage)
      throw e
    })

  count onComplete {
    case Success(_) =>
      influxDBClient.close()
      system.terminate()
    case Failure(e) =>
      logger.error(e)
      influxDBClient.close()
      system.terminate()
  }

Expected behavior:
All measurements should be streamed into the file.

Actual behavior:
When querying a source that has more measurements than the buffer size, the stream finishes after approximately one buffer, whether using FileIO or the Alpakka Parquet writer.

Specifications:

  • Client Version: 1.15
  • Akka Streams: 2.6.8
  • InfluxDB Version: 1.8.4
  • JDK Version: 1.8
  • Platform: Mac OS

bednar commented Feb 5, 2021

Hi @KoenR3,

Thanks for using our client; we will take a look.

Regards

@bednar bednar added the bug Something isn't working label Feb 5, 2021

bednar commented Feb 9, 2021

@KoenR3, I've prepared PR #199 with a fix.

As a workaround, you could use the following code with influxdb-client-reactive:

package example

import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory

import java.nio.file.Paths
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object InfluxDB2ScalaExampleRaw {

  implicit val system: ActorSystem = ActorSystem("examples")

  def main(args: Array[String]): Unit = {

    val influxOptions = InfluxDBClientOptions.builder()
      .url("http://localhost:9999")
      .authenticateToken("my-token".toCharArray)
      .org("my-org")
      .build()

    val influxDBClient = InfluxDBClientReactiveFactory.create(influxOptions)

    val fluxQuery = ("from(bucket: \"h2o1612876000082-IT\")\n"
      + " |> range(start: 0)"
      + " |> filter(fn: (r) => (r[\"_measurement\"] == \"push\"))")


    val value = influxDBClient.getQueryReactiveApi.queryRaw(fluxQuery)
    val result = Source.fromPublisher(value)

    val fp = "./test.txt"
    val filePath = Paths.get(fp)

    val count = result
      .log("Source")
      .map(s => ByteString(s + "\n"))
      .runWith(FileIO.toPath(filePath))
      .recover(e => {
        val message = e.getMessage
        println(s"Line: $message")
        throw e
      })

    count onComplete {
      case Success(_) =>
        influxDBClient.close()
        system.terminate()
      case Failure(e) =>
        val message = e.getMessage
        println(s"Line: $message")
        influxDBClient.close()
        system.terminate()
    }
  }
}

@bednar bednar modified the milestones: 2.1.0, 2.0.0 Mar 31, 2021