Skip to content

Commit

Permalink
chore: remove sleep in IntegrationSpec (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Apr 4, 2023
1 parent ec29094 commit 3cfac26
Showing 1 changed file with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,18 +356,20 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
processedB.envelope.event shouldBe "B"

val consumerFilter = ConsumerFilter(system).ref
consumerFilter ! ConsumerFilter.UpdateFilter(streamId, List(ConsumerFilter.ExcludeEntityIds(Set(pid.entityId))))
// FIXME hack sleep to let it propagate to producer side
Thread.sleep(3000)
// look for log message to ensure that filter has propagated to producer side before continuing
LoggingTestKit.debug(s"Stream [$streamId (0-1023)]: Filter update requested").expect {
consumerFilter ! ConsumerFilter.UpdateFilter(streamId, List(ConsumerFilter.ExcludeEntityIds(Set(pid.entityId))))
}

entity ! TestEntity.Persist("c")
processedProbe.expectNoMessage(1.second)

consumerFilter ! ConsumerFilter.UpdateFilter(
streamId,
List(ConsumerFilter.IncludeEntityIds(Set(ConsumerFilter.EntityIdOffset(pid.entityId, 0L)))))
// FIXME hack sleep
Thread.sleep(3000)
// look for log message to ensure that filter has propagated to producer side before continuing
LoggingTestKit.debug(s"Stream [$streamId (0-1023)]: Filter update requested").expect {
consumerFilter ! ConsumerFilter.UpdateFilter(
streamId,
List(ConsumerFilter.IncludeEntityIds(Set(ConsumerFilter.EntityIdOffset(pid.entityId, 0L)))))
}

entity ! TestEntity.Persist("d")

Expand All @@ -384,11 +386,11 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
processedD.envelope.event shouldBe "D"

// remove filter
consumerFilter ! ConsumerFilter.UpdateFilter(
streamId,
List(ConsumerFilter.RemoveIncludeEntityIds(Set(pid.entityId))))
// FIXME hack sleep to let it propagate to producer side
Thread.sleep(3000)
// look for log message to ensure that filter has propagated to producer side before continuing
LoggingTestKit.debug(s"Stream [$streamId (0-1023)]: Filter update requested").expect {
consumerFilter ! ConsumerFilter
.UpdateFilter(streamId, List(ConsumerFilter.RemoveIncludeEntityIds(Set(pid.entityId))))
}

entity ! TestEntity.Persist("e")
processedProbe.expectNoMessage(1.second)
Expand Down Expand Up @@ -419,15 +421,15 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
processedB.envelope.event shouldBe "B"

val consumerFilter = ConsumerFilter(system).ref
consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.PersistenceIdOffset(pid.id, 2L)))
// FIXME hack sleep to let it propagate to producer side
Thread.sleep(3000)
// look for log message to ensure that filter has propagated to producer side before continuing
LoggingTestKit.debug(s"Stream [$streamId (0-1023)]: Replay requested").expect {
consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.PersistenceIdOffset(pid.id, 2L)))
}

entity ! TestEntity.Persist("c")
entity ! TestEntity.Persist("d")

// this doesn't really verify that a replay occurred since same events are propagated the ordinary way

val processedC = processedProbe.receiveMessage()
processedC.envelope.persistenceId shouldBe pid.id
processedC.envelope.sequenceNr shouldBe 3L
Expand Down

0 comments on commit 3cfac26

Please sign in to comment.