Skip to content

Commit

Permalink
Refactor Downloader & testing (#245)
Browse files Browse the repository at this point in the history
Split database update from Downloader, which makes the concern clearer
which in turn makes testing easier.
  • Loading branch information
lenguyenthanh authored Jan 31, 2025
2 parents 5d50d96 + a9704c9 commit 7691e26
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 94 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ lazy val crawler = (project in file("modules/crawler"))
name := "crawler",
libraryDependencies ++= Seq(
fs2Compress,
http4sClient
http4sClient,
http4sEmberClient % Test,
)
)
.dependsOn(domain, db)
Expand Down
104 changes: 12 additions & 92 deletions modules/crawler/src/main/scala/Crawler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import cats.effect.IO
import cats.syntax.all.*
import fide.db.{ Db, KVStore }
import fide.domain.*
import fide.types.*
import org.http4s.*
import org.http4s.client.Client
import org.http4s.implicits.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

Expand All @@ -21,100 +19,22 @@ object Crawler:

def instance(db: Db, store: KVStore, client: Client[IO], config: CrawlerConfig)(using Logger[IO]) =
val syncer = Syncer.instance(store, client)
val downloader = Downloader(db, client, config)
val downloader = Downloader(client)
new Crawler:

def crawl: IO[Unit] =
syncer.fetchStatus.flatMap:
case OutDated(timestamp) =>
(downloader.fetchAndSave *> timestamp.traverse_(syncer.saveLastUpdate))
(fetchAndSave *> timestamp.traverse_(syncer.saveLastUpdate))
.handleErrorWith(e => error"Error while crawling: $e")
case _ => info"Skipping crawling as the data is up to date"

trait Downloader:
def fetchAndSave: IO[Unit]

object Downloader:
val downloadUrl = uri"http://ratings.fide.com/download/players_list.zip"
lazy val request = Request[IO](
method = Method.GET,
uri = downloadUrl
)

def apply(db: Db, client: Client[IO], config: CrawlerConfig)(using Logger[IO]): Downloader = new:
def fetchAndSave: IO[Unit] =
info"Start crawling"
*> fetch
*> info"Finished crawling"

private def fetch: IO[Unit] =
client
.stream(request)
.switchMap(_.body)
.through(Decompressor.decompress)
.through(fs2.text.utf8.decode)
.through(fs2.text.lines)
.drop(1) // first line is header
.collect:
case line if line.trim.nonEmpty => line
.evalMap(parseLine)
.collect:
case Some(x) => x
.chunkN(config.chunkSize, true)
.map(_.toList)
.parEvalMapUnordered(config.concurrentUpsert)(db.upsert)
.compile
.drain

// shamelessly copied (with some minor modificaton) from: https://github.com/lichess-org/lila/blob/8033c4c5a15cf9bb2b36377c3480f3b64074a30f/modules/fide/src/main/FidePlayerSync.scala#L131
def parseLine(line: String)(using Logger[IO]): IO[Option[(NewPlayer, Option[NewFederation])]] =
parse(line).handleErrorWith(e => error"Error while parsing line: $line, error: $e".as(none))

def parse(line: String)(using Logger[IO]): IO[Option[(NewPlayer, Option[NewFederation])]] =
def string(start: Int, end: Int): Option[String] = line.substring(start, end).trim.some.filter(_.nonEmpty)

def number(start: Int, end: Int): Option[Int] = string(start, end).flatMap(_.toIntOption)
def rating(start: Int, end: Int): Option[Rating] = string(start, end) >>= Rating.fromString

def findFed(id: FederationId, playerId: PlayerId): IO[Option[NewFederation]] =
Federation
.nameById(id)
.fold(warn"cannot find federation: $id for player: $playerId" *> none[NewFederation].pure[IO])(name =>
NewFederation(id, name).some.pure[IO]
)

val x = for
id <- number(0, 15) >>= PlayerId.option
name <- string(15, 76).map(_.filterNot(_.isDigit).trim)
if name.sizeIs > 2
title = string(84, 89) >>= Title.apply
wTitle = string(89, 94) >>= Title.apply
otherTitles = string(94, 109).fold(Nil)(OtherTitle.applyToList)
sex = string(79, 82) >>= Sex.apply
year = number(152, 156).filter(_ > 1000)
inactiveFlag = string(158, 160)
federationId = string(76, 79) >>= FederationId.option
yield NewPlayer(
id = id,
name = name,
title = title,
womenTitle = wTitle,
otherTitles = otherTitles,
standard = rating(113, 117),
rapid = rating(126, 132),
blitz = rating(139, 145),
sex = sex,
birthYear = year,
active = inactiveFlag.isEmpty
) -> federationId

