Skip to content

Commit

Permalink
feat: consumer filter with tags (#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Apr 5, 2023
1 parent edb4b31 commit c90ca11
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ class ConsumerFilterSpec extends AnyWordSpecLike with Matchers {
mergeFilter(filter3, filter1 ++ filter2) shouldBe filter1
}

"merge and remove include of tags" in {
val filter1 = Vector(ExcludeTags(Set("a", "b")))
mergeFilter(Nil, filter1) shouldBe filter1
mergeFilter(filter1, Nil) shouldBe filter1
val filter2 = Vector(IncludeTags(Set("a", "c")))
mergeFilter(filter1, filter2) shouldBe filter1 ++ filter2
mergeFilter(filter2, filter1) shouldBe filter1 ++ filter2
val filter3 = Vector(RemoveIncludeTags(Set("a", "c")))
mergeFilter(filter1 ++ filter2, filter3) shouldBe filter1
mergeFilter(filter3, filter1 ++ filter2) shouldBe filter1
}

"merge and reduce filter" in {
val filter1 =
Vector(
Expand All @@ -40,6 +52,20 @@ class ConsumerFilterSpec extends AnyWordSpecLike with Matchers {
mergeFilter(filter2, filter1) shouldBe expectedFilter
}

"merge and reduce filter of tags" in {
val filter1 =
Vector(ExcludeTags(Set("a", "b", "c")), IncludeTags(Set("b", "c")))

val filter2 =
Vector(ExcludeTags(Set("d")), RemoveIncludeTags(Set("a", "b")), RemoveExcludeTags(Set("c")))

val expectedFilter =
Vector(ExcludeTags(Set("a", "b", "d")), IncludeTags(Set("c")))

mergeFilter(filter1, filter2) shouldBe expectedFilter
mergeFilter(filter2, filter1) shouldBe expectedFilter
}

"merge and use highest seqNr" in {
val filter1 =
Vector(IncludeEntityIds(Set(EntityIdOffset("b", 1), EntityIdOffset("b", 2), EntityIdOffset("c", 2))))
Expand All @@ -53,6 +79,24 @@ class ConsumerFilterSpec extends AnyWordSpecLike with Matchers {
mergeFilter(filter1, filter2) shouldBe expectedFilter2
}

"create diff for ExcludeTags" in {
val filter1 = Vector(ExcludeTags(Set("a", "b")))
createDiff(Nil, filter1) shouldBe filter1
createDiff(filter1, Nil) shouldBe Vector(RemoveExcludeTags(Set("a", "b")))

val filter2 = Vector(ExcludeTags(Set("a", "c")))
createDiff(filter1, filter2) shouldBe Vector(ExcludeTags(Set("c")), RemoveExcludeTags(Set("b")))
}

"create diff for IncludeTags" in {
val filter1 = Vector(IncludeTags(Set("a", "b")))
createDiff(Nil, filter1) shouldBe filter1
createDiff(filter1, Nil) shouldBe Vector(RemoveIncludeTags(Set("a", "b")))

val filter2 = Vector(IncludeTags(Set("a", "c")))
createDiff(filter1, filter2) shouldBe Vector(IncludeTags(Set("c")), RemoveIncludeTags(Set("b")))
}

"create diff for ExcludeRegexEntityIds" in {
val filter1 = Vector(ExcludeRegexEntityIds(Set("a.*", "b.*")))
createDiff(Nil, filter1) shouldBe filter1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,21 @@ abstract class ConsumerFilterStoreSpec(implName: String, config: Config)
val store = spawnStore(streamId)
val filter1 =
Vector(
ConsumerFilter.ExcludeTags(Set("t1", "t2")),
ConsumerFilter.ExcludeEntityIds(Set("a", "b", "c")),
ConsumerFilter.IncludeEntityIds(Set(ConsumerFilter.EntityIdOffset("b", 1))))
store ! ConsumerFilterStore.UpdateFilter(filter1)
notifyProbe.expectMessage(ConsumerFilterRegistry.FilterUpdated(streamId, filter1))

val filter2 =
Vector(
ConsumerFilter.RemoveExcludeTags(Set("t2")),
ConsumerFilter.ExcludeEntityIds(Set("d")),
ConsumerFilter.RemoveExcludeEntityIds(Set("c")),
ConsumerFilter.RemoveIncludeEntityIds(Set("b")))
store ! ConsumerFilterStore.UpdateFilter(filter2)
val expectedFilter = Vector(ConsumerFilter.ExcludeEntityIds(Set("a", "b", "d")))
val expectedFilter =
Vector(ConsumerFilter.ExcludeTags(Set("t1")), ConsumerFilter.ExcludeEntityIds(Set("a", "b", "d")))
notifyProbe.expectMessage(ConsumerFilterRegistry.FilterUpdated(streamId, expectedFilter))

store ! ConsumerFilterStore.GetFilter(replyProbe.ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ class ConsumerSerializerSpec extends ScalaTestWithActorTestKit with AnyWordSpecL

private val filter1 =
Vector(
ConsumerFilter.ExcludeTags(Set("t1", "t2")),
ConsumerFilter.IncludeTags(Set("t3", "t4")),
ConsumerFilter.ExcludeRegexEntityIds(Set("all.*")),
ConsumerFilter.ExcludeEntityIds(Set("a", "b", "c")),
ConsumerFilter.IncludeEntityIds(Set(ConsumerFilter.EntityIdOffset("b", 1))))

private val filter2 =
Vector(
ConsumerFilter.RemoveExcludeTags(Set("t2")),
ConsumerFilter.RemoveIncludeTags(Set("t4")),
ConsumerFilter.ExcludeEntityIds(Set("d")),
ConsumerFilter.RemoveExcludeEntityIds(Set("c")),
ConsumerFilter.RemoveIncludeEntityIds(Set("b")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,70 @@

package akka.projection.grpc.internal

import java.time.Instant

import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.PersistenceId
import akka.projection.grpc.internal.FilterStage.Filter
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class FilterSpec extends AnyWordSpecLike with Matchers {

private def createEnvelope(pid: String, tags: Set[String] = Set.empty): EventEnvelope[Any] = {
val slice = math.abs(pid.hashCode % 1024)
val seqNr = 1
val now = Instant.now()
EventEnvelope(
TimestampOffset(Instant.now, Map(pid -> seqNr)),
pid,
seqNr,
"evt",
now.toEpochMilli,
PersistenceId.extractEntityType(pid),
slice,
filtered = false,
source = "",
tags = tags)
}

"Filter" should {
"include with empty filter" in {
Filter.empty.matches("pid-1") shouldBe true
Filter.empty.matches(createEnvelope("pid")) shouldBe true
}

"honor include after exclude" in {
val filter =
Filter.empty.addIncludePersistenceIds(Set("pid-1", "pid-3")).addExcludePersistenceIds(Set("pid-3", "pid-2"))
filter.matches("pid-1") shouldBe true
filter.matches("pid-2") shouldBe false
filter.matches("pid-3") shouldBe true
Filter.empty
.addIncludePersistenceIds(Set("pid-1", "pid-3"))
.addExcludePersistenceIds(Set("pid-3", "pid-2"))
.addIncludeTags(Set("a"))
.addExcludeTags(Set("a", "b"))
filter.matches(createEnvelope("pid-1")) shouldBe true
filter.matches(createEnvelope("pid-2")) shouldBe false
filter.matches(createEnvelope("pid-3")) shouldBe true
filter.matches(createEnvelope("pid-4", tags = Set("b"))) shouldBe false
filter.matches(createEnvelope("pid-5", tags = Set("a"))) shouldBe true
filter.matches(createEnvelope("pid-6", tags = Set("a", "b"))) shouldBe true
}

"exclude with regexp" in {
val filter =
Filter.empty.addIncludePersistenceIds(Set("Entity|a-1", "Entity|a-2")).addExcludeRegexEntityIds(List("a-.*"))
filter.matches("Entity|a-1") shouldBe true
filter.matches("Entity|a-2") shouldBe true
filter.matches("Entity|a-3") shouldBe false
filter.matches("Entity|b-1") shouldBe true
filter.matches(createEnvelope("Entity|a-1")) shouldBe true
filter.matches(createEnvelope("Entity|a-2")) shouldBe true
filter.matches(createEnvelope("Entity|a-3")) shouldBe false
filter.matches(createEnvelope("Entity|b-1")) shouldBe true
}

"remove criteria" in {
val filter =
Filter.empty.addIncludePersistenceIds(Set("pid-1", "pid-3")).addExcludePersistenceIds(Set("pid-3", "pid-2"))
filter.matches("pid-1") shouldBe true
filter.matches("pid-3") shouldBe true
filter.matches(createEnvelope("pid-1")) shouldBe true
filter.matches(createEnvelope("pid-3")) shouldBe true
val filter2 = filter.removeIncludePersistenceIds(Set("pid-3"))
filter2.matches("pid-3") shouldBe false
filter2.matches(createEnvelope("pid-3")) shouldBe false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.persistence.typed.ReplicationId
import akka.projection.grpc.internal.proto.EntityIdOffset
import akka.projection.grpc.internal.proto.ExcludeEntityIds
import akka.projection.grpc.internal.proto.ExcludeRegexEntityIds
import akka.projection.grpc.internal.proto.ExcludeTags
import akka.projection.grpc.internal.proto.FilterCriteria
import akka.projection.grpc.internal.proto.FilterReq
import akka.projection.grpc.internal.proto.IncludeEntityIds
Expand Down Expand Up @@ -152,14 +153,23 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
}

"use filter request" in new Setup {
val filterCriteria = List(FilterCriteria(FilterCriteria.Message.ExcludeEntityIds(ExcludeEntityIds(List("b")))))
override lazy val allEnvelopes = envelopes ++
Vector(
createEnvelope(PersistenceId(entityType, "d"), 1, "d1", tags = Set("t1")),
createEnvelope(PersistenceId(entityType, "d"), 2, "d2"))

val filterCriteria = List(
FilterCriteria(FilterCriteria.Message.ExcludeTags(ExcludeTags(List("t1")))),
FilterCriteria(FilterCriteria.Message.ExcludeEntityIds(ExcludeEntityIds(List("b")))))
inPublisher.sendNext(StreamIn(StreamIn.Message.Filter(FilterReq(filterCriteria))))

envelopes.foreach(envPublisher.sendNext)
allEnvelopes.foreach(envPublisher.sendNext)
outProbe.request(10)
outProbe.expectNext(envelopesFor("a").head)
// b filtered out
// b filtered out by ExcludeEntityIds
outProbe.expectNext(envelopesFor("c").head)
// d1 filtered out by ExcludeTags
outProbe.expectNext(envelopesFor("d").last)
outProbe.expectNoMessage()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ option optimize_for = SPEED;

message ConsumerFilterStoreState {
// ORSet serialized with Akka's ReplicatedDataSerializer
bytes exclude_regex_entity_ids = 1;
bytes exclude_tags = 1;
// ORSet serialized with Akka's ReplicatedDataSerializer
bytes exclude_entity_ids = 2;
SeqNrMap include_entity_offsets = 3;
bytes include_tags = 2;
// ORSet serialized with Akka's ReplicatedDataSerializer
bytes exclude_regex_entity_ids = 3;
// ORSet serialized with Akka's ReplicatedDataSerializer
bytes exclude_entity_ids = 4;
SeqNrMap include_entity_offsets = 5;
}

message SeqNrMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,35 @@ message FilterCriteria {
// If no matching include the event is discarded.
// If matching include the event is emitted.
oneof message {
ExcludeRegexEntityIds exclude_matching_entity_ids = 1;
RemoveExcludeRegexEntityIds remove_exclude_matching_entity_ids = 2;
ExcludeEntityIds exclude_entity_ids = 3;
RemoveExcludeEntityIds remove_exclude_entity_ids = 4;
IncludeEntityIds include_entity_ids = 5;
RemoveIncludeEntityIds remove_include_entity_ids = 6;
ExcludeTags exclude_tags = 1;
RemoveExcludeTags remove_exclude_tags = 2;
IncludeTags include_tags = 3;
RemoveIncludeTags remove_include_tags = 4;
ExcludeRegexEntityIds exclude_matching_entity_ids = 5;
RemoveExcludeRegexEntityIds remove_exclude_matching_entity_ids = 6;
ExcludeEntityIds exclude_entity_ids = 7;
RemoveExcludeEntityIds remove_exclude_entity_ids = 8;
IncludeEntityIds include_entity_ids = 9;
RemoveIncludeEntityIds remove_include_entity_ids = 10;
}
}

message ExcludeTags {
repeated string tags = 1;
}

message RemoveExcludeTags {
repeated string tags = 1;
}

message IncludeTags {
repeated string tags = 1;
}

message RemoveIncludeTags {
repeated string tags = 1;
}

message IncludeEntityIds {
repeated EntityIdOffset entity_id_offset = 1;
}
Expand Down
Loading

0 comments on commit c90ca11

Please sign in to comment.