From e53c2127e50fa90069cf25afae64bcdbf0ae216f Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Fri, 31 May 2024 09:35:18 -0400 Subject: [PATCH] Fixes #15194: Prevent enqueuing duplicate events for an object --- netbox/extras/context_managers.py | 7 +++--- netbox/extras/events.py | 32 +++++++++++++++---------- netbox/extras/signals.py | 29 +++++++--------------- netbox/extras/tests/test_event_rules.py | 27 +++++++++++++++++++-- netbox/netbox/context.py | 2 +- 5 files changed, 57 insertions(+), 40 deletions(-) diff --git a/netbox/extras/context_managers.py b/netbox/extras/context_managers.py index 8de47465e96..e72cb8cc2d3 100644 --- a/netbox/extras/context_managers.py +++ b/netbox/extras/context_managers.py @@ -13,13 +13,14 @@ def event_tracking(request): :param request: WSGIRequest object with a unique `id` set """ current_request.set(request) - events_queue.set([]) + events_queue.set({}) yield # Flush queued webhooks to RQ - flush_events(events_queue.get()) + if events := list(events_queue.get().values()): + flush_events(events) # Clear context vars current_request.set(None) - events_queue.set([]) + events_queue.set({}) diff --git a/netbox/extras/events.py b/netbox/extras/events.py index 34d2ec159ce..22ce26ba963 100644 --- a/netbox/extras/events.py +++ b/netbox/extras/events.py @@ -58,15 +58,21 @@ def enqueue_object(queue, instance, user, request_id, action): if model_name not in registry['model_features']['event_rules'].get(app_label, []): return - queue.append({ - 'content_type': ContentType.objects.get_for_model(instance), - 'object_id': instance.pk, - 'event': action, - 'data': serialize_for_event(instance), - 'snapshots': get_snapshots(instance, action), - 'username': user.username, - 'request_id': request_id - }) + assert instance.pk is not None + key = f'{app_label}.{model_name}:{instance.pk}' + if key in queue: + queue[key]['data'] = serialize_for_event(instance) + queue[key]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] + else: + queue[key] = { + 'content_type': ContentType.objects.get_for_model(instance), + 'object_id': instance.pk, + 'event': action, + 'data': serialize_for_event(instance), + 'snapshots': get_snapshots(instance, action), + 'username': user.username, + 'request_id': request_id + } def process_event_rules(event_rules, model_name, event, data, username=None, snapshots=None, request_id=None): @@ -163,14 +169,14 @@ def process_event_queue(events): ) -def flush_events(queue): +def flush_events(events): """ - Flush a list of object representation to RQ for webhook processing. + Flush a list of object representations to RQ for event processing. """ - if queue: + if events: for name in settings.EVENTS_PIPELINE: try: func = import_string(name) - func(queue) + func(events) except Exception as e: logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e)) diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index 2813ed7aeb7..9d439ace9af 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -55,18 +55,6 @@ def run_validators(instance, validators): clear_events = Signal() -def is_same_object(instance, webhook_data, request_id): - """ - Compare the given instance to the most recent queued webhook object, returning True - if they match. This check is used to avoid creating duplicate webhook entries. - """ - return ( - ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and - instance.pk == webhook_data['object_id'] and - request_id == webhook_data['request_id'] - ) - - @receiver((post_save, m2m_changed)) def handle_changed_object(sender, instance, **kwargs): """ @@ -112,14 +100,13 @@ def handle_changed_object(sender, instance, **kwargs): objectchange.request_id = request.id objectchange.save() - # If this is an M2M change, update the previously queued webhook (from post_save) + # Ensure that we're working with fresh M2M assignments + if m2m_changed: + instance.refresh_from_db() + + # Enqueue the object for event processing queue = events_queue.get() - if m2m_changed and queue and is_same_object(instance, queue[-1], request.id): - instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments - queue[-1]['data'] = serialize_for_event(instance) - queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] - else: - enqueue_object(queue, instance, request.user, request.id, action) + enqueue_object(queue, instance, request.user, request.id, action) events_queue.set(queue) # Increment metric counters @@ -179,7 +166,7 @@ def handle_deleted_object(sender, instance, **kwargs): obj.snapshot() # Ensure the change record includes the "before" state getattr(obj, related_field_name).remove(instance) - # Enqueue webhooks + # Enqueue the object for event processing queue = events_queue.get() enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) events_queue.set(queue) @@ -195,7 +182,7 @@ def clear_events_queue(sender, **kwargs): """ logger = logging.getLogger('events') logger.info(f"Clearing {len(events_queue.get())} queued events ({sender})") - events_queue.set([]) + events_queue.set({}) # diff --git a/netbox/extras/tests/test_event_rules.py b/netbox/extras/tests/test_event_rules.py index 8cea2078a4c..a1dd8b48e7f 100644 --- a/netbox/extras/tests/test_event_rules.py +++ b/netbox/extras/tests/test_event_rules.py @@ -4,6 +4,7 @@ import django_rq from django.http import HttpResponse +from django.test import RequestFactory from django.urls import reverse from requests import Session from rest_framework import status @@ -12,6 +13,7 @@ from dcim.choices import SiteStatusChoices from dcim.models import Site from extras.choices import EventRuleActionChoices, ObjectChangeActionChoices +from extras.context_managers import event_tracking from extras.events import enqueue_object, flush_events, serialize_for_event from extras.models import EventRule, Tag, Webhook from extras.webhooks import generate_signature, send_webhook @@ -360,7 +362,7 @@ def dummy_send(_, request, **kwargs): return HttpResponse() # Enqueue a webhook for processing - webhooks_queue = [] + webhooks_queue = {} site = Site.objects.create(name='Site 1', slug='site-1') enqueue_object( webhooks_queue, @@ -369,7 +371,7 @@ def dummy_send(_, request, **kwargs): request_id=request_id, action=ObjectChangeActionChoices.ACTION_CREATE ) - flush_events(webhooks_queue) + flush_events(list(webhooks_queue.values())) # Retrieve the job from queue job = self.queue.jobs[0] @@ -377,3 +379,24 @@ def dummy_send(_, request, **kwargs): # Patch the Session object with our dummy_send() method, then process the webhook for sending with patch.object(Session, 'send', dummy_send) as mock_send: send_webhook(**job.kwargs) + + def test_duplicate_triggers(self): + """ + Test for erroneous duplicate event triggers resulting from saving an object multiple times + within the span of a single request. + """ + url = reverse('dcim:site_add') + request = RequestFactory().get(url) + request.id = uuid.uuid4() + request.user = self.user + + self.assertEqual(self.queue.count, 0, msg="Unexpected jobs found in queue") + + with event_tracking(request): + site = Site(name='Site 1', slug='site-1') + site.save() + + # Save the site a second time + site.save() + + self.assertEqual(self.queue.count, 1, msg="Duplicate jobs found in queue") diff --git a/netbox/netbox/context.py b/netbox/netbox/context.py index 56e41cb6311..744c36df4d7 100644 --- a/netbox/netbox/context.py +++ b/netbox/netbox/context.py @@ -7,4 +7,4 @@ current_request = ContextVar('current_request', default=None) -events_queue = ContextVar('events_queue', default=[]) +events_queue = ContextVar('events_queue', default=dict())