x.traverse:
case (player, Some(fedId)) => findFed(fedId, player.id).map(fed => (player, fed))
case (player, None) => (player, none).pure[IO]

object Decompressor:

import de.lhns.fs2.compress.*
val defaultChunkSize = 1024 * 4

def decompress: fs2.Pipe[IO, Byte, Byte] =
_.through(ArchiveSingleFileDecompressor(Zip4JUnarchiver.make[IO](defaultChunkSize)).decompress)
// Streams the full FIDE player list via the downloader and upserts it into
// the database in batches; batch size and upsert parallelism come from the
// CrawlerConfig captured by the enclosing Crawler instance.
def fetchAndSave: IO[Unit] =
  info"Start crawling"
    *> downloader.fetch
      .chunkN(config.chunkSize) // batch players to limit upsert round-trips
      .map(_.toList)
      .parEvalMapUnordered(config.concurrentUpsert)(db.upsert) // batch order does not matter
      .compile
      .drain
    *> info"Finished crawling"
92 changes: 92 additions & 0 deletions modules/crawler/src/main/scala/Downloader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package fide.crawler

import cats.effect.IO
import cats.syntax.all.*
import fide.domain.*
import fide.types.*
import org.http4s.*
import org.http4s.client.Client
import org.http4s.implicits.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

// One parsed row of the FIDE rating list: the player, plus the federation
// resolved for them when the federation id mapped to a known name.
type PlayerInfo = (NewPlayer, Option[NewFederation])

// Fetches and parses the remote FIDE player list as a stream of PlayerInfo.
trait Downloader:
  def fetch: fs2.Stream[IO, PlayerInfo]

object Downloader:
  // Zipped fixed-width TXT export of the full FIDE player list.
  val downloadUrl = uri"http://ratings.fide.com/download/players_list.zip"

  lazy val request = Request[IO](
    method = Method.GET,
    uri = downloadUrl
  )

  def apply(client: Client[IO])(using Logger[IO]): Downloader = new:

    // Downloads the zip, decompresses the single file inside it, splits the
    // text into lines, and parses each line; unparseable or blank lines are
    // dropped (parseLine yields None for them).
    def fetch =
      client
        .stream(request)
        .switchMap(_.body)
        .through(Decompressor.decompress)
        .through(fs2.text.utf8.decode)
        .through(fs2.text.lines)
        .drop(1) // first line is header
        .evalMap(parseLine)
        .collect { case Some(x) => x }

  // Parses one line of the player list. Blank lines and lines that fail to
  // parse (including index errors on lines shorter than the expected width)
  // resolve to None; failures are logged rather than propagated, so one bad
  // row cannot abort the whole download stream.
  def parseLine(line: String)(using Logger[IO]): IO[Option[(NewPlayer, Option[NewFederation])]] =
    IO(line.trim.nonEmpty)
      .ifM(parse(line), none.pure[IO])
      .handleErrorWith(e => error"Error while parsing line: $line, error: $e".as(none))

  // shamelessly copied (with some minor modification) from: https://github.com/lichess-org/lila/blob/8033c4c5a15cf9bb2b36377c3480f3b64074a30f/modules/fide/src/main/FidePlayerSync.scala#L131
  // The list is a fixed-width format: the numeric pairs below are column
  // offsets into each line (start inclusive, end exclusive).
  def parse(line: String)(using Logger[IO]): IO[Option[(NewPlayer, Option[NewFederation])]] =
    // Column slice, trimmed, as a non-empty string — or None.
    def string(start: Int, end: Int): Option[String] = line.substring(start, end).trim.some.filter(_.nonEmpty)

    def number(start: Int, end: Int): Option[Int] = string(start, end).flatMap(_.toIntOption)
    def rating(start: Int, end: Int): Option[Rating] = string(start, end) >>= Rating.fromString

    // Resolves a federation id against the known federation names; unknown
    // ids are logged with a warning and dropped rather than failing the row.
    def findFed(id: FederationId, playerId: PlayerId): IO[Option[NewFederation]] =
      Federation
        .nameById(id)
        .fold(warn"cannot find federation: $id for player: $playerId" *> none[NewFederation].pure[IO])(name =>
          NewFederation(id, name).some.pure[IO]
        )

    val x = for
      id <- number(0, 15) >>= PlayerId.option
      name <- string(15, 76).map(_.filterNot(_.isDigit).trim)
      if name.sizeIs > 2 // skip rows whose name field is too short to be a real name
      title = string(84, 89) >>= Title.apply
      wTitle = string(89, 94) >>= Title.apply
      otherTitles = string(94, 109).fold(Nil)(OtherTitle.applyToList)
      sex = string(79, 82) >>= Sex.apply
      year = number(152, 156).filter(_ > 1000) // guard against placeholder birth years
      inactiveFlag = string(158, 160)
      federationId = string(76, 79) >>= FederationId.option
    yield NewPlayer(
      id = id,
      name = name,
      title = title,
      womenTitle = wTitle,
      otherTitles = otherTitles,
      standard = rating(113, 117),
      rapid = rating(126, 132),
      blitz = rating(139, 145),
      sex = sex,
      birthYear = year,
      active = inactiveFlag.isEmpty // any flag in the inactivity column marks the player inactive
    ) -> federationId

    x.traverse:
      case (player, Some(fedId)) => findFed(fedId, player.id).map(fed => (player, fed))
      case (player, None)        => (player, none).pure[IO]

object Decompressor:

  import de.lhns.fs2.compress.*

  // Read buffer size, in bytes, handed to the zip unarchiver.
  val defaultChunkSize = 1024 * 4

  // Pipe that unzips a byte stream, assuming the archive contains exactly
  // one file, and emits that file's bytes.
  def decompress: fs2.Pipe[IO, Byte, Byte] = bytes =>
    val unarchiver   = Zip4JUnarchiver.make[IO](defaultChunkSize)
    val decompressor = ArchiveSingleFileDecompressor(unarchiver)
    bytes.through(decompressor.decompress)
27 changes: 27 additions & 0 deletions modules/crawler/src/test/scala/DownloaderTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package fide
package crawler
package test

import cats.effect.IO
import fide.domain.Federation
import fide.types.FederationId
import org.http4s.ember.client.EmberClientBuilder
import org.typelevel.log4cats.Logger
import weaver.*

object DownloaderTest extends SimpleIOSuite:

  given Logger[IO] = org.typelevel.log4cats.noop.NoOpLogger[IO]

  // Integration test: downloads the live FIDE player list and checks that
  // every federation in our hard-coded table appears in the data, i.e. the
  // table is not stale.
  // NOTE(review): the name says "no NEW federations", but the assertion is
  // known-minus-observed (missing, not new); unknown ids are already dropped
  // with a warning inside Downloader — confirm the intended direction.
  test("No new federations after downloads"):
    EmberClientBuilder
      .default[IO]
      .build
      .use: client =>
        Downloader(client).fetch
          .map(_._2)
          .fold[Set[FederationId]](Set.empty)((acc, fed) => acc ++ fed.map(_.id))
          .compile
          .last
          .map(_.getOrElse(Set.empty)) // fold always emits, but avoid partial Option.get
          .map(seen => Federation.names.keySet.diff(seen))
          .map(missing => expect(missing == Set.empty))
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import fide.domain.OtherTitle
import org.typelevel.log4cats.Logger
import weaver.*

object CrawlerTest extends SimpleIOSuite:
object ParserTest extends SimpleIOSuite:

given Logger[IO] = org.typelevel.log4cats.noop.NoOpLogger[IO]

Expand All @@ -22,6 +22,7 @@ object CrawlerTest extends SimpleIOSuite:
val lines = List(
"8605360 A La, Teng Hua CHN F 1949 0 40 1993 wi ",
"25021044 Aagney L., Narasimhan IND M 1606 0 20 1565 0 20 1567 0 20 2000 i ",
" ", // whitespaces only line
"29976634 Aafrin, S F Aja SRI F 2012 w ",
"1478800 Aagaard, Christian DEN M 1999 "
)
Expand Down

0 comments on commit 7691e26

Please sign in to comment.