Skip to content

Commit

Permalink
messaging: add queue default max length in bytes
Browse files Browse the repository at this point in the history
CMK-21647

Change-Id: Ie8019fb294075eb389f4c859cce8585201981e78
  • Loading branch information
DavidGerva committed Feb 26, 2025
1 parent 6ca2250 commit 7764756
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 27 deletions.
9 changes: 7 additions & 2 deletions packages/cmk-messaging/cmk/messaging/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
# see https://www.rabbitmq.com/docs/ttl
QUEUE_DEFAULT_MESSAGE_TTL = {"x-message-ttl": 60000}

# maximum dimension of the queue in bytes
# the queue will drop old messages when the size exceeds this value
# see https://www.rabbitmq.com/docs/maxlength
QUEUE_DEFAULT_MAX_LENGTH_BYTES = {"x-max-length-bytes": 1073741824} # 1GB


class User(BaseModel):
name: str
Expand Down Expand Up @@ -298,7 +303,7 @@ def add_connecter_definitions(connection: Connection, definition: Definitions) -
vhost=vhost_name,
durable=True,
auto_delete=False,
arguments={**QUEUE_DEFAULT_MESSAGE_TTL},
arguments={**QUEUE_DEFAULT_MESSAGE_TTL, **QUEUE_DEFAULT_MAX_LENGTH_BYTES},
)
binding = Binding(
source=INTERSITE_EXCHANGE,
Expand Down Expand Up @@ -344,7 +349,7 @@ def add_connectee_definitions(connection: Connection, definition: Definitions) -
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**QUEUE_DEFAULT_MESSAGE_TTL},
arguments={**QUEUE_DEFAULT_MESSAGE_TTL, **QUEUE_DEFAULT_MAX_LENGTH_BYTES},
)

# only add binding on default vhost if the connection is within the same customer
Expand Down
120 changes: 96 additions & 24 deletions packages/cmk-messaging/tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"central": [
Expand All @@ -644,7 +647,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
},
Expand All @@ -659,7 +665,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"central": [
Expand All @@ -668,7 +677,10 @@ def test_compute_distributed_definitions_permissions(
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
},
Expand All @@ -682,7 +694,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"remote2": [
Expand All @@ -691,7 +706,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"central": [
Expand All @@ -700,14 +718,20 @@ def test_compute_distributed_definitions_permissions(
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
},
Expand All @@ -721,7 +745,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"remote2": [
Expand All @@ -730,7 +757,10 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
)
],
"central": [
Expand All @@ -739,14 +769,20 @@ def test_compute_distributed_definitions_permissions(
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost="customer2",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
},
Expand All @@ -760,14 +796,20 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
"remote2": [
Expand All @@ -776,14 +818,20 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote1",
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
"central": [
Expand All @@ -792,14 +840,20 @@ def test_compute_distributed_definitions_permissions(
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost="customer1",
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
},
Expand All @@ -813,14 +867,20 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
"remote2": [
Expand All @@ -829,14 +889,20 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote1",
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
"central": [
Expand All @@ -845,14 +911,20 @@ def test_compute_distributed_definitions_permissions(
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
rabbitmq.Queue(
name="cmk.intersite.remote2",
vhost=DEFAULT_VHOST_NAME,
durable=True,
auto_delete=False,
arguments={**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL},
arguments={
**rabbitmq.QUEUE_DEFAULT_MESSAGE_TTL,
**rabbitmq.QUEUE_DEFAULT_MAX_LENGTH_BYTES,
},
),
],
},
Expand Down
3 changes: 2 additions & 1 deletion tests/testlib/unit/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Definitions,
Permission,
Queue,
QUEUE_DEFAULT_MAX_LENGTH_BYTES,
QUEUE_DEFAULT_MESSAGE_TTL,
Shovel,
ShovelValue,
Expand All @@ -23,7 +24,7 @@ def _get_queue(site_id: str) -> Queue:
vhost="/",
durable=True,
auto_delete=False,
arguments={**QUEUE_DEFAULT_MESSAGE_TTL},
arguments={**QUEUE_DEFAULT_MESSAGE_TTL, **QUEUE_DEFAULT_MAX_LENGTH_BYTES},
)


Expand Down

0 comments on commit 7764756

Please sign in to comment.