Skip to content

Commit

Permalink
#1905 | Added reason field to subscription/topic constraints (#1913)
Browse files Browse the repository at this point in the history
* 1905 | Added reason field to subscription and topic constratints on backend.

* 1905 | Added reason field to subscription and topic constratints on frontend.

* 1905 | Added missing unit test for topic constraints validations.

---------

Co-authored-by: Mateusz <76775507+szczygiel-m@users.noreply.github.com>
  • Loading branch information
adriansobolewski and szczygiel-m authored Nov 17, 2024
1 parent 0952046 commit 762d389
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 116 deletions.
12 changes: 8 additions & 4 deletions docs/docs/configuration/consumers-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ Creating workload constraints for topic:
{
"topicName": "pl.allegro.test.HugeTrafficTopic",
"constraints": {
"consumersNumber": 5
"consumersNumber": 5,
"reason": "Topic tuning reason"
}
}
```
Expand All @@ -59,7 +60,8 @@ Creating workload constraints for subscriptions:
{
"subscriptionName": "pl.allegro.test.HugeTrafficTopic$nonCriticalSubscription",
"constraints": {
"consumersNumber": 1
"consumersNumber": 1,
"reason": "Subscription tuning reason"
}
}
```
Expand All @@ -71,12 +73,14 @@ Getting all defined workload constraints:
{
"topicConstraints": {
"pl.allegro.test.HugeTrafficTopic": {
"consumersNumber": 5
"consumersNumber": 5,
"reason": "Topic tuning reason"
}
},
"subscriptionConstraints": {
"pl.allegro.test.HugeTrafficTopic$nonCriticalSubscription": {
"consumersNumber": 1
"consumersNumber": 1,
"reason": "Subscription tuning reason"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,35 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Size;

import java.util.Objects;

public class Constraints {

@Min(1)
private final int consumersNumber;

@Size(max = 1024)
private final String reason;

@JsonCreator
public Constraints(@JsonProperty("consumersNumber") int consumersNumber) {
public Constraints(
@JsonProperty("consumersNumber") int consumersNumber,
@JsonProperty("reason") String reason
) {
this.consumersNumber = consumersNumber;
this.reason = reason;
}

public int getConsumersNumber() {
return consumersNumber;
}

public String getReason() {
return reason;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -28,11 +41,11 @@ public boolean equals(Object o) {
return false;
}
Constraints that = (Constraints) o;
return consumersNumber == that.consumersNumber;
return consumersNumber == that.consumersNumber && Objects.equals(reason, that.reason);
}

@Override
public int hashCode() {
return Objects.hash(consumersNumber);
return Objects.hash(consumersNumber, reason);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package pl.allegro.tech.hermes.api

import jakarta.validation.Validation
import jakarta.validation.Validator
import org.hibernate.validator.internal.engine.ConstraintViolationImpl
import spock.lang.Specification

class SubscriptionConstraintsValidationTest extends Specification {
private final Validator validator = Validation.buildDefaultValidatorFactory().getValidator()

def "consumers number has to be greater than zero"() {
given:
def subscriptionConstraints = new SubscriptionConstraints(
"group.topic\$subscription",
new Constraints(consumersNumber, "Some reason")
)

when:
Set<ConstraintViolationImpl<SubscriptionConstraints>> violations = validator.validate(subscriptionConstraints)

then:
violations.propertyPath*.toString() == ["constraints.consumersNumber"]
violations*.interpolatedMessage == ["must be greater than or equal to 1"]

where:
consumersNumber << [-100, -1, 0]
}

def "reason message length has to be max 1024"() {
given:
def subscriptionConstraints = new SubscriptionConstraints(
"group.topic\$subscription",
new Constraints(1, reason)
)

when:
Set<ConstraintViolationImpl<SubscriptionConstraints>> violations = validator.validate(subscriptionConstraints)

then:
violations.propertyPath*.toString() == ["constraints.reason"]
violations*.interpolatedMessage == ["size must be between 0 and 1024"]

where:
reason << [
"r".repeat(1025),
"r".repeat(2048),
"r".repeat(10000)
]
}

def "there shouldn't be any violations for valid inputs"() {
given:
def subscriptionConstraints = new SubscriptionConstraints(
"group.topic\$subscription",
new Constraints(consumersNumber, reason)
)

when:
Set<ConstraintViolationImpl<SubscriptionConstraints>> violations = validator.validate(subscriptionConstraints)

then:
violations.isEmpty()

where:
consumersNumber | reason
1 | "r".repeat(1023)
10 | ""
100 | null
100 | "r"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package pl.allegro.tech.hermes.api

import jakarta.validation.Validation
import jakarta.validation.Validator
import org.hibernate.validator.internal.engine.ConstraintViolationImpl
import spock.lang.Specification

class TopicConstraintsValidationTest extends Specification {
private final Validator validator = Validation.buildDefaultValidatorFactory().getValidator()

def "consumers number has to be greater than zero"() {
given:
def topicConstraints = new TopicConstraints(
"group.topic",
new Constraints(consumersNumber, "Some reason")
)

when:
Set<ConstraintViolationImpl<TopicConstraints>> violations = validator.validate(topicConstraints)

then:
violations.propertyPath*.toString() == ["constraints.consumersNumber"]
violations*.interpolatedMessage == ["must be greater than or equal to 1"]

where:
consumersNumber << [-100, -1, 0]
}

def "reason message length has to be max 1024"() {
given:
def topicConstraints = new TopicConstraints(
"group.topic",
new Constraints(1, reason)
)

when:
Set<ConstraintViolationImpl<TopicConstraints>> violations = validator.validate(topicConstraints)

then:
violations.propertyPath*.toString() == ["constraints.reason"]
violations*.interpolatedMessage == ["size must be between 0 and 1024"]

where:
reason << [
"r".repeat(1025),
"r".repeat(2048),
"r".repeat(10000)
]
}

def "there shouldn't be any violations for valid inputs"() {
given:
def TopicConstraints = new TopicConstraints(
"group.topic",
new Constraints(consumersNumber, reason)
)

when:
Set<ConstraintViolationImpl<TopicConstraints>> violations = validator.validate(TopicConstraints)

then:
violations.isEmpty()

where:
consumersNumber | reason
1 | "r".repeat(1023)
10 | ""
100 | null
100 | "r"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import com.fasterxml.jackson.databind.ObjectMapper
import org.awaitility.Awaitility
import org.slf4j.LoggerFactory
import pl.allegro.tech.hermes.api.Constraints
import pl.allegro.tech.hermes.api.SubscriptionName
Expand Down Expand Up @@ -52,48 +51,58 @@ class ZookeeperWorkloadConstraintsCacheTest extends IntegrationTest {

def "should return defined constraints"() {
given:
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3))
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1, "Some reason"))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3, "Some other reason"))
ensureCacheWasUpdated(2)

when:
def constraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
constraints.topicConstraints == [(TopicName.fromQualifiedName('group.topic')): new Constraints(1)]
constraints.subscriptionConstraints == [(SubscriptionName.fromString('group.topic$sub')): new Constraints(3)]
constraints.topicConstraints == [(TopicName.fromQualifiedName('group.topic')): new Constraints(1, "Some reason")]
constraints.subscriptionConstraints == [(SubscriptionName.fromString('group.topic$sub')): new Constraints(3, "Some other reason")]
}

def "should update cache on create node"() {
when:
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1))
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1, "Some reason"))
ensureCacheWasUpdated(1)
def constraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
def topicConstraints = constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
topicConstraints.consumersNumber == 1
topicConstraints.reason == "Some reason"
constraints.subscriptionConstraints == emptyMap()

when:
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3, "Some other reason"))
ensureCacheWasUpdated(2)
def updatedConstraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def updatedTopicConstraints = updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
updatedTopicConstraints.consumersNumber == 1
updatedTopicConstraints.reason == "Some reason"
def updatedSubscriptionConstraints = updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
updatedSubscriptionConstraints.consumersNumber == 3
updatedSubscriptionConstraints.reason == "Some other reason"
}

