Skip to content

Commit

Permalink
bump: Akka Persistence JDBC 5.4.0, Slick 3.5.0, all modules in Scala 3 (
Browse files Browse the repository at this point in the history
#1146)

* Exclude transitive slf4j-api 2.2 and pin to 1.7
  • Loading branch information
johanandren authored Mar 27, 2024
1 parent 2bff21b commit 63c37b3
Show file tree
Hide file tree
Showing 25 changed files with 80 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
jvm: temurin:1.11

- name: Compile all code with fatal warnings for Java 11, Scala 2.13 and Scala 3
run: sbt "clean ; +compile; Test/compile; akka-projection-integration/Test/compile"
run: sbt "clean ; +compile; +Test/compile; +akka-projection-integration/Test/compile"

check-docs:
name: Check Docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class EventProducerPushSpec(testContainerConf: TestContainerConf)
sourceProvider =
EventSourcedProvider.eventsBySlices[String](system, "test.consumer.r2dbc.query", entityType, 0, 1023),
handler = () => {
envelope: EventEnvelope[String] =>
(envelope: EventEnvelope[String]) =>
probe ! envelope
Future.successful(Done)
})))
Expand Down Expand Up @@ -159,7 +159,7 @@ class EventProducerPushSpec(testContainerConf: TestContainerConf)
}
.withConsumerFilters(Vector(ConsumerFilter.ExcludeEntityIds(Set(consumerFilterExcludedPid.id))))
// #consumerSetup
val bound = Http(system)
val bound = Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(EventProducerPushDestination.grpcServiceHandler(destination))
// #consumerSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class ProducerFilterEndToEndSpec(testContainerConf: TestContainerConf)
val handler = EventProducer.grpcServiceHandler(eps)

val bound =
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(ServiceHandler.concatOrNotFound(handler))
.map(_.addToCoordinatedShutdown(3.seconds))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ object ProducerPushSampleConsumer {
settings = None,
sourceProvider = consumerProjectionProvider,
handler = () => {
envelope: EventEnvelope[String] =>
(envelope: EventEnvelope[String]) =>
log.infoN(
"Saw projected event: {}-{}: {}",
envelope.persistenceId,
Expand All @@ -179,7 +179,7 @@ object ProducerPushSampleConsumer {
streamId,
// note: we use akka serialization for the payloads here, so no proto descriptors
Nil)
val bound = Http(system)
val bound = Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(EventProducerPushDestination.grpcServiceHandler(destination))
bound.foreach(binding =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TopicProducerFilterEndToEndSpec(testContainerConf: TestContainerConf)
val handler = EventProducer.grpcServiceHandler(eps)

val bound =
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(ServiceHandler.concatOrNotFound(handler))
.map(_.addToCoordinatedShutdown(3.seconds))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand All @@ -253,32 +253,32 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
// replicate to B
entityRefA.ask(TestEntity.SetScope(Set(DCB.id), _)).futureValue
eventually {
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1), Set(DCB.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1), Set(DCB.id))
}
entityRefC.ask(TestEntity.Get).futureValue shouldBe TestEntity.State.initial
entityRefC.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State.initial

// update in B
entityRefB.ask(TestEntity.UpdateItem("B", 2, _)).futureValue
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCB.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCB.id))
logger.info("Replicate from B to A")
// replicate to A
entityRefB.ask(TestEntity.SetScope(Set(DCA.id), _)).futureValue
eventually {
entityRefA.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefA.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
}

// replicate to C
entityRefA.ask(TestEntity.SetScope(Set(DCC.id), _)).futureValue
eventually {
entityRefC.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
entityRefC.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
}
// update in C
entityRefC.ask(TestEntity.UpdateItem("C", 3, _)).futureValue

// replicate to A
entityRefC.ask(TestEntity.SetScope(Set(DCA.id), _)).futureValue
eventually {
entityRefA.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(
entityRefA.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(
Map("A" -> 1, "B" -> 2, "C" -> 3),
Set(DCA.id))
}
Expand All @@ -299,17 +299,17 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
logger.info("Replicate from A to B")
entityRefA.ask(TestEntity.SetScope(Set(DCB.id), _)).futureValue
eventually {
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1), Set(DCB.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1), Set(DCB.id))
}

// update in B
entityRefB.ask(TestEntity.UpdateItem("B", 2, _)).futureValue
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCB.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCB.id))
logger.info("Replicate from B to A")
// replicate to A
entityRefB.ask(TestEntity.SetScope(Set(DCA.id), _)).futureValue
eventually {
entityRefA.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefA.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
}

// replicate to C
Expand All @@ -320,28 +320,28 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
logger.info("Replicate from A to C")
entityRefA.ask(TestEntity.SetScope(Set(DCC.id), _)).futureValue
eventually {
entityRefC.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
entityRefC.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
}

// update in C
entityRefC.ask(TestEntity.UpdateItem("C", 3, _)).futureValue
// latest should not be replicated to B
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
Thread.sleep(r2dbcSettings.querySettings.backtrackingBehindCurrentTime.toMillis + 200)
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))

// another update in C
entityRefC.ask(TestEntity.UpdateItem("C", 4, _)).futureValue
// latest should still not be replicated to B
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
Thread.sleep(r2dbcSettings.querySettings.backtrackingBehindCurrentTime.toMillis + 200)
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))

// replicate to B
logger.info("Replicate from C to B")
entityRefC.ask(TestEntity.SetScope(Set(DCB.id), _)).futureValue
eventually {
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(
entityRefB.ask(TestEntity.Get.apply).futureValue shouldBe TestEntity.State(
Map("A" -> 1, "B" -> 2, "C" -> 4),
Set(DCB.id))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class EdgeReplicationMultiIntegrationSpec(testContainerConf: TestContainerConf)
val combinedHandler = consumeHandler.orElse(produceHandler)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(combinedHandler)
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class PushReplicationIntegrationSpec(testContainerConf: TestContainerConf)
Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,12 @@ class ReplicationJavaDSLIntegrationSpec(testContainerConf: TestContainerConf)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
Http
.get(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.toScala
.map { binding: ServerBinding =>
.map { (binding: ServerBinding) =>
binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system)
replica.replicaId -> started
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ProducerApiSample {
// RES push to combine with multiple other services
val _ = EventProducerPushDestination.grpcServiceHandler(replication.eventProducerPushDestination.toSet)

val _ = Http(system).newServerAt(host, port).bind(handler)
val _ = Http().newServerAt(host, port).bind(handler)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory
* INTERNAL API
*/
@InternalApi
private[projection] class JdbcOffsetStore[S <: JdbcSession](
class JdbcOffsetStore[S <: JdbcSession](
system: ActorSystem[_],
settings: JdbcSettings,
jdbcSessionFactory: () => S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.typesafe.config.ConfigValueType
* INTERNAL API
*/
@InternalApi
private[projection] case class JdbcSettings(config: Config, executionContext: ExecutionContext) {
case class JdbcSettings(config: Config, executionContext: ExecutionContext) {

val schema: Option[String] =
Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class KafkaToSlickIntegrationSpec extends KafkaSpecBase(ConfigFactory.load().wit
PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Milliseconds))

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db, dbConfig.profile, SlickSettings(system.toTyped))
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig, SlickSettings(system.toTyped))
val repository = new EventTypeCountRepository(dbConfig)

override protected def beforeAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ abstract class SlickOffsetStoreSpec(specConfig: SlickSpecConfig)
private val clock = new TestClock

private val offsetStore =
new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(slickConfig), clock)
new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock)

override protected def beforeAll(): Unit = {
// create offset table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class SlickProjectionSpec

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)

val offsetStore = new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(system))
val offsetStore = new SlickOffsetStore(system, dbConfig, SlickSettings(system))

val projectionTestKit = ProjectionTestKit(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ object SlickProjection {
@nowarn("msg=never used")
private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig: DatabaseConfig[P])(
implicit system: ActorSystem[_]) =
new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile, SlickSettings(system))
new SlickOffsetStore(system, databaseConfig, SlickSettings(system))
}

object SlickHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
package akka.projection.slick.internal

import java.time.Clock

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand All @@ -25,23 +23,26 @@ import akka.projection.jdbc.internal.MySQLDialect
import akka.projection.jdbc.internal.OracleDialect
import akka.projection.jdbc.internal.PostgresDialect
import akka.util.Helpers.toRootLowerCase
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

/**
* INTERNAL API
*/
@InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile](
system: ActorSystem[_],
val db: P#Backend#Database,
val profile: P,
databaseConfig: DatabaseConfig[P],
slickSettings: SlickSettings,
clock: Clock) {
import OffsetSerialization.MultipleOffsets
import OffsetSerialization.SingleOffset
import profile.api._

def this(system: ActorSystem[_], db: P#Backend#Database, profile: P, slickSettings: SlickSettings) =
this(system, db, profile, slickSettings, Clock.systemUTC())
def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) =
this(system, databaseConfig, slickSettings, Clock.systemUTC())

private[akka] val profile: P = databaseConfig.profile
private val db: databaseConfig.profile.Backend#Database = databaseConfig.db
import profile.api._

val (dialect, useLowerCase): (Dialect, Boolean) = {

Expand Down Expand Up @@ -114,7 +115,7 @@ import slick.jdbc.JdbcProfile
if (useLowerCase) toRootLowerCase(str)
else str

class OffsetStoreTable(tag: Tag) extends Table[OffsetRow](tag, dialect.schema, dialect.tableName) {
private[akka] class OffsetStoreTable(tag: Tag) extends Table[OffsetRow](tag, dialect.schema, dialect.tableName) {

def projectionName = column[String](adaptCase("PROJECTION_NAME"), O.Length(255))
def projectionKey = column[String](adaptCase("PROJECTION_KEY"), O.Length(255))
Expand All @@ -137,11 +138,12 @@ import slick.jdbc.JdbcProfile
mergeable: Boolean,
lastUpdated: Long)

val offsetTable = TableQuery[OffsetStoreTable]
private[akka] val offsetTable = TableQuery[OffsetStoreTable]

case class ManagementStateRow(projectionName: String, projectionKey: String, paused: Boolean, lastUpdated: Long)

class ManagementTable(tag: Tag) extends Table[ManagementStateRow](tag, dialect.schema, dialect.managementTableName) {
private[akka] class ManagementTable(tag: Tag)
extends Table[ManagementStateRow](tag, dialect.schema, dialect.managementTableName) {

def projectionName = column[String](adaptCase("PROJECTION_NAME"), O.Length(255))
def projectionKey = column[String](adaptCase("PROJECTION_KEY"), O.Length(255))
Expand All @@ -153,7 +155,7 @@ import slick.jdbc.JdbcProfile
def * = (projectionName, projectionKey, paused, lastUpdated).mapTo[ManagementStateRow]
}

val managementTable = TableQuery[ManagementTable]
private[akka] val managementTable = TableQuery[ManagementTable]

def createIfNotExists(): Future[Done] = {
val prepareSchemaDBIO = SimpleDBIO[Unit] { jdbcContext =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile
settings) {

implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)
override val logger: LoggingAdapter = Logging(system.classicSystem, classOf[SlickInternalProjectionState])

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
Expand Down
Loading

0 comments on commit 63c37b3

Please sign in to comment.