Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: low hanging FIXMEs #844

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.projection.grpc.internal
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.ConsumerFilter.ConsumerFilterSettings
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand All @@ -17,6 +18,8 @@ class ConsumerFilterRegistrySpec
with TestData
with LogCapturing {

private val consumerFilterSettings = ConsumerFilterSettings(system)

private var streamIdCounter = 0
def nextStreamId(): String = {
streamIdCounter += 1
Expand All @@ -26,7 +29,7 @@ class ConsumerFilterRegistrySpec
"ConsumerFilterRegistry" must {
"get filter" in {
val streamId = nextStreamId()
val registry = spawn(ConsumerFilterRegistry())
val registry = spawn(ConsumerFilterRegistry(consumerFilterSettings))

val currentProbe = createTestProbe[ConsumerFilter.CurrentFilter]()
registry ! ConsumerFilter.GetFilter(streamId, currentProbe.ref)
Expand Down Expand Up @@ -56,7 +59,7 @@ class ConsumerFilterRegistrySpec

"update filter and notify subscriber" in {
val streamId = nextStreamId()
val registry = spawn(ConsumerFilterRegistry())
val registry = spawn(ConsumerFilterRegistry(consumerFilterSettings))

val subscriberProbe = createTestProbe[ConsumerFilter.SubscriberCommand]()
registry ! ConsumerFilter.Subscribe(streamId, Nil, subscriberProbe.ref)
Expand All @@ -72,7 +75,7 @@ class ConsumerFilterRegistrySpec

"notify subscriber with diff" in {
val streamId = nextStreamId()
val registry = spawn(ConsumerFilterRegistry())
val registry = spawn(ConsumerFilterRegistry(consumerFilterSettings))

val subscriberProbe1 = createTestProbe[ConsumerFilter.SubscriberCommand]()
registry ! ConsumerFilter.Subscribe(streamId, Nil, subscriberProbe1.ref)
Expand Down Expand Up @@ -111,7 +114,7 @@ class ConsumerFilterRegistrySpec

"start diff from init filter" in {
val streamId = nextStreamId()
val registry = spawn(ConsumerFilterRegistry())
val registry = spawn(ConsumerFilterRegistry(consumerFilterSettings))

val initFilter = Vector(ConsumerFilter.ExcludeEntityIds(Set("a", "c")))
registry ! ConsumerFilter.UpdateFilter(streamId, initFilter)
Expand Down Expand Up @@ -145,7 +148,7 @@ class ConsumerFilterRegistrySpec

"notify new subscriber of diff from concurrent update" in {
val streamId = nextStreamId()
val registry = spawn(ConsumerFilterRegistry())
val registry = spawn(ConsumerFilterRegistry(consumerFilterSettings))

val initFilter = Vector(ConsumerFilter.ExcludeEntityIds(Set("a", "c")))
registry ! ConsumerFilter.UpdateFilter(streamId, initFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.ConsumerFilter.ConsumerFilterSettings
import akka.projection.grpc.consumer.ConsumerFilter.CurrentFilter
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
Expand All @@ -23,6 +24,8 @@ abstract class ConsumerFilterStoreSpec(implName: String, config: Config)
with TestData
with LogCapturing {

val consumerFilterSettings = ConsumerFilterSettings(system)

val notifyProbe = createTestProbe[ConsumerFilterRegistry.FilterUpdated]()

private var streamIdCounter = 0
Expand Down Expand Up @@ -107,11 +110,9 @@ class DdataConsumerFilterStoreSpec
canonical.port = 0
}
}
// FIXME impl State serialization
akka.actor.allow-java-serialization = on
""")) {
override def spawnStore(streamId: String): ActorRef[ConsumerFilterStore.Command] = {
spawn(DdataConsumerFilterStore(streamId, notifyProbe.ref))
spawn(DdataConsumerFilterStore(consumerFilterSettings, streamId, notifyProbe.ref))
}

Cluster(system).manager ! Join(Cluster(system).selfMember.address)
Expand All @@ -120,7 +121,8 @@ class DdataConsumerFilterStoreSpec
val replyProbe = createTestProbe[ConsumerFilter.CurrentFilter]()
val streamId = nextStreamId()
ConsumerFilterStore.useDistributedData(system) shouldBe true
val behv = ConsumerFilterStore.createDdataConsumerFilterStore(system, streamId, notifyProbe.ref)
val behv =
ConsumerFilterStore.createDdataConsumerFilterStore(system, consumerFilterSettings, streamId, notifyProbe.ref)
val store = spawn(behv)
store ! ConsumerFilterStore.GetFilter(replyProbe.ref)
replyProbe.expectMessage(CurrentFilter(streamId, Nil))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import akka.projection.grpc.internal.proto.IncludeEntityIds
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.ReplayReq
import akka.projection.grpc.internal.proto.StreamIn
import akka.projection.grpc.producer.EventProducerSettings
import akka.stream.scaladsl.BidiFlow
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
Expand All @@ -46,6 +47,8 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""

private val persistence = Persistence(system)

private val producerSettings = EventProducerSettings(system)

private def createEnvelope(
pid: PersistenceId,
seqNr: Long,
Expand Down Expand Up @@ -97,7 +100,7 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
import scala.concurrent.duration._

import akka.pattern.{ after => futureAfter }
if (eventsByPersistenceIdConcurrency.incrementAndGet() > FilterStage.ReplayParallelism)
if (eventsByPersistenceIdConcurrency.incrementAndGet() > producerSettings.replayParallelism)
throw new IllegalStateException("Unexpected, too many concurrent calls to currentEventsByPersistenceId")
Source
.futureSource(futureAfter(10.millis) {
Expand All @@ -121,7 +124,8 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
0 until persistence.numberOfSlices,
initFilter,
testCurrentEventsByPersistenceIdQuery(allEnvelopes),
producerFilter = initProducerFilter))
producerFilter = initProducerFilter,
replayParallelism = producerSettings.replayParallelism))
.join(Flow.fromSinkAndSource(Sink.ignore, envSource))
private val streamIn: Source[StreamIn, TestPublisher.Probe[StreamIn]] = TestSource()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.grpc.producer.EventProducerSettings")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productElementNames")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.copy$default$1")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productPrefix")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productArity")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productElement")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productIterator")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.canEqual")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.productElementName")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.grpc.producer.EventProducerSettings$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.EventProducerSettings.unapply")
9 changes: 9 additions & 0 deletions akka-projection-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ akka.projection.grpc {
# can be used for example for authorization in combination with an interceptor in the producer.
# Example "x-auth-header": "secret"
additional-request-headers {}

filter {
ddata-read-timeout = 3s
ddata-write-timeout = 3s
}
}

producer {
Expand All @@ -28,6 +33,10 @@ akka.projection.grpc {
# When using async transformations it can be good to increase this.
transformation-parallelism = 1

filter {
replay-parallelism = 3
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import java.util.{ Set => JSet }

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration

import akka.util.ccompat.JavaConverters._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
Expand All @@ -19,6 +19,9 @@ import akka.actor.typed.Props
import akka.annotation.InternalApi
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.internal.ConsumerFilterRegistry
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import com.typesafe.config.Config

// FIXME add ApiMayChange in all places

Expand All @@ -28,6 +31,7 @@ import akka.projection.grpc.internal.ConsumerFilterRegistry
object ConsumerFilter extends ExtensionId[ConsumerFilter] {

private val ReplicationIdSeparator = '|'
private val ToStringLimit = 100

trait Command
sealed trait SubscriberCommand extends Command {
Expand Down Expand Up @@ -121,6 +125,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
/** Java API */
def this(entityIds: JSet[String]) =
this(entityIds.asScala.toSet)

override def toString: String = {
if (entityIds.size > ToStringLimit)
s"ExcludeEntityIds(${entityIds.take(ToStringLimit).mkString(", ")} ...)"
else
s"ExcludeEntityIds(${entityIds.mkString(", ")})"
}
}

object RemoveExcludeEntityIds {
Expand All @@ -136,6 +147,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
/** Java API */
def this(entityIds: JSet[String]) =
this(entityIds.asScala.toSet)

override def toString: String = {
if (entityIds.size > ToStringLimit)
s"RemoveExcludeEntityIds(${entityIds.take(ToStringLimit).mkString(", ")} ...)"
else
s"RemoveExcludeEntityIds(${entityIds.mkString(", ")})"
}
}

object IncludeEntityIds {
Expand All @@ -156,6 +174,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
/** Java API */
def this(entityOffsets: JSet[EntityIdOffset]) =
this(entityOffsets.asScala.toSet)

override def toString: String = {
if (entityOffsets.size > ToStringLimit)
s"IncludeEntityIds(${entityOffsets.take(ToStringLimit).mkString(", ")} ...)"
else
s"IncludeEntityIds(${entityOffsets.mkString(", ")})"
}
}

object RemoveIncludeEntityIds {
Expand All @@ -171,6 +196,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
/** Java API */
def this(entityIds: JSet[String]) =
this(entityIds.asScala.toSet)

override def toString: String = {
if (entityIds.size > ToStringLimit)
s"RemoveIncludeEntityIds(${entityIds.take(ToStringLimit).mkString(", ")} ...)"
else
s"RemoveIncludeEntityIds(${entityIds.mkString(", ")})"
}
}

private def addReplicaIdToEntityId(replicaId: ReplicaId, entityId: String): String =
Expand Down Expand Up @@ -322,11 +354,29 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
@InternalApi private[akka] def hasRemoveCriteria(filter: immutable.Seq[FilterCriteria]): Boolean =
filter.exists(_.isInstanceOf[RemoveCriteria])

/** INTERNAL API */
@InternalApi private[akka] object ConsumerFilterSettings {
def apply(system: ActorSystem[_]): ConsumerFilterSettings =
apply(system.settings.config.getConfig("akka.projection.grpc.consumer.filter"))

def apply(config: Config): ConsumerFilterSettings =
ConsumerFilterSettings(
ddataReadTimeout = config.getDuration("ddata-read-timeout").asScala,
ddataWriteTimeout = config.getDuration("ddata-write-timeout").asScala)
}

/** INTERNAL API */
@InternalApi private[akka] final case class ConsumerFilterSettings(
ddataReadTimeout: FiniteDuration,
ddataWriteTimeout: FiniteDuration)

}

class ConsumerFilter(system: ActorSystem[_]) extends Extension {

private val settings = ConsumerFilter.ConsumerFilterSettings(system)

val ref: ActorRef[ConsumerFilter.Command] =
system.systemActorOf(ConsumerFilterRegistry(), "projectionGrpcConsumerFilter", Props.empty)
system.systemActorOf(ConsumerFilterRegistry(settings), "projectionGrpcConsumerFilter", Props.empty)

}
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ final class GrpcReadJournal private (
val initFilter = {
import akka.actor.typed.scaladsl.AskPattern._
import scala.concurrent.duration._
implicit val askTimeout: Timeout = 10.seconds // FIXME config
implicit val askTimeout: Timeout = 10.seconds
consumerFilter.ref.ask[ConsumerFilter.CurrentFilter](ConsumerFilter.GetFilter(streamId, _))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.ConsumerFilter.ConsumerFilterSettings
import akka.util.Timeout

/**
Expand All @@ -34,9 +35,9 @@ import akka.util.Timeout

private final case class Subscriber(streamId: String, ref: ActorRef[SubscriberCommand])

def apply(): Behavior[Command] = {
def apply(settings: ConsumerFilterSettings): Behavior[Command] = {
Behaviors.setup { context =>
new ConsumerFilterRegistry(context).behavior(Map.empty, Map.empty)
new ConsumerFilterRegistry(context, settings).behavior(Map.empty, Map.empty)
}

}
Expand All @@ -46,12 +47,12 @@ import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] class ConsumerFilterRegistry(context: ActorContext[ConsumerFilter.Command]) {
@InternalApi private[akka] class ConsumerFilterRegistry(
context: ActorContext[ConsumerFilter.Command],
settings: ConsumerFilterSettings) {
import ConsumerFilter._
import ConsumerFilterRegistry._

// FIXME add unit test for this actor

private def behavior(
subscribers: Map[Subscriber, immutable.Seq[FilterCriteria]],
stores: Map[String, ActorRef[ConsumerFilterStore.Command]]): Behavior[Command] = {
Expand All @@ -61,7 +62,7 @@ import akka.util.Timeout
case Some(store) => store
case None =>
context.spawn(
ConsumerFilterStore(context.system, streamId, context.self),
ConsumerFilterStore(context.system, settings, streamId, context.self),
URLEncoder.encode(streamId, StandardCharsets.UTF_8.name))
}
}
Expand Down Expand Up @@ -127,10 +128,7 @@ import akka.util.Timeout
behavior(subscribers, stores.updated(streamId, store))

case cmd: Replay =>
// FIXME revisit the Replay command. Might not be useful for end users since we
// don't propagate it to other nodes. We need it (or similar) internally for the lazy replay.
publishToSubscribers(cmd)

Behaviors.same

case internalCommand: InternalCommand =>
Expand Down
Loading