Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Sep 14, 2024
1 parent 9f23c18 commit beec0d0
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/main/scala/sample/stream/FlightDelayStreaming.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import scala.util.{Failure, Success, Try}
*
* Typical results on 2012 vintage MacBook Pro with 8 cores
* with default dispatcher and default JVM param:
* JDK 11.0.11 41 seconds
* JDK 17.0.2 33 seconds
* JDK 21.0.3 27 seconds
* graalvm-jdk-21 25 seconds
* temurin-11 31 seconds
* temurin-21 21 seconds
* graalvm-jdk-21 18 seconds
*
* Doc:
* https://fullgc.github.io/how-to-tune-akka-to-get-the-most-from-your-actor-based-system-part-1
Expand All @@ -48,7 +47,7 @@ object FlightDelayStreaming extends App {

import system.dispatcher

val sourceOfLines = FileIO.fromPath(Paths.get("src/main/resources/2008.csv"))
val sourceOfLines = FileIO.fromPath(Paths.get("src/main/resources/2008_subset.csv"))
.via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 1024, allowTruncation = true)
.map(_.utf8String))

Expand All @@ -66,15 +65,15 @@ object FlightDelayStreaming extends App {
val filterAndConvert: Flow[FlightEvent, FlightDelayRecord, NotUsed] =
Flow[FlightEvent]
.filter(r => Try(r.arrDelayMins.toInt).getOrElse(-1) > 0) // convert arrival delays to ints, filter out non delays
.mapAsyncUnordered(parallelism = 2) { r =>
.mapAsyncUnordered(parallelism = 100_000) { r =>
Future(FlightDelayRecord(r.year, r.month, r.dayOfMonth, r.flightNum, r.uniqueCarrier, r.arrDelayMins))
}

// Aggregate number of delays and totalMins
val averageCarrierDelay: Flow[FlightDelayRecord, FlightDelayAggregate, NotUsed] =
Flow[FlightDelayRecord]
// maxSubstreams must be larger than the number of UniqueCarrier in the file
.groupBy(30, _.uniqueCarrier, allowClosedSubstreamRecreation = true)
.groupBy(100, _.uniqueCarrier, allowClosedSubstreamRecreation = true)
//.wireTap(each => println(s"Processing FlightDelayRecord: $each"))
.fold(FlightDelayAggregate("", 0, 0)) {
(x: FlightDelayAggregate, y: FlightDelayRecord) =>
Expand Down

0 comments on commit beec0d0

Please sign in to comment.