def "should update cache on delete node"() {
when:
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3))
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1, "Some reason"))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3, "Some other reason"))
ensureCacheWasUpdated(2)
def constraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def topicConstraints = constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
topicConstraints.consumersNumber == 1
topicConstraints.reason == "Some reason"
def subscriptionConstraints = constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
subscriptionConstraints.consumersNumber == 3
subscriptionConstraints.reason == "Some other reason"

when:
deleteData('/hermes/consumers-workload-constraints/group.topic')
Expand All @@ -102,45 +111,60 @@ class ZookeeperWorkloadConstraintsCacheTest extends IntegrationTest {

then:
updatedConstraints.topicConstraints == emptyMap()
constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def updatedSubscriptionConstraints = constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
updatedSubscriptionConstraints.consumersNumber == 3
updatedSubscriptionConstraints.reason == "Some other reason"
}

def "should update cache on change node"() {
given:
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3))
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1, "Some reason 1"))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3, "Some reason 3"))
ensureCacheWasUpdated(2)

when:
def constraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def topicConstraints = constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
topicConstraints.consumersNumber == 1
topicConstraints.reason == "Some reason 1"
def subscriptionConstraints = constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
subscriptionConstraints.consumersNumber == 3
subscriptionConstraints.reason == "Some reason 3"

when:
updateNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(2))
updateNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(2, "Some reason 2"))
ensureCacheWasUpdated(2)

def updatedConstraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 2
updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def updatedTopicConstraints = updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
updatedTopicConstraints.consumersNumber == 2
updatedTopicConstraints.reason == "Some reason 2"
def updatedSubscriptionConstraints = updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
updatedSubscriptionConstraints.consumersNumber == 3
updatedSubscriptionConstraints.reason == "Some reason 3"

}

def "should log error if cannot read data from node"() {
given:
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3))
setupNode('/hermes/consumers-workload-constraints/group.topic', new Constraints(1, "Some reason"))
setupNode('/hermes/consumers-workload-constraints/group.topic$sub', new Constraints(3, "Some other reason"))
ensureCacheWasUpdated(2)

when:
def constraints = pathChildrenCache.getConsumersWorkloadConstraints()

then:
constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def topicConstraints = constraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
topicConstraints.consumersNumber == 1
topicConstraints.reason == "Some reason"
def subscriptionConstraints = constraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
subscriptionConstraints.consumersNumber == 3
subscriptionConstraints.reason == "Some other reason"

when:
updateNode('/hermes/consumers-workload-constraints/group.topic', 'random data')
Expand All @@ -149,8 +173,12 @@ class ZookeeperWorkloadConstraintsCacheTest extends IntegrationTest {
def updatedConstraints = pathChildrenCache.getConsumersWorkloadConstraints()

then: 'data remained intact'
updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic')).consumersNumber == 1
updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub')).consumersNumber == 3
def updatedTopicConstraints = updatedConstraints.topicConstraints.get(TopicName.fromQualifiedName('group.topic'))
updatedTopicConstraints.consumersNumber == 1
updatedTopicConstraints.reason == "Some reason"
def updatedSubscriptionConstraints = updatedConstraints.subscriptionConstraints.get(SubscriptionName.fromString('group.topic$sub'))
updatedSubscriptionConstraints.consumersNumber == 3
updatedSubscriptionConstraints.reason == "Some other reason"

and:
listAppender.list.get(0).formattedMessage == 'Cannot read data from node: /hermes/consumers-workload-constraints/group.topic'
Expand Down
Loading

0 comments on commit 762d389

Please sign in to comment.