From 1ec65f5c7ff1bb96bb4459d2ac3e3cbb6a1566aa Mon Sep 17 00:00:00 2001 From: Ruslans Tarasovs Date: Thu, 1 Feb 2024 18:02:31 +0200 Subject: [PATCH] Removed unnecessary copy-paste. --- .../cassandra/SnapshotCassandraTest.scala | 75 +++++++++++-------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala index 38e4915c1..e568bfb9d 100644 --- a/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala +++ b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala @@ -38,7 +38,7 @@ class SnapshotCassandraTest extends AnyFunSuite { statements.copy(selectMetadata = { _ => // sync data after first call to simulate delayed update // otherwise the `selectMetadata` call may be stuck in an infinite loop - DatabaseState.get.map(_.metadata) <* DatabaseState.sync + DatabaseState.metadata <* DatabaseState.sync }), numberOfSnapshots ) @@ -67,10 +67,10 @@ class SnapshotCassandraTest extends AnyFunSuite { _ <- DatabaseState.sync _ <- store.save(key, record) _ <- DatabaseState.sync - state <- DatabaseState.get + size <- DatabaseState.size } yield { // we should only get one snapshot in a database - assert(state.stored.size == 1) + assert(size == 1) } program.run(DatabaseState.empty).get } @@ -117,34 +117,19 @@ class SnapshotCassandraTest extends AnyFunSuite { def statements: SnapshotCassandra.Statements[F] = SnapshotCassandra.Statements( insertRecord = { (_, bufferNr, snapshot) => - for { - state0 <- DatabaseState.get - state1 = state0.insert(bufferNr, snapshot) - wasApplied = state1.isDefined - _ <- DatabaseState.set(state1.getOrElse(state0)) - } yield wasApplied + DatabaseState.insert(bufferNr, snapshot) }, updateRecord = { (_, bufferNr, insertSnapshot, deleteSnapshot) => - for { - state0 <- DatabaseState.get - state1 = state0.update(bufferNr, insertSnapshot, deleteSnapshot) - wasApplied = state1.isDefined - _ <- DatabaseState.set(state1.getOrElse(state0)) - } yield wasApplied + DatabaseState.update(bufferNr, insertSnapshot, deleteSnapshot) }, selectRecords = { (_, bufferNr) => - DatabaseState.get.map(_.select(bufferNr)) + DatabaseState.select(bufferNr) }, selectMetadata = { _ => - DatabaseState.get.map(_.metadata) + DatabaseState.metadata }, deleteRecords = { (_, bufferNr) => - for { - state0 <- DatabaseState.get - state1 = state0.delete(bufferNr) - wasApplied = state1.isDefined - _ <- DatabaseState.set(state1.getOrElse(state0)) - } yield wasApplied + DatabaseState.delete(bufferNr) } ) @@ -165,6 +150,11 @@ class SnapshotCassandraTest extends AnyFunSuite { } } + def delete(bufferNr: BufferNr): Option[DatabaseState] = + Option.when(stored.contains(bufferNr)) { + this.copy(stored = stored - bufferNr) + } + def select(bufferNr: BufferNr): Option[SnaphsotWithPayload] = availableForReading.get(bufferNr) @@ -173,10 +163,7 @@ class SnapshotCassandraTest extends AnyFunSuite { (snapshot.snapshot.seqNr, snapshot.timestamp) } - def delete(bufferNr: BufferNr): Option[DatabaseState] = - Option.when(stored.contains(bufferNr)) { - this.copy(stored = stored - bufferNr) - } + def size: Int = stored.size def sync: DatabaseState = this.copy(availableForReading = stored) @@ -187,18 +174,40 @@ class SnapshotCassandraTest extends AnyFunSuite { def empty: DatabaseState = DatabaseState(stored = Map.empty, availableForReading = Map.empty) - def set(state: DatabaseState): F[Unit] = - StateT.set(state) + def insert(bufferNr: BufferNr, snapshot: SnaphsotWithPayload): F[Boolean] = + DatabaseState.tryModify(_.insert(bufferNr, snapshot)) - def get: F[DatabaseState] = - StateT.get + def update(bufferNr: BufferNr, insertSnapshot: SnaphsotWithPayload, deleteSnapshot: SeqNr): F[Boolean] = + DatabaseState.tryModify(_.update(bufferNr, insertSnapshot, deleteSnapshot)) + + def delete(bufferNr: BufferNr): F[Boolean] = + DatabaseState.tryModify(_.delete(bufferNr)) + + def select(bufferNr: BufferNr): F[Option[SnaphsotWithPayload]] = + DatabaseState.get.map(_.select(bufferNr)) + + def metadata: F[Map[BufferNr, (SeqNr, Instant)]] = + DatabaseState.get.map(_.metadata) - def modify(f: DatabaseState => DatabaseState): F[Unit] = - StateT.modify(f) + def size: F[Int] = DatabaseState.get.map(_.size) def sync: F[Unit] = StateT.modify(_.sync) + private def set(state: DatabaseState): F[Unit] = + StateT.set(state) + + private def get: F[DatabaseState] = + StateT.get + + private def tryModify(f: DatabaseState => Option[DatabaseState]): F[Boolean] = + for { + state0 <- DatabaseState.get + state1 = f(state0) + _ <- state1.traverse(DatabaseState.set) + wasApplied = state1.isDefined + } yield wasApplied + } val record = SnapshotRecord(