Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new migration strategy for Pekko Persistence snapshots #1423

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#####################################
# Pekko Actor Reference Config File #
########################3############
#####################################

# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
Expand Down
9 changes: 7 additions & 2 deletions persistence/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# SPDX-License-Identifier: Apache-2.0

###########################################################
############################################################
# Pekko Persistence Extension Reference Configuration File #
###########################################################
############################################################

# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
Expand Down Expand Up @@ -42,6 +42,11 @@ pekko.persistence {
plugin = ""
# List of snapshot stores to start automatically. Use "" for the default snapshot store.
auto-start-snapshot-stores = []
# When migrating from using Akka Persistence to using Pekko Persistence,
# you may need to have the serializer handle Akka or Pekko created snapshots.
# Supported values are "pekko", "akka" and "no-migration".
# See https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility
auto-migrate-manifest = "pekko"
}
# used as default-snapshot store if no plugin configured
# (see `pekko.persistence.snapshot-store`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,39 @@ import pekko.util.ByteString.UTF_8
@SerialVersionUID(1L)
final case class Snapshot(data: Any)

private[serialization] sealed trait SnapshotAutoMigration

private[serialization] object SnapshotAutoMigration {
val ConfigName = "pekko.persistence.snapshot-store.auto-migrate-manifest"

// Ignore the snapshot migration strategy - means that Pekko will not be able to work with snapshots saved by Akka
object NoMigration extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `akka` to `org.apache.pekko`
object Pekko extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `org.apache.pekko` to `akka`
object Akka extends SnapshotAutoMigration

def fromString(s: String): SnapshotAutoMigration = s match {
case "no-migration" => NoMigration
case "pekko" => Pekko
case "akka" => Akka
case _ => throw new IllegalArgumentException(s"Unknown snapshot migration strategy: $s")
}
}

/**
* [[Snapshot]] serializer.
*/
class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import SnapshotAutoMigration._

override val includeManifest: Boolean = false

private lazy val serialization = SerializationExtension(system)

private lazy val migrationStrategy = SnapshotAutoMigration.fromString(
system.settings.config.getString(ConfigName))

/**
* Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
* `org.apache.pekko.serialization.Serializer`.
Expand All @@ -58,7 +82,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val out = new ByteArrayOutputStream
writeInt(out, snapshotSerializer.identifier)

val ms = Serializers.manifestFor(snapshotSerializer, snapshot)
val ms = migrateManifestIfNecessary(Serializers.manifestFor(snapshotSerializer, snapshot))
if (ms.nonEmpty) out.write(ms.getBytes(UTF_8))

out.toByteArray
Expand All @@ -77,11 +101,44 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else {
val manifestBytes = new Array[Byte](remaining)
in.read(manifestBytes)
new String(manifestBytes, UTF_8)
migrateManifestToPekkoIfNecessary(new String(manifestBytes, UTF_8))
}
(serializerId, manifest)
}

// when writing the data, we want to allow the serialized data to
// support Akka and Pekko serializers as required by configuration
private def migrateManifestIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case Pekko =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
case Akka =>
if (manifest.startsWith("org.apache.pekko")) {
manifest.replaceFirst("org.apache.pekko", "akka")
} else {
manifest
}
}
}

// when reading the data, we want to force use of the Pekko serializer
private def migrateManifestToPekkoIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case _ =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
}
}

private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
def serialize() = {
val snapshotSerializer = serialization.findSerializerFor(snapshot)
Expand Down Expand Up @@ -112,14 +169,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer

val (serializerId, manifest) = headerFromBinary(headerBytes)

// suggested in https://github.com/scullxbones/pekko-persistence-mongo/pull/14#issuecomment-1847223850
serialization
.deserialize(snapshotBytes, serializerId, manifest)
.recoverWith {
case _: NotSerializableException if manifest.startsWith("akka") =>
serialization
.deserialize(snapshotBytes, serializerId, manifest.replaceFirst("akka", "org.apache.pekko"))
}
.get
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.serialization

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.io.NotSerializableException
import java.util.Base64

class SnapshotSerializerMigrationAkkaSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=akka"
) {

import SnapshotSerializerTestData._

"Snapshot serializer with migration to Akka" should {
"deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"serialize snapshot with Akka class name" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val cfg = ConfigFactory.parseString(s"${SnapshotAutoMigration.ConfigName}=no-migration")
.withFallback(system.settings.config)
val pekkoOnlySystem = ActorSystem("pekko-only-serialization", cfg)
try {
val pekkoOnlySerialization = SerializationExtension(pekkoOnlySystem)
intercept[NotSerializableException] {
pekkoOnlySerialization.deserialize(bytes, classOf[Snapshot]).get
}
} finally {
pekkoOnlySystem.terminate()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.serialization

import org.apache.pekko
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.io.NotSerializableException
import java.util.Base64

class SnapshotSerializerNoMigrationSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=no-migration"
) {

import SnapshotSerializerTestData._

"Snapshot serializer with no migration" should {
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"fail to deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
intercept[NotSerializableException] {
serialization.deserialize(bytes, classOf[Snapshot]).get
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,53 @@
package org.apache.pekko.persistence.serialization

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.util.Base64

private[serialization] object SnapshotSerializerTestData {
val fsmSnapshot = PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val akkaSnapshotData =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
}

class SnapshotSerializerSpec extends PekkoSpec {

import SnapshotSerializerTestData._

"Snapshot serializer" should {
"deserialize akka snapshots" in {
val system = ActorSystem()
val serialization = SerializationExtension(system)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val data =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(data)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pre-saved pekko snapshots" in {
val serialization = SerializationExtension(system)
// this is Pekko encoded snapshot based on https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val pekkoSnapshotData =
"SAAAAAcAAABvcmcuYXBhY2hlLnBla2tvLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(pekkoSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
persistentFSMSnapshot shouldEqual fsmSnapshot
}
}
}