From beec0d02a1d1342f8373f90904e49233e9ab0ce8 Mon Sep 17 00:00:00 2001 From: pbernet Date: Sat, 14 Sep 2024 09:37:15 +0200 Subject: [PATCH] Update --- .../scala/sample/stream/FlightDelayStreaming.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/scala/sample/stream/FlightDelayStreaming.scala b/src/main/scala/sample/stream/FlightDelayStreaming.scala index 14325100..d921d26b 100644 --- a/src/main/scala/sample/stream/FlightDelayStreaming.scala +++ b/src/main/scala/sample/stream/FlightDelayStreaming.scala @@ -33,10 +33,9 @@ import scala.util.{Failure, Success, Try} * * Typical results on 2012 vintage MacBook Pro with 8 cores * with default dispatcher and default JVM param: - * JDK 11.0.11 41 seconds - * JDK 17.0.2 33 seconds - * JDK 21.0.3 27 seconds - * graalvm-jdk-21 25 seconds + * temurin-11 31 seconds + * temurin-21 21 seconds + * graalvm-jdk-21 18 seconds * * Doc: * https://fullgc.github.io/how-to-tune-akka-to-get-the-most-from-your-actor-based-system-part-1 @@ -48,7 +47,7 @@ object FlightDelayStreaming extends App { import system.dispatcher - val sourceOfLines = FileIO.fromPath(Paths.get("src/main/resources/2008.csv")) + val sourceOfLines = FileIO.fromPath(Paths.get("src/main/resources/2008_subset.csv")) .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 1024, allowTruncation = true) .map(_.utf8String)) @@ -66,7 +65,7 @@ object FlightDelayStreaming extends App { val filterAndConvert: Flow[FlightEvent, FlightDelayRecord, NotUsed] = Flow[FlightEvent] .filter(r => Try(r.arrDelayMins.toInt).getOrElse(-1) > 0) // convert arrival delays to ints, filter out non delays - .mapAsyncUnordered(parallelism = 2) { r => + .mapAsyncUnordered(parallelism = 100_000) { r => Future(FlightDelayRecord(r.year, r.month, r.dayOfMonth, r.flightNum, r.uniqueCarrier, r.arrDelayMins)) } @@ -74,7 +73,7 @@ object FlightDelayStreaming extends App { val averageCarrierDelay: Flow[FlightDelayRecord, FlightDelayAggregate, NotUsed] = Flow[FlightDelayRecord] // maxSubstreams must be larger than the number of UniqueCarrier in the file - .groupBy(30, _.uniqueCarrier, allowClosedSubstreamRecreation = true) + .groupBy(100, _.uniqueCarrier, allowClosedSubstreamRecreation = true) //.wireTap(each => println(s"Processing FlightDelayRecord: $each")) .fold(FlightDelayAggregate("", 0, 0)) { (x: FlightDelayAggregate, y: FlightDelayRecord) =>