From 1050fc34de56faf2fd8080d0e50e895cdb8dc4b6 Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Tue, 6 Jun 2023 10:44:44 -0400 Subject: [PATCH] fix: Catch KeyError's when processing events, fix transforms in file-to-file --- .../management/commands/helpers/event_log_parser.py | 2 +- .../management/commands/helpers/queued_sender.py | 7 ++++++- .../commands/tests/test_transform_tracking_logs.py | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/event_routing_backends/management/commands/helpers/event_log_parser.py b/event_routing_backends/management/commands/helpers/event_log_parser.py index 2cd87341..eecf9b91 100644 --- a/event_routing_backends/management/commands/helpers/event_log_parser.py +++ b/event_routing_backends/management/commands/helpers/event_log_parser.py @@ -47,7 +47,7 @@ def parse_json_event(line): parsed["timestamp"] = parsed["time"] return parsed - except (AttributeError, JSONDecodeError) as e: + except (AttributeError, JSONDecodeError, KeyError) as e: log.error("EXCEPTION!!!") log.error(type(e)) log.error(e) diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py index 5a640ea4..4ff41664 100644 --- a/event_routing_backends/management/commands/helpers/queued_sender.py +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -103,6 +103,8 @@ def queue(self, event): def send(self): """ Send to the LRS if we're configured for that, otherwise a no-op. + + Events are converted to the output xAPI / Caliper format in the router. """ if self.destination == "LRS": print(f"Sending {len(self.event_queue)} events to LRS...") @@ -114,6 +116,8 @@ def send(self): def store(self): """ Store to a libcloud destination if we're configured for that. + + Events are converted to the output xAPI / Caliper format here before being saved. """ if self.destination == "LRS": print("Store is being called on an LRS destination, skipping.") @@ -130,7 +134,8 @@ def store(self): out = BytesIO() for event in self.event_queue: - out.write(str.encode(json.dumps(event))) + transformed_event = self.router.processors[0].process(event) + out.write(str.encode(json.dumps(transformed_event))) out.write(str.encode("\n")) out.seek(0) diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py index 0cdb0233..86cc6e7f 100644 --- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py +++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py @@ -185,9 +185,11 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys): mm.return_value.download_object_as_stream = get_raw_log_stream mock_libcloud_get_driver.return_value = mm - # Fake a router mapping so some events in the log are actually processed mm2 = MagicMock() + # Fake a router mapping so some events in the log are actually processed mm2.registry.mapping = {"problem_check": 1} + # Fake a process response that can be serialized to json + mm2.process.return_value = {"foo": "bar"} mock_eventsrouter.return_value.processors = [mm2] call_command(