Skip to content

Commit

Permalink
chore: bump to akka 2.10.0-M1 (#1217)
Browse files Browse the repository at this point in the history
* chore: bump to akka 2.10.0-M1
  • Loading branch information
sebastian-alfers authored Oct 4, 2024
1 parent 929524e commit 9bc4c6e
Show file tree
Hide file tree
Showing 102 changed files with 545 additions and 565 deletions.
2 changes: 1 addition & 1 deletion .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ updates.ignore = [
{ groupId = "com.typesafe.slick" }

{groupId = "com.fasterxml.jackson.core" }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.2." }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.5." }
]

updatePullRequests = false
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import akka.stream.javadsl.Source;
import org.junit.*;
import org.scalatestplus.junit.JUnitSuite;
import scala.compat.java8.FutureConverters;

import scala.concurrent.Await;
import scala.jdk.javaapi.FutureConverters;

import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -69,13 +70,13 @@ public static void beforeAll() throws Exception {
// we should keep trying to create the table until it succeeds
CompletionStage<Done> createTableAttempts =
Patterns.retry(
() -> FutureConverters.toJava(offsetStore.createKeyspaceAndTable()),
() -> FutureConverters.asJava(offsetStore.createKeyspaceAndTable()),
20,
Duration.ofSeconds(3),
testKit.system().classicSystem().scheduler(),
testKit.system().executionContext());
Await.result(
FutureConverters.toScala(createTableAttempts),
FutureConverters.asScala(createTableAttempts),
scala.concurrent.duration.Duration.create(60, TimeUnit.SECONDS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package akka.projection.cassandra

import java.time.Instant
import java.util.UUID
import scala.compat.java8.FutureConverters._

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.jdk.FutureConverters._
import scala.util.Try

import akka.Done
import akka.actor.Scheduler
import akka.actor.testkit.typed.scaladsl.LogCapturing
Expand Down Expand Up @@ -50,9 +52,9 @@ class CassandraOffsetStoreSpec
s <- session.underlying()

// reason for setSchemaMetadataEnabled is that it speed up tests
_ <- s.setSchemaMetadataEnabled(false).toScala
_ <- s.setSchemaMetadataEnabled(false).asScala
_ <- offsetStore.createKeyspaceAndTable()
_ <- s.setSchemaMetadataEnabled(null).toScala
_ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done

// the container can takes time to be 'ready',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ package akka.projection.cassandra
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.jdk.FutureConverters._

import akka.Done
import akka.NotUsed
import akka.actor.Scheduler
Expand Down Expand Up @@ -152,10 +154,10 @@ class CassandraProjectionSpec
for {
s <- session.underlying()
// reason for setSchemaMetadataEnabled is that it speed up tests
_ <- s.setSchemaMetadataEnabled(false).toScala
_ <- s.setSchemaMetadataEnabled(false).asScala
_ <- offsetStore.createKeyspaceAndTable()
_ <- repository.createKeyspaceAndTable()
_ <- s.setSchemaMetadataEnabled(null).toScala
_ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done

// the container can takes time to be 'ready',
Expand All @@ -168,10 +170,10 @@ class CassandraProjectionSpec
Await.ready(for {
s <- session.underlying()
// reason for setSchemaMetadataEnabled is that it speed up tests
_ <- s.setSchemaMetadataEnabled(false).toScala
_ <- s.setSchemaMetadataEnabled(false).asScala
_ <- session.executeDDL(s"DROP keyspace ${offsetStore.keyspace}")
_ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}")
_ <- s.setSchemaMetadataEnabled(null).toScala
_ <- s.setSchemaMetadataEnabled(null).asScala
} yield Done, 30.seconds)
super.afterAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ object CassandraProjection {
* before the system is started.
*/
def createTablesIfNotExists(system: ActorSystem[_]): CompletionStage[Done] = {
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
val offsetStore = new CassandraOffsetStore(system)
offsetStore.createKeyspaceAndTable().toJava
offsetStore.createKeyspaceAndTable().asJava
}

@deprecated("Renamed to createTablesIfNotExists", "1.2.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ package akka.projection.internal
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.UUID

import scala.collection.immutable

import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.query
import akka.projection.MergeableOffset
import akka.projection.ProjectionId
import akka.serialization.SerializerWithStringManifest
import akka.util.unused
import org.scalatest.wordspec.AnyWordSpecLike

import scala.annotation.unused
import scala.collection.immutable

object OffsetSerializationSpec {
class TestSerializer(@unused system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 9999
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package akka.projection

import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._

import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

/**
* Error handling strategy when processing an `Envelope` fails. The default is defined in configuration .
Expand Down Expand Up @@ -45,7 +45,7 @@ object HandlerRecoveryStrategy {
* and fail the stream if all attempts fail.
*/
def retryAndFail(retries: Int, delay: java.time.Duration): HandlerRecoveryStrategy =
retryAndFail(retries, delay.asScala)
retryAndFail(retries, delay.toScala)

/**
* Scala API: If the first attempt to invoke the handler fails it will retry invoking the handler with the
Expand All @@ -61,7 +61,7 @@ object HandlerRecoveryStrategy {
* discard the element and continue with next if all attempts fail.
*/
def retryAndSkip(retries: Int, delay: java.time.Duration): HandlerRecoveryStrategy =
retryAndSkip(retries, delay.asScala)
retryAndSkip(retries, delay.toScala)

/**
* INTERNAL API: placed here instead of the `internal` package because of sealed trait
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/

package akka.projection
import akka.actor.typed.scaladsl.LoggerOps
import scala.util.Failure
import scala.util.Success

Expand Down Expand Up @@ -78,7 +77,7 @@ object ProjectionBehavior {
projection.actorHandlerInit[Any].foreach { init =>
val ref = ctx.spawnAnonymous(Behaviors.supervise(init.behavior).onFailure(SupervisorStrategy.restart))
init.setActor(ref)
ctx.log.debug2("Started actor handler [{}] for projection [{}]", ref, projection.projectionId)
ctx.log.debug("Started actor handler [{}] for projection [{}]", ref, projection.projectionId)
}
val running = projection.run()(ctx.system)
if (running.isInstanceOf[RunningProjectionManagement[_]])
Expand Down Expand Up @@ -133,7 +132,7 @@ object ProjectionBehavior {
running match {
case mgmt: RunningProjectionManagement[Offset] @unchecked =>
if (setOffset.projectionId == projectionId) {
context.log.info2(
context.log.info(
"Offset will be changed to [{}] for projection [{}]. The Projection will be restarted.",
setOffset.offset,
projectionId)
Expand All @@ -146,7 +145,7 @@ object ProjectionBehavior {
}

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed with: {}", op, exc)
context.log.warn("Operation [{}] failed with: {}", op, exc)
Behaviors.same

case isPaused: IsPaused =>
Expand All @@ -170,7 +169,7 @@ object ProjectionBehavior {
running match {
case mgmt: RunningProjectionManagement[_] =>
if (setPaused.projectionId == projectionId) {
context.log.info2(
context.log.info(
"Running state will be changed to [{}] for projection [{}].",
if (setPaused.paused) "paused" else "resumed",
projectionId)
Expand Down Expand Up @@ -207,7 +206,7 @@ object ProjectionBehavior {
Behaviors.same

case SetOffsetResult(replyTo) =>
context.log.info2(
context.log.info(
"Starting projection [{}] after setting offset to [{}]",
projection.projectionId,
setOffset.offset)
Expand All @@ -216,7 +215,7 @@ object ProjectionBehavior {
stashBuffer.unstashAll(started(running))

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed.", op, exc)
context.log.warn("Operation [{}] failed.", op, exc)
// start anyway, but no reply
val running = projection.run()(context.system)
stashBuffer.unstashAll(started(running))
Expand All @@ -233,7 +232,7 @@ object ProjectionBehavior {
Behaviors.stopped

case other =>
context.log.debug2("Projection [{}] is being stopped. Discarding [{}].", projectionId, other)
context.log.debug("Projection [{}] is being stopped. Discarding [{}].", projectionId, other)
Behaviors.unhandled
}

Expand All @@ -255,7 +254,7 @@ object ProjectionBehavior {
Behaviors.same

case SetPausedResult(replyTo) =>
context.log.info2(
context.log.info(
"Starting projection [{}] in {} mode.",
projection.projectionId,
if (setPaused.paused) "paused" else "resumed")
Expand All @@ -264,7 +263,7 @@ object ProjectionBehavior {
stashBuffer.unstashAll(started(running))

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed.", op, exc)
context.log.warn("Operation [{}] failed.", op, exc)
// start anyway, but no reply
val running = projection.run()(context.system)
stashBuffer.unstashAll(started(running))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.projection

import scala.collection.immutable

import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._

object ProjectionId {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
package akka.projection.internal

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

import akka.Done
import akka.annotation.InternalApi
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.util.ccompat.JavaConverters._

/**
* INTERNAL API
Expand All @@ -33,14 +33,14 @@ import akka.util.ccompat.JavaConverters._
extends scaladsl.Handler[Envelope] {

override def process(envelope: Envelope): Future[Done] = {
delegate.process(envelope).toScala
delegate.process(envelope).asScala
}

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}

Expand All @@ -52,14 +52,14 @@ import akka.util.ccompat.JavaConverters._
extends scaladsl.Handler[immutable.Seq[Envelope]] {

override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
delegate.process(envelopes.asJava).toScala
delegate.process(envelopes.asJava).asScala
}

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}

Expand All @@ -76,14 +76,14 @@ private[projection] class HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
* is restarted after a failure.
*/
override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

/**
* Invoked when the projection has been stopped. Can be overridden to implement resource
* cleanup. It is also called when the `Projection` is restarted after a failure.
*/
override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala
}

/**
Expand All @@ -96,12 +96,12 @@ private[projection] class HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
override private[projection] def behavior = delegate.behavior

override final def process(envelope: Envelope): Future[Done] =
delegate.process(getActor(), envelope).toScala
delegate.process(getActor(), envelope).asScala

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}
Loading

0 comments on commit 9bc4c6e

Please sign in to comment.