-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce "snapshot-cassandra" module.
- Loading branch information
Showing
18 changed files
with
366 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64 changes: 64 additions & 0 deletions
64
...ain/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchema.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package com.evolutiongaming.kafka.journal.snapshot.cassandra | ||
|
||
import cats.Monad | ||
import cats.data.{NonEmptyList => Nel} | ||
import cats.effect.Concurrent | ||
import cats.syntax.all._ | ||
import com.evolutiongaming.catshelper.LogOf | ||
import com.evolutiongaming.kafka.journal.eventual.cassandra._ | ||
import com.evolutiongaming.scassandra.TableName | ||
|
||
object CreateSnapshotSchema { | ||
|
||
def apply[F[_] : Concurrent : CassandraCluster : CassandraSession : CassandraSync : LogOf]( | ||
config: SnapshotSchemaConfig | ||
): F[(SnapshotSchema, MigrateSchema.Fresh)] = { | ||
|
||
for { | ||
createTables <- CreateTables.of[F] | ||
createKeyspace = CreateKeyspace[F] | ||
result <- apply[F](config, createKeyspace, createTables) | ||
} yield result | ||
} | ||
|
||
def apply[F[_] : Monad]( | ||
config: SnapshotSchemaConfig, | ||
createKeyspace: CreateKeyspace[F], | ||
createTables: CreateTables[F] | ||
): F[(SnapshotSchema, MigrateSchema.Fresh)] = { | ||
|
||
def createTables1 = { | ||
val keyspace = config.keyspace.name | ||
|
||
def tableName(table: CreateTables.Table) = TableName(keyspace = keyspace, table = table.name) | ||
|
||
def table(name: String, query: TableName => Nel[String]) = { | ||
val tableName = TableName(keyspace = keyspace, table = name) | ||
CreateTables.Table(name = name, queries = query(tableName)) | ||
} | ||
|
||
val snapshot = table(config.snapshotTable, a => Nel.of(SnapshotStatements.createTable(a))) | ||
|
||
val setting = table(config.settingTable, a => Nel.of(SettingStatements.createTable(a))) | ||
|
||
val schema = SnapshotSchema( | ||
snapshot = tableName(snapshot), | ||
setting = tableName(setting)) | ||
|
||
if (config.autoCreate) { | ||
for { | ||
result <- createTables(keyspace, Nel.of(snapshot, setting)) | ||
} yield { | ||
(schema, result) | ||
} | ||
} else { | ||
(schema, false).pure[F] | ||
} | ||
} | ||
|
||
for { | ||
_ <- createKeyspace(config.keyspace) | ||
result <- createTables1 | ||
} yield result | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
...main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SetupSnapshotSchema.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package com.evolutiongaming.kafka.journal.snapshot.cassandra | ||
|
||
import cats.Parallel | ||
import cats.effect.kernel.Temporal | ||
import cats.syntax.all._ | ||
import com.evolutiongaming.catshelper.LogOf | ||
import com.evolutiongaming.kafka.journal.Origin | ||
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraCluster, CassandraConsistencyConfig, CassandraSession, CassandraSync, SettingsCassandra} | ||
|
||
/** Creates a new schema */ | ||
object SetupSnapshotSchema { | ||
|
||
def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf]( | ||
config: SnapshotSchemaConfig, | ||
origin: Option[Origin], | ||
consistencyConfig: CassandraConsistencyConfig | ||
): F[SnapshotSchema] = { | ||
|
||
def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSnapshotSchema(config) | ||
|
||
for { | ||
cassandraSync <- CassandraSync.of[F](config.keyspace, config.locksTable, origin) | ||
ab <- createSchema(cassandraSync) | ||
(schema, fresh) = ab | ||
_ <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig) | ||
} yield schema | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
7 changes: 7 additions & 0 deletions
7
.../src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchema.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package com.evolutiongaming.kafka.journal.snapshot.cassandra | ||
|
||
import com.evolutiongaming.scassandra.TableName | ||
|
||
final case class SnapshotSchema( | ||
snapshot: TableName, | ||
setting: TableName) |
21 changes: 21 additions & 0 deletions
21
...ain/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchemaConfig.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package com.evolutiongaming.kafka.journal.snapshot.cassandra | ||
|
||
import com.evolutiongaming.kafka.journal.eventual.cassandra.KeyspaceConfig | ||
import pureconfig.ConfigReader | ||
import pureconfig.generic.semiauto.deriveReader | ||
|
||
final case class SnapshotSchemaConfig( | ||
keyspace: KeyspaceConfig = KeyspaceConfig.default, | ||
snapshotTable: String = "snapshot_buffer", | ||
settingTable: String = "setting", | ||
locksTable: String = "locks", | ||
autoCreate: Boolean = true | ||
) | ||
|
||
object SnapshotSchemaConfig { | ||
|
||
val default: SnapshotSchemaConfig = SnapshotSchemaConfig() | ||
|
||
implicit val configReaderSchemaConfig: ConfigReader[SnapshotSchemaConfig] = deriveReader | ||
|
||
} |
3 changes: 2 additions & 1 deletion
3
...entual/cassandra/SnapshotStatements.scala → ...apshot/cassandra/SnapshotStatements.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...rc/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSchemaSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package com.evolutiongaming.kafka.journal.snapshot.cassandra | ||
|
||
import cats.Id | ||
import cats.data.{NonEmptyList => Nel} | ||
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CreateKeyspace, CreateTables, KeyspaceConfig} | ||
import com.evolutiongaming.kafka.journal.snapshot.cassandra.{CreateSnapshotSchema, SnapshotSchemaConfig} | ||
import com.evolutiongaming.scassandra.TableName | ||
import org.scalatest.funsuite.AnyFunSuite | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
class CreateSchemaSpec extends AnyFunSuite with Matchers { self => | ||
|
||
test("create keyspace and tables") { | ||
val config = SnapshotSchemaConfig.default | ||
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables) | ||
val initial = State.empty.copy(createTables = true) | ||
val (state, (schema, fresh)) = createSchema.run(initial) | ||
state shouldEqual initial.copy(actions = List(Action.CreateTables, Action.CreateKeyspace)) | ||
fresh shouldEqual true | ||
schema shouldEqual self.schema | ||
} | ||
|
||
test("not create keyspace and tables") { | ||
val config = SnapshotSchemaConfig.default.copy(autoCreate = false) | ||
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables) | ||
val initial = State.empty.copy(createTables = true) | ||
val (state, (schema, fresh)) = createSchema.run(initial) | ||
state shouldEqual initial.copy(actions = List(Action.CreateKeyspace)) | ||
fresh shouldEqual false | ||
schema shouldEqual self.schema | ||
} | ||
|
||
|
||
private val schema = SnapshotSchema( | ||
snapshot = TableName(keyspace = "journal", table = "snapshot_buffer"), | ||
setting = TableName(keyspace = "journal", table = "setting")) | ||
|
||
val createTables: CreateTables[StateT] = new CreateTables[StateT] { | ||
def apply(keyspace: String, tables: Nel[CreateTables.Table]) = { | ||
StateT { state => | ||
val state1 = state.add(Action.CreateTables) | ||
(state1, state.createTables) | ||
} | ||
} | ||
} | ||
|
||
val createKeyspace: CreateKeyspace[StateT] = new CreateKeyspace[StateT] { | ||
def apply(config: KeyspaceConfig) = { | ||
StateT { state => | ||
val state1 = state.add(Action.CreateKeyspace) | ||
(state1, ()) | ||
} | ||
} | ||
} | ||
|
||
|
||
case class State(createTables: Boolean, actions: List[Action]) { | ||
|
||
def add(action: Action): State = copy(actions = action :: actions) | ||
} | ||
|
||
object State { | ||
val empty: State = State(createTables = false, actions = Nil) | ||
} | ||
|
||
|
||
type StateT[A] = cats.data.StateT[Id, State, A] | ||
|
||
object StateT { | ||
def apply[A](f: State => (State, A)): StateT[A] = cats.data.StateT[Id, State, A](f) | ||
} | ||
|
||
|
||
sealed trait Action extends Product | ||
|
||
object Action { | ||
case object CreateTables extends Action | ||
case object CreateKeyspace extends Action | ||
} | ||
} |
Oops, something went wrong.