From d5999f31668d6c5c65d0c92e08c598a23afa063d Mon Sep 17 00:00:00 2001 From: Brian Mesick Date: Tue, 16 May 2023 16:38:56 -0400 Subject: [PATCH] feat: Add command to bulk transform event log files --- docs/howto/how_to_bulk_transform.rst | 118 ++++++ .../backends/events_router.py | 175 +++++--- .../backends/tests/test_events_router.py | 382 ++++++++++++++++- event_routing_backends/management/__init__.py | 0 .../management/commands/__init__.py | 0 .../management/commands/helpers/__init__.py | 0 .../commands/helpers/event_log_parser.py | 56 +++ .../commands/helpers/queued_sender.py | 165 ++++++++ .../commands/tests/fixtures/tracking.log | 12 + .../tests/test_transform_tracking_logs.py | 383 ++++++++++++++++++ .../commands/transform_tracking_logs.py | 254 ++++++++++++ .../processors/caliper/tests/test_caliper.py | 4 +- .../caliper/transformer_processor.py | 9 +- .../processors/mixins/base_transformer.py | 7 +- .../event_transformers/navigation_events.py | 2 +- .../problem_interaction_events.py | 25 +- .../xapi/event_transformers/video_events.py | 11 +- .../edx.grades.problem.submitted.json | 2 +- .../processors/xapi/transformer.py | 3 +- .../processors/xapi/transformer_processor.py | 3 +- event_routing_backends/tasks.py | 52 +++ event_routing_backends/utils/http_client.py | 36 ++ .../utils/xapi_lrs_client.py | 24 ++ requirements/base.in | 4 +- requirements/base.txt | 30 +- requirements/ci.txt | 2 +- requirements/dev.txt | 48 ++- requirements/doc.txt | 47 ++- requirements/pip.txt | 2 +- requirements/quality.txt | 47 ++- requirements/test.txt | 41 +- 31 files changed, 1787 insertions(+), 157 deletions(-) create mode 100644 docs/howto/how_to_bulk_transform.rst create mode 100644 event_routing_backends/management/__init__.py create mode 100644 event_routing_backends/management/commands/__init__.py create mode 100644 event_routing_backends/management/commands/helpers/__init__.py create mode 100644 event_routing_backends/management/commands/helpers/event_log_parser.py create mode 100644 event_routing_backends/management/commands/helpers/queued_sender.py create mode 100644 event_routing_backends/management/commands/tests/fixtures/tracking.log create mode 100644 event_routing_backends/management/commands/tests/test_transform_tracking_logs.py create mode 100644 event_routing_backends/management/commands/transform_tracking_logs.py diff --git a/docs/howto/how_to_bulk_transform.rst b/docs/howto/how_to_bulk_transform.rst new file mode 100644 index 00000000..29b59c88 --- /dev/null +++ b/docs/howto/how_to_bulk_transform.rst @@ -0,0 +1,118 @@ +How To Bulk Transform Tracking Logs +=================================== + +This is a rough guide of how to transform existing tracking log files into the formats supported by event-routing-backends using the ``transform_tracking_logs`` Django management command inside a running LMS installation. Because the transformations perform database access, looking up user, course, and block data, you will need to run this command on the same install of Open edX that created the tracking log files. + +.. warning:: This also means that doing large amounts of transformations can cause performance issues on the LMS and downstream learning record stores. Make sure to use the ``--batch_size`` and ``--sleep_between_batches_secs`` options to balance system performance vs load time. + +Sources and Destinations +------------------------ + +For most sources and destinations we use `Apache Libcloud Object storage `__ . 
This should cover cases from local storage to Amazon S3, MinIO, and many other cloud storage solutions. The ``--source_provider`` and ``--destination_provider`` options are the Libcloud Provider names, in all caps (ex: ``S3``, ``LOCAL``, ``MINIO``). The ``--source_config`` and ``--destination_config`` options are JSON strings passed directly to the Libcloud constructor as keyword args.
+
+The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.
+
+For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory, the ``container``, which is a single subdirectory name inside the ``key`` directory, and the ``prefix``, which (if provided) is appended to the container to determine the final path.
+
+::
+
+    # This will attempt to recursively read all files in the "/openedx/data/logs/" directory
+    {"key": "/openedx/data", "container": "logs"}
+
+    # This will attempt to recursively read all files in "/openedx/data/logs/tracking/" as
+    # well as any files in "/openedx/data/logs/" that begin with "tracking"
+    {"key": "/openedx/data", "container": "logs", "prefix": "tracking"}
+
+    # This will attempt to read a single file named "/openedx/data/logs/tracking.log"
+    {"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}
+
+
+For other providers, ``key`` and ``secret`` are authentication credentials and ``container`` is roughly synonymous with an S3 bucket. Configuration for each provider is different; consult the Libcloud docs for your provider to learn about other options you may need to include in the ``--source_config`` and ``--destination_config`` JSON structures.
+
+
+Modes Of Operation
+------------------
+
+The command can work in a few distinct ways.
+
+**File(s) to learning record store (LRS)** - This will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends, just as if the events were being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up data lost to downtime issues.
+
+.. note:: Any event filters in the ERB configuration will work as usual on these events.
+
+**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing the events to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.
+
+.. note:: NO event filters from the usual ERB configuration will impact these events, since filters are applied on a per-destination basis.
+
+Additionally, you can run with the ``--log_statements`` option in either mode to send all generated statements to a Python logger, which can be configured to save to a file, standard out, or a log forwarder like `Vector `__ for more statement handling options. These logs are not filtered.
+
+**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to test finding and transforming data before attempting to store it. Used in conjunction with ``--log_statements``, you can use Python log forwarding without the additional overhead of storing full files.
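+
+As a sketch of the logger option above, statements emitted with ``--log_statements`` can be captured with standard Django ``LOGGING`` configuration. The logger name and output path below are illustrative assumptions rather than settings defined by this package, so check which logger your installation actually emits statements on::
+
+    # LMS settings sketch. The logger name "event_routing_backends" and the
+    # output path are assumptions for illustration, not documented settings.
+    LOGGING["handlers"]["transformed_statements"] = {
+        "class": "logging.FileHandler",
+        "filename": "/openedx/data/transformed_statements.log",
+    }
+    LOGGING["loggers"]["event_routing_backends"] = {
+        "handlers": ["transformed_statements"],
+        "level": "INFO",
+        # Keep these statements out of the root logger's handlers.
+        "propagate": False,
+    }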
+
+
+Examples
+--------
+
+**Files to LRS**
+
+::
+
+    # Transform all events in the local file /openedx/data/logs/tracking.log to all configured LRSs
+    python manage.py lms transform_tracking_logs \
+        --source_provider LOCAL \
+        --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
+        --destination_provider LRS \
+        --transformer_type xapi
+
+::
+
+    # Transform all events in the local file /openedx/data/logs/tracking.log to all configured LRSs
+    # using a smaller batch size and a longer sleep for LMS performance
+    python manage.py lms transform_tracking_logs \
+        --source_provider LOCAL \
+        --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
+        --destination_provider LRS \
+        --transformer_type caliper \
+        --batch_size 1000 \
+        --sleep_between_batches_secs 2.5
+
+::
+
+    # Recursively transform any files whose names start with "tracking" in the local
+    # "/openedx/data/logs/" directory to all configured LRSs
+    python manage.py lms transform_tracking_logs \
+        --source_provider LOCAL \
+        --source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking"}' \
+        --destination_provider LRS \
+        --transformer_type xapi
+
+    # Recursively transform any files whose names start with "tracking" in the S3 bucket
+    # "logs" to all configured LRSs
+    python manage.py lms transform_tracking_logs \
+        --transformer_type xapi \
+        --source_provider S3 \
+        --source_config '{"key": "AWS key", "secret": "AWS secret", "container": "logs", "prefix":"tracking"}' \
+        --destination_provider LRS
+
+**Files to Files**
+
+::
+
+    # Transform the entire local file /openedx/data/logs/tracking.log to a new file in the local
+    # directory /openedx/data/transformed_logs/2023-06-01/; the file will be named with the
+    # current timestamp.
+    # Note: The "container" directory must exist!
+    python manage.py lms transform_tracking_logs \
+        --transformer_type caliper \
+        --source_provider LOCAL \
+        --source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
+        --destination_provider LOCAL \
+        --destination_config '{"key": "/openedx/data/", "container": "transformed_logs", "prefix": "2023-06-01"}'
+
+::
+
+    # Recursively transform any files whose names start with "tracking" from a "tracking_logs" directory in the
+    # MinIO bucket "openedx" to a single file under "transformed_logs/2023-06-02/" in the same bucket.
+    # ie: http://files.local.overhang.io/openedx/tracking_logs/tracking* to http://files.local.overhang.io/openedx/transformed_logs/2023-06-02/23-06-02_20-29-20_xapi.log
+    # This is the configuration for a tutor local environment with MinIO enabled.
+ python manage.py lms transform_tracking_logs \ + --source_provider MINIO \ + --source_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "/tracking_logs", "host": "files.local.overhang.io", "secure": false}' \ + --destination_provider MINIO \ + --destination_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "transformed_logs/2023-06-02/", "host": "files.local.overhang.io", "secure": false}' --transformer_type xapi + diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index f63277ca..ef64e820 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -7,7 +7,7 @@ from event_routing_backends.helpers import get_business_critical_events from event_routing_backends.models import RouterConfiguration -from event_routing_backends.tasks import dispatch_event, dispatch_event_persistent +from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent logger = logging.getLogger(__name__) @@ -28,80 +28,133 @@ def __init__(self, processors=None, backend_name=None): self.processors = processors if processors else [] self.backend_name = backend_name - def send(self, event): + def configure_host(self, host, router): """ - Send the event to configured routers after processing it. - - Event is processed through the configured processors. A router config - object matching the backend_name and other match params is used to get - the list of hosts to which the event is required to be delivered to. - - Arguments: - event (dict): original event dictionary + Create host_configurations for the given host and router. + """ + host['host_configurations'] = {} + host['host_configurations'].update({'url': router.route_url}) + host['host_configurations'].update({'auth_scheme': router.auth_scheme}) + + if router.auth_scheme == RouterConfiguration.AUTH_BASIC: + host['host_configurations'].update({'username': router.username}) + host['host_configurations'].update({'password': router.password}) + elif router.auth_scheme == RouterConfiguration.AUTH_BEARER: + host['host_configurations'].update({'auth_key': router.auth_key}) + + if router.backend_name == RouterConfiguration.CALIPER_BACKEND: + host.update({'router_type': 'AUTH_HEADERS'}) + if 'headers' in host: + host['host_configurations'].update({'headers': host['headers']}) + elif router.backend_name == RouterConfiguration.XAPI_BACKEND: + host.update({'router_type': 'XAPI_LRS'}) + else: + host.update({'router_type': 'INVALID_TYPE'}) + + return host + + def prepare_to_send(self, events): + """ + Prepare a list of events to be sent and create a processed, filtered batch for each router. 
""" routers = RouterConfiguration.get_enabled_routers(self.backend_name) + business_critical_events = get_business_critical_events() + route_events = {} if not routers: logger.info('Could not find any enabled router configuration for backend %s', self.backend_name) - return + return route_events - try: - event_name = event['name'] - except TypeError as exc: - raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc - try: - logger.debug('Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)) + for event in events: + try: + event_name = event['name'] + except TypeError as exc: + raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc - processed_event = self.process_event(event) + try: + logger.debug( + 'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name) + ) + + processed_event = self.process_event(event) + except (EventEmissionExit, ValueError): + logger.error( + 'Could not process edx event "%s" for backend %s\'s router', + event_name, + self.backend_name, + exc_info=True + ) + continue - except EventEmissionExit: - logger.error( - 'Could not process edx event "%s" for backend %s\'s router', + logger.debug( + 'Successfully processed edx event "%s" for router with backend %s. Processed event: %s', event_name, self.backend_name, - exc_info=True + processed_event ) - return - - logger.debug( - 'Successfully processed edx event "%s" for router with backend %s. Processed event: %s', - event_name, - self.backend_name, - processed_event - ) - - for router in routers: - host = router.get_allowed_host(event) - - router_url = router.route_url - if not host: - logger.info( - 'Event %s is not allowed to be sent to any host for router %s with backend "%s"', - event_name, router_url, self.backend_name - ) - else: - updated_event = self.overwrite_event_data(processed_event, host, event_name) - host['host_configurations'] = {} - host['host_configurations'].update({'url': router_url}) - host['host_configurations'].update({'auth_scheme': router.auth_scheme}) - - if router.auth_scheme == RouterConfiguration.AUTH_BASIC: - host['host_configurations'].update({'username': router.username}) - host['host_configurations'].update({'password': router.password}) - elif router.auth_scheme == RouterConfiguration.AUTH_BEARER: - host['host_configurations'].update({'auth_key': router.auth_key}) - - if router.backend_name == RouterConfiguration.CALIPER_BACKEND: - host.update({'router_type': 'AUTH_HEADERS'}) - if 'headers' in host: - host['host_configurations'].update({'headers': host['headers']}) - elif router.backend_name == RouterConfiguration.XAPI_BACKEND: - host.update({'router_type': 'XAPI_LRS'}) + + for router in routers: + host = router.get_allowed_host(event) + router_pk = router.pk + + if not host: + logger.info( + 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', + event_name, router_pk, self.backend_name + ) else: - host.update({'router_type': 'INVALID_TYPE'}) + host = self.configure_host(host, router) + updated_event = self.overwrite_event_data(processed_event, host, event_name) + is_business_critical = event_name in business_critical_events + if router_pk not in route_events: + route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),] + else: + route_events[router_pk].append((event_name, updated_event, host, is_business_critical)) + + return route_events + + def bulk_send(self, events): + 
""" + Send the event to configured routers after processing it. + + Event is processed through the configured processors. A router config + object matching the backend_name and other match params is used to get + the list of hosts to which the event is required to be delivered to. + + Arguments: + events (list[dict]): list of original event dictionaries + """ + event_routes = self.prepare_to_send(events) + + for events_for_route in event_routes.values(): + prepared_events = [] + host = None + for _, updated_event, host, _ in events_for_route: + prepared_events.append(updated_event) + + if prepared_events: # pragma: no cover + dispatch_bulk_events.delay( + prepared_events, + host['router_type'], + host['host_configurations'] + ) + + def send(self, event): + """ + Send the event to configured routers after processing it. + + Event is processed through the configured processors. A router config + object matching the backend_name and other match params is used to get + the list of hosts to which the event is required to be delivered to. + + Arguments: + event (dict): the original event dictionary + """ + event_routes = self.prepare_to_send([event]) - business_critical_events = get_business_critical_events() - if event_name in business_critical_events: + for events_for_route in event_routes.values(): + for event_name, updated_event, host, is_business_critical in events_for_route: + if is_business_critical: dispatch_event_persistent.delay( event_name, updated_event, diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index d7f61504..08455b90 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -80,6 +80,72 @@ def setUp(self): }, } + self.bulk_sample_events = [ + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:00', + 'data': { + 'key': 'value' + }, + 'context': { + 'username': 'testuser' + }, + 'session': '0000' + }, + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:01', + 'data': { + 'key': 'value 1' + }, + 'context': { + 'username': 'testuser1' + }, + 'session': '0001' + }, + { + 'name': str(sentinel.name), + 'event_type': 'edx.test.event', + 'time': '2020-01-01T12:12:12.000000+00:02', + 'data': { + 'key': 'value 2' + }, + 'context': { + 'username': 'testuser2' + }, + 'session': '0002' + } + ] + + self.bulk_transformed_events = [ + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:00', + 'data': { + 'key': 'value' + }, + }, + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:01', + 'data': { + 'key': 'value 1' + }, + }, + { + 'name': str(sentinel.name), + 'transformed': True, + 'event_time': '2020-01-01T12:12:12.000000+00:02', + 'data': { + 'key': 'value 2' + }, + } + ] + self.router = EventsRouter(processors=[], backend_name='test') @patch('event_routing_backends.utils.http_client.requests.post') @@ -168,6 +234,25 @@ def test_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') mocked_post.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + def test_bulk_with_unsupported_routing_strategy(self, mocked_logger, 
mocked_post): + RouterConfigurationFactory.create( + backend_name='test_backend', + enabled=True, + route_url='http://test3.com', + auth_scheme=RouterConfiguration.AUTH_BEARER, + auth_key='test_key', + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + router = EventsRouter(processors=[], backend_name='test_backend') + TieredCache.dangerous_clear_all_tiers() + router.bulk_send([self.transformed_event]) + + mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') + mocked_post.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') @patch('event_routing_backends.backends.events_router.logger') def test_with_no_available_hosts(self, mocked_logger, mocked_post): @@ -186,8 +271,8 @@ def test_with_no_available_hosts(self, mocked_logger, mocked_post): self.assertIn( call( - 'Event %s is not allowed to be sent to any host for router %s with backend "%s"', - self.transformed_event['name'], router_config.route_url, 'test_backend' + 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', + self.transformed_event['name'], router_config.pk, 'test_backend' ), mocked_logger.info.mock_calls ) @@ -223,6 +308,166 @@ def test_generic_exception(self, backend_name, mocked_logger, mocked_post): else: mocked_logger.exception.assert_not_called() + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_bulk_post(self, mocked_logger, mocked_post): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.request.method = "POST" + mock_response.text = "Fake Server Error" + + mocked_post.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.CALIPER_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_post.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_post(self, mocked_logger, mocked_post): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.request.method = "POST" + mock_response.text = "Fake Server Error" + + mocked_post.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.CALIPER_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router.send(self.transformed_event) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_post.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_bulk_routing(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 500 + 
mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statements.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_remote_lrs.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_failed_routing(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 500 + mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statement.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.send(self.transformed_event) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + self.assertEqual(mocked_remote_lrs.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_duplicate_ids_in_bulk(self, mocked_logger, mocked_remote_lrs): + mock_response = MagicMock() + mock_response.success = False + mock_response.data = "Fake response data" + mock_response.response.code = 409 + mock_response.request.method = "POST" + mock_response.request.content = "Fake request content" + + mocked_remote_lrs.return_value.save_statements.return_value = mock_response + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router.bulk_send([self.transformed_event]) + + self.assertEqual(mocked_logger.exception.call_count, 0) + self.assertEqual(mocked_remote_lrs.call_count, 1) + + @ddt.data( + ( + RouterConfiguration.XAPI_BACKEND, + ), + ( + RouterConfiguration.CALIPER_BACKEND, + ) + ) + @patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', { + 'AUTH_HEADERS': MagicMock(side_effect=EventNotDispatched) + }) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + @ddt.unpack + def test_bulk_generic_exception(self, backend_name, mocked_logger, mocked_post): + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[2] + ) + + router = EventsRouter(processors=[], backend_name=backend_name) + router.bulk_send([self.transformed_event]) + if backend_name == RouterConfiguration.CALIPER_BACKEND: + 
self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + mocked_post.assert_not_called() + else: + mocked_logger.exception.assert_not_called() + def test_with_non_dict_event(self): RouterConfigurationFactory.create( backend_name=RouterConfiguration.XAPI_BACKEND, @@ -406,3 +651,136 @@ def test_unsuccessful_routing_of_event_http(self): client = HttpClient(**host_configurations) with self.assertRaises(EventNotDispatched): client.send(event=self.transformed_event, event_name=self.transformed_event['name']) + + @ddt.data( + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.CALIPER_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test3.com' + ), + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.XAPI_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test3.com' + ), + ) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @ddt.unpack + def test_successful_routing_of_bulk_events( + self, + auth_scheme, + auth_key, + username, + password, + backend_name, + route_url, + mocked_lrs, + mocked_post, + ): + TieredCache.dangerous_clear_all_tiers() + mocked_oauth_client = MagicMock() + mocked_api_key_client = MagicMock() + + MOCKED_MAP = { + 'AUTH_HEADERS': HttpClient, + 'OAUTH2': mocked_oauth_client, + 'API_KEY': mocked_api_key_client, + 'XAPI_LRS': LrsClient, + } + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url=route_url, + auth_scheme=auth_scheme, + auth_key=auth_key, + username=username, + password=password, + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + router = EventsRouter(processors=[], backend_name=backend_name) + + with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): + router.bulk_send(self.bulk_transformed_events) + + overridden_events = self.bulk_transformed_events.copy() + + for event in overridden_events: + event['new_key'] = 'new_value' + + if backend_name == RouterConfiguration.XAPI_BACKEND: + # test LRS Client + mocked_lrs().save_statements.assert_has_calls([ + call(overridden_events), + ]) + else: + # test the HTTP client + if auth_scheme == RouterConfiguration.AUTH_BASIC: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + }, + auth=(username, password) + ), + ]) + elif auth_scheme == RouterConfiguration.AUTH_BEARER: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + 'Authorization': RouterConfiguration.AUTH_BEARER + ' ' + auth_key + } + ), + ]) + else: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + }, + ), + ]) + + # test mocked oauth client + mocked_oauth_client.assert_not_called() diff --git a/event_routing_backends/management/__init__.py b/event_routing_backends/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/event_routing_backends/management/commands/__init__.py b/event_routing_backends/management/commands/__init__.py new file 
mode 100644 index 00000000..e69de29b
diff --git a/event_routing_backends/management/commands/helpers/__init__.py b/event_routing_backends/management/commands/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/event_routing_backends/management/commands/helpers/event_log_parser.py b/event_routing_backends/management/commands/helpers/event_log_parser.py
new file mode 100644
index 00000000..2cd87341
--- /dev/null
+++ b/event_routing_backends/management/commands/helpers/event_log_parser.py
@@ -0,0 +1,56 @@
+"""
+Support for reading tracking event logs.
+
+Taken entirely from edx-analytics-pipeline.
+"""
+import json
+import logging
+import re
+from json.decoder import JSONDecodeError
+
+log = logging.getLogger(__name__)
+
+PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
+
+
+def parse_json_event(line):
+    """
+    Parse a tracking log input line as JSON to create a dict representation.
+
+    Arguments:
+    * line: the eventlog text
+    """
+    try:
+        json_match = PATTERN_JSON.match(line)
+        parsed = json.loads(json_match.group(1))
+
+        # The representation of an event that event-routing-backends receives
+        # from the async sender is significantly different from the one that
+        # is saved to tracking log files, for reasons lost to history.
+        # This section of code attempts to format the event line to match the
+        # async version.
+
+        try:
+            # The async version uses "data" for what the log file calls "event".
+            # Sometimes "event" is a nested string of JSON that needs to be parsed.
+            parsed["data"] = json.loads(parsed["event"])
+        except (TypeError, JSONDecodeError):
+            # If it's a TypeError then the "event" was not a string to be parsed,
+            # so probably already a dict. If it's a JSONDecodeError that means the
+            # "event" was a string, but not JSON. Either way we just pass the value
+            # back, since all of those are valid.
+            parsed["data"] = parsed["event"]
+
+        # The async version of tracking logs seems to use "timestamp" for this key,
+        # while the log file uses "time". We normalize it here.
+        if "timestamp" not in parsed and "time" in parsed:
+            parsed["timestamp"] = parsed["time"]
+
+        return parsed
+    except (AttributeError, JSONDecodeError) as e:
+        log.error("EXCEPTION!!!")
+        log.error(type(e))
+        log.error(e)
+        log.error(line)
+
+    return None
diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py
new file mode 100644
index 00000000..5a640ea4
--- /dev/null
+++ b/event_routing_backends/management/commands/helpers/queued_sender.py
@@ -0,0 +1,165 @@
+"""
+Class to handle batching and sending bulk transformed statements.
+"""
+import datetime
+import json
+import os
+from io import BytesIO
+from time import sleep
+
+from event_routing_backends.backends.events_router import EventsRouter
+from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event
+from event_routing_backends.models import RouterConfiguration
+from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor
+from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor
+
+
+class QueuedSender:
+    """
+    Handles queuing and sending events to the destination.
+ """ + def __init__( + self, + destination, + destination_container, + destination_prefix, + transformer_type, + max_queue_size=10000, + sleep_between_batches_secs=1.0, + dry_run=False + ): + self.destination = destination + self.destination_container = destination_container + self.destination_prefix = destination_prefix + self.transformer_type = transformer_type + self.event_queue = [] + self.max_queue_size = max_queue_size + self.sleep_between_batches = sleep_between_batches_secs + self.dry_run = dry_run + + # Bookkeeping + self.queued_lines = 0 + self.skipped_lines = 0 + self.unparsable_lines = 0 + self.batches_sent = 0 + + if self.transformer_type == "xapi": + self.router = EventsRouter( + backend_name=RouterConfiguration.XAPI_BACKEND, + processors=[XApiProcessor()] + ) + else: + self.router = EventsRouter( + backend_name=RouterConfiguration.CALIPER_BACKEND, + processors=[CaliperProcessor()] + ) + + def is_known_event(self, event): + """ + Check whether any processor cares about this event. + """ + if "name" in event: + for processor in self.router.processors: + if event["name"] in processor.registry.mapping: + return True + return False + + def transform_and_queue(self, line): + """ + Queue the JSON representation of this log line, if valid and known to any processor. + """ + event = parse_json_event(line) + + if not event: + self.unparsable_lines += 1 + return + + if not self.is_known_event(event): + self.skipped_lines += 1 + return + + self.queue(event) + self.queued_lines += 1 + + def queue(self, event): + """ + Add an event to the queue, try to send if we've reached our batch size. + """ + self.event_queue.append(event) + if len(self.event_queue) == self.max_queue_size: + if self.dry_run: + print("Dry run, skipping, but still clearing the queue.") + else: + print(f"Max queue size of {self.max_queue_size} reached, sending.") + if self.destination == "LRS": + self.send() + else: + self.store() + + self.batches_sent += 1 + self.event_queue.clear() + sleep(self.sleep_between_batches) + + def send(self): + """ + Send to the LRS if we're configured for that, otherwise a no-op. + """ + if self.destination == "LRS": + print(f"Sending {len(self.event_queue)} events to LRS...") + print(self.router) + self.router.bulk_send(self.event_queue) + else: + print("Skipping send, we're storing with libcloud instead of an LRS.") + + def store(self): + """ + Store to a libcloud destination if we're configured for that. + """ + if self.destination == "LRS": + print("Store is being called on an LRS destination, skipping.") + return + + display_path = os.path.join(self.destination_container, self.destination_prefix.lstrip("/")) + print(f"Storing {len(self.event_queue)} events to libcloud destination {display_path}") + + container = self.destination.get_container(self.destination_container) + + datestr = datetime.datetime.now().strftime('%y-%m-%d_%H-%M-%S') + object_name = f"{self.destination_prefix}/{datestr}_{self.transformer_type}.log" + print(f"Writing to {self.destination_container}/{object_name}") + + out = BytesIO() + for event in self.event_queue: + out.write(str.encode(json.dumps(event))) + out.write(str.encode("\n")) + out.seek(0) + + self.destination.upload_object_via_stream( + out, + container, + object_name + ) + + def finalize(self): + """ + Send a last batch of events via the LRS, or store a complete set of events to a libcloud destination. 
+ """ + print(f"Finalizing {len(self.event_queue)} events to {self.destination}") + if not self.queued_lines: + print("Nothing in the queue to store!") + elif self.dry_run: + print("Dry run, skipping final storage.") + else: + # One final send, in case there are events left in the queue + if self.destination is None or self.destination == "LRS": + print("Sending to LRS!") + self.send() + else: + print("Storing via Libcloud!") + self.store() + self.batches_sent += 1 + + print(f"Queued {self.queued_lines} log lines, " + f"could not parse {self.unparsable_lines} log lines, " + f"skipped {self.skipped_lines} log lines, " + f"sent {self.batches_sent} batches.") diff --git a/event_routing_backends/management/commands/tests/fixtures/tracking.log b/event_routing_backends/management/commands/tests/fixtures/tracking.log new file mode 100644 index 00000000..9954ea42 --- /dev/null +++ b/event_routing_backends/management/commands/tests/fixtures/tracking.log @@ -0,0 +1,12 @@ +2023-05-23 13:53:13,461 INFO 20 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "time": "2023-05-23T13:53:13.460372+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 13:53:13,477 INFO 20 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.course.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "course_version": "6452821a39fb48e67c1a860c", "percent_grade": 0.03, "letter_grade": "", "course_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted", "grading_policy_hash": "2HDb6cWz6xgUQ8b9YhsgN5sukP0="}, "time": "2023-05-23T13:53:13.475692+00:00", "event_type": "edx.grades.course.grade_calculated", "event_source": "server", "page": null} +2023-05-23 13:53:13,495 INFO 20 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.course.grade.now_failed", "context": "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "event_transaction_id": "45acd5c9-af65-45bd-82fe-313fa5bf9f79", "event_transaction_type": "edx.grades.problem.submitted"}, "time": "2023-05-23T13:53:13.486905+00:00", "event_type": 
"edx.course.grade.now_failed", "event_source": "server", "page": null} +2023-05-23 14:12:17,290 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": "{\"GET\": {}, \"POST\": {\"input_932e6f2ce8274072a355a94560216d1a_2_1\": [\"choice_2\"]}}", "time": "2023-05-23T14:12:17.286772+00:00", "event_type": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "event_source": "server", "page": null} +2023-05-23 14:12:17,308 INFO 7 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_check", "context": {"user_id": 6, "path": "/event", "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": "input_932e6f2ce8274072a355a94560216d1a_2_1=choice_2", "time": "2023-05-23T14:12:17.299491+00:00", "event_type": "problem_check", "event_source": "browser", "page": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view"} +2023-05-23 14:12:17,388 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "edx.grades.problem.submitted", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": "", "module": {"display_name": "Perchance to Dream", "usage_key": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a"}}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", 
"accept_language": "en-US,en;q=0.5", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "problem_id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "weighted_earned": 1, "weighted_possible": 1}, "time": "2023-05-23T14:12:17.388051+00:00", "event_type": "edx.grades.problem.submitted", "event_source": "server", "page": null} +2023-05-23 14:12:17,402 INFO 27 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_check", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "course_user_tags": {}, "user_id": 6, "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a/handler/xmodule_handler/problem_check", "org_id": "edX", "enterprise_uuid": "", "module": {"display_name": "Perchance to Dream", "usage_key": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a"}, "asides": {}}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": {"state": {"seed": 1, "student_answers": {"932e6f2ce8274072a355a94560216d1a_2_1": "choice_2"}, "has_saved_answers": false, "correct_map": {"932e6f2ce8274072a355a94560216d1a_2_1": {"correctness": "correct", "npoints": null, "msg": "", "hint": "", "hintmode": null, "queuestate": null, "answervariable": null}}, "input_state": {"932e6f2ce8274072a355a94560216d1a_2_1": {}}, "done": true}, "problem_id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@932e6f2ce8274072a355a94560216d1a", "answers": {"932e6f2ce8274072a355a94560216d1a_2_1": "choice_2"}, "grade": 1, "max_grade": 1, "correct_map": {"932e6f2ce8274072a355a94560216d1a_2_1": {"correctness": "correct", "npoints": null, "msg": "", "hint": "", "hintmode": null, "queuestate": null, "answervariable": null}}, "success": "correct", "attempts": 3, "submission": {"932e6f2ce8274072a355a94560216d1a_2_1": {"question": "", "answer": "There is an implication that the strangeness to follow can be considered like a dream.", "response_type": "multiplechoiceresponse", "input_type": "choicegroup", "correct": true, "variant": "", "group_label": ""}}}, "time": "2023-05-23T14:12:17.398107+00:00", "event_type": "problem_check", "event_source": "server", "page": "x_module"} +2023-05-23 14:12:17,457 INFO 7 [tracking] [user 6] [ip 172.18.0.1] logger.py:41 - {"name": "problem_graded", "context": {"user_id": 6, "path": "/event", "course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "bmtcril", "session": "79e015634b37255d8cb5fcdef0a73ad4", "ip": "172.18.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/113.0", "host": "local.overhang.io", "referer": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view", "accept_language": "en-US,en;q=0.5", "event": ["input_932e6f2ce8274072a355a94560216d1a_2_1=choice_2", 
"\n\n\n\n

\n Perchance to Dream\n

\n\n
\n\n
\n
\n

The paragraph contains references to sleepiness and an unexpected event. Why?

\n
\n
\n \n
\n \n
\n \n
\n \n
\n \n
\n \n
\n \n
\n
\n \n\n\n correct\n\n\n
\n
\n
\n
\n \n\n
\n \n \n \n
\n
\n \n\n
\n Some problems have options such as save, reset, hints, or show answer. These options follow the Submit button.\n
\n
\n
\n \n\n\n
\n \n \n \n
\n \n
\n
\n\n \n\n\n
\n \n Correct (1/1 point)\n \n
\n \n
\n
\n\n \n\n\n
\n \n None\n \n
\n \n
\n
\n\n \n \n\n\n
\n \n Answers are displayed within the problem\n \n
\n \n
\n
\n\n
\n\n\n"], "time": "2023-05-23T14:12:17.457001+00:00", "event_type": "problem_graded", "event_source": "browser", "page": "http://local.overhang.io/xblock/block-v1:edX+DemoX+Demo_Course+type@vertical+block@134df56c516a4a0dbb24dd5facef746e?show_title=0&show_bookmark_button=0&recheck_access=1&view=student_view"} +2023-05-23 14:12:19,621 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "time": "2023-05-23T14:12:19.620891+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 14:12:19,621 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.subsection.grade_calculated2", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "block_id": "block-v1:edX+DemoX+Demo_Course+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5", "course_version": "6452821a39fb48e67c1a860c", "weighted_total_earned": 3.0, "weighted_total_possible": 3.0, "weighted_graded_earned": 3.0, "weighted_graded_possible": 3.0, "first_attempted": "2023-05-12 20:55:53.096285+00:00", "subtree_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted", "visible_blocks_hash": "5RUTrtbgdsnoSKsFavs24Om8Yp0="}, "timestamp": "2023-05-23T14:12:19.620891+00:00", "event_type": "edx.grades.subsection.grade_calculated", "event_source": "server", "page": null} +2023-05-23 14:12:19,631 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.grades.course.grade_calculated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", "accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "course_version": "6452821a39fb48e67c1a860c", "percent_grade": 0.03, "letter_grade": "", "course_edited_timestamp": "2023-05-03 15:47:38.629000+00:00", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "eoh no this line is broken +2023-05-23 14:12:19,641 INFO 42 [tracking] [user None] [ip None] logger.py:41 - {"name": "edx.course.grade.now_failed", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course", "org_id": "edX", "enterprise_uuid": ""}, "username": "", "session": "", "ip": "", "agent": "", "host": "", "referer": "", 
"accept_language": "", "event": {"user_id": "6", "course_id": "course-v1:edX+DemoX+Demo_Course", "event_transaction_id": "aee73a98-8bf3-42ca-99a5-ac008eb62c1d", "event_transaction_type": "edx.grades.problem.submitted"}, "time": "2023-05-23T14:12:19.635349+00:00", "event_type": "edx.course.grade.now_failed", "event_source": "server", "page": null} diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py new file mode 100644 index 00000000..0cdb0233 --- /dev/null +++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py @@ -0,0 +1,383 @@ +""" +Tests for the transform_tracking_logs management command. +""" +import json +from unittest.mock import MagicMock, patch + +import pytest +from django.core.management import call_command +from libcloud.storage.types import ContainerDoesNotExistError + +from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender +from event_routing_backends.management.commands.transform_tracking_logs import ( + get_dest_config_from_options, + get_libcloud_drivers, + get_source_config_from_options, + validate_source_and_files, +) + +LOCAL_CONFIG = json.dumps({"key": "/openedx/", "container": "data", "prefix": ""}) +REMOTE_CONFIG = json.dumps({ + "key": "api key", + "secret": "api secret key", + "prefix": "/xapi_statements/", + "container": "test_bucket", + "secure": False, + "host": "127.0.0.1", + "port": 9191 +}) + + +@pytest.fixture +def mock_common_calls(): + """ + Mock out calls that we test elsewhere and aren't relevant to the command tests. + """ + command_path = "event_routing_backends.management.commands.transform_tracking_logs" + helper_path = "event_routing_backends.management.commands.helpers" + with patch(command_path+".Provider") as mock_libcloud_provider: + with patch(command_path+".get_driver") as mock_libcloud_get_driver: + with patch(helper_path + ".queued_sender.EventsRouter") as mock_eventsrouter: + yield mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter + + +def command_options(): + """ + A fixture of different command options and their expected outputs. + """ + options = [ + # Local file to LRS, small batch size to test batching + { + "transformer_type": "xapi", + "source_provider": "LOCAL", + "source_config": LOCAL_CONFIG, + "batch_size": 1, + "sleep_between_batches_secs": 0, + "expected_results": { + "expected_batches_sent": 2, + "log_lines": [ + "Looking for log files in data/*", + "Max queue size of 1 reached, sending.", + "Sending 1 events to LRS...", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 3 batches.", + "Sending to LRS!" 
+ ] + }, + }, + # Remote file to LRS dry run no batch size + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "sleep_between_batches_secs": 0, + "dry_run": True, + "expected_results": { + # Dry run, nothing should be sent + "expected_batches_sent": 0, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 2 events to LRS", + "Dry run, skipping final storage.", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 0 batches.", + ] + }, + }, + # Remote file to LRS, default batch size + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "sleep_between_batches_secs": 0, + "expected_results": { + # No batch size given, default is 10k so only one batch sent + "expected_batches_sent": 1, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 2 events to LRS", + "Sending to LRS!", + "Sending 2 events to LRS...", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.", + ] + }, + }, + # Local file to remote file + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "destination_provider": "MINIO", + "destination_config": REMOTE_CONFIG, + "batch_size": 2, + "sleep_between_batches_secs": 0, + "expected_results": { + # Remote files only get written once + "expected_batches_sent": 1, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 0 events to", + "Storing via Libcloud!", + "Max queue size of 2 reached, sending.", + "Storing 2 events to libcloud destination test_bucket/xapi_statements/", + "Storing 0 events to libcloud destination test_bucket/xapi_statements/", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 2 batches.", + ] + }, + }, + # Remote file dry run + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "destination_provider": "MINIO", + "destination_config": REMOTE_CONFIG, + "batch_size": 1, + "dry_run": True, + "sleep_between_batches_secs": 0, + "expected_results": { + # Dry run, nothing should be sent + "expected_batches_sent": 0, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 0 events to", + "Dry run, skipping, but still clearing the queue.", + "Dry run, skipping final storage.", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 0 batches.", + ] + }, + }, + ] + + for option in options: + yield option + + +def get_raw_log_stream(_): + """ + Return raw event json parsed from current fixtures + """ + import os + TEST_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) + + tracking_log_path = '{test_dir}/fixtures/tracking.log'.format(test_dir=TEST_DIR_PATH) + + with open(tracking_log_path, "rb", buffering=10) as current: + yield current.read() + + +@pytest.mark.parametrize("command_opts", command_options()) +def test_transform_command(command_opts, mock_common_calls, caplog, capsys): + """ + Test the command and QueuedSender with a variety of options. 
+ """ + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + expected_results = command_opts.pop("expected_results") + + mm = MagicMock() + + mock_log_object = MagicMock() + mock_log_object.__str__.return_value = "tracking.log" + mock_log_object.name = "tracking.log" + mock_log_object.size = "1024" + + # Fake finding one log file in each container, it will be loaded and parsed twice + mm.return_value.iterate_container_objects.return_value = [mock_log_object] + mm.return_value.download_object_as_stream = get_raw_log_stream + mock_libcloud_get_driver.return_value = mm + + # Fake a router mapping so some events in the log are actually processed + mm2 = MagicMock() + mm2.registry.mapping = {"problem_check": 1} + mock_eventsrouter.return_value.processors = [mm2] + + call_command( + 'transform_tracking_logs', + **command_opts + ) + + # Router should only be set up once + assert mock_eventsrouter.call_count == 1 + + captured = capsys.readouterr() + print(captured.out) + + # Log statements we always expect with this configuration + assert "Streaming file tracking.log..." in captured.out + + # There are intentionally broken log statements in the test file that cause these + # lines to be emitted. + assert "EXCEPTION!!!" in caplog.text + assert "'NoneType' object has no attribute 'group'" in caplog.text + assert "Expecting ',' delimiter: line 1 column 63 (char 62)" in caplog.text + + # Check the specific expected log lines for this set of options + for line in expected_results["log_lines"]: + assert line in caplog.text or line in captured.out + + +def test_queued_sender_store_on_lrs(mock_common_calls, capsys): + """ + Test that we don't attempt to store on an LRS backend. + """ + qs = QueuedSender("LRS", "fake_container", None, "xapi") + qs.store() + + captured = capsys.readouterr() + print(captured.out) + assert "Store is being called on an LRS destination, skipping." in captured.out + + +def test_queued_sender_broken_event(mock_common_calls, capsys): + """ + Test that we don't attempt to store on an LRS backend. + """ + qs = QueuedSender("LRS", "fake_container", None, "xapi") + assert not qs.is_known_event({"this has no name key and will fail": 1}) + + +def test_queued_sender_store_empty_queue(mock_common_calls, capsys): + """ + Test that we don't attempt to store() when there's nothing in the queue. + """ + qs = QueuedSender("NOT LRS", "fake_container", None, "xapi") + qs.finalize() + + captured = capsys.readouterr() + print(captured.out) + assert "Nothing in the queue to store!" in captured.out + + +def test_queued_sender_send_on_libcloud(mock_common_calls, capsys): + """ + Test that we don't attempt to send() when using a libcloud backend. + """ + qs = QueuedSender("NOT LRS", "fake_container", None, "caliper") + qs.send() + + captured = capsys.readouterr() + print(captured.out) + assert "Skipping send, we're storing with libcloud instead of an LRS." in captured.out + + +def test_queued_sender_container_does_not_exist(mock_common_calls, caplog): + """ + Test that we raise an exception if a container doesn't exist. 
+ """ + mock_destination = MagicMock() + mock_destination.get_container.side_effect = ContainerDoesNotExistError( + "Container 'fake_container' doesn't exist.", None, "fake") + with pytest.raises(ContainerDoesNotExistError): + qs = QueuedSender(mock_destination, "fake_container", "fake_prefix", "xapi") + qs.queued_lines = ["fake"] + qs.store() + + +def test_invalid_libcloud_source_driver(capsys, mock_common_calls): + """ + Check error cases when non-existent libcloud drivers are passed in. + """ + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + mock_libcloud_get_driver.side_effect = [AttributeError(), MagicMock()] + + with pytest.raises(AttributeError): + get_libcloud_drivers("I should fail", {}, "I should never get called", {}) + + captured = capsys.readouterr() + print(captured.out) + assert "is not a valid source Libcloud provider." in captured.out + + +def test_invalid_libcloud_dest_driver(capsys, mock_common_calls): + mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls + + mock_libcloud_get_driver.side_effect = [MagicMock(), AttributeError()] + with pytest.raises(AttributeError): + get_libcloud_drivers("I should succeed", {}, "I should fail", {}) + + captured = capsys.readouterr() + print(captured.out) + assert "is not a valid destination Libcloud provider." in captured.out + + +def test_no_files_in_source_dir(caplog): + """ + Check error case when there are no source files found in the libcloud source. + """ + fake_driver = MagicMock() + fake_driver.iterate_container_objects.return_value = [] + with pytest.raises(FileNotFoundError): + validate_source_and_files(fake_driver, "container name", "prefix") + + +def test_required_source_libcloud_keys(capsys): + """ + Check that we raise an error if the container and prefix aren't given. + """ + with pytest.raises(KeyError): + get_source_config_from_options("{}") + + captured = capsys.readouterr() + print(captured.out) + assert "The following keys must be defined in source_config: 'prefix', 'container'" in captured.out + + +def test_required_dest_libcloud_keys(capsys): + """ + Check that we raise an error if the container and prefix aren't given in a non-LRS destination. + """ + with pytest.raises(KeyError): + get_dest_config_from_options(None, "{}") + + captured = capsys.readouterr() + print(captured.out) + assert "If not using the 'LRS' destination, the following keys must be defined in destination_config: " \ + "'prefix', 'container'" in captured.out + + +def test_get_source_config(): + """ + Check that our special keys are popped off the options when retrieving the source config. + """ + options = { + "key": "fake test key", + "container": "fake container", + "prefix": "fake prefix" + } + + config, container, prefix = get_source_config_from_options(json.dumps(options)) + + assert len(config) == 1 + assert config["key"] == options["key"] + assert container == "fake container" + assert prefix == "fake prefix" + + +def test_get_dest_config(): + """ + Check that our special keys are popped off the options when retrieving the non-LRS destination config. 
+ """ + options = { + "key": "fake test key", + "container": "fake container", + "prefix": "fake prefix" + } + + config, container, prefix = get_dest_config_from_options("fake provider", json.dumps(options)) + + assert len(config) == 1 + assert config["key"] == options["key"] + assert container == "fake container" + assert prefix == "fake prefix" + + +def test_get_dest_config_lrs(): + """ + Check that an LRS destination config returns appropriate values. + """ + options = {} + + config, container, prefix = get_dest_config_from_options("LRS", options) + assert config is None + assert container is None + assert prefix is None diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py new file mode 100644 index 00000000..c5dfffa2 --- /dev/null +++ b/event_routing_backends/management/commands/transform_tracking_logs.py @@ -0,0 +1,254 @@ +""" +Management command for transforming tracking log files. +""" +import json +import os +from io import BytesIO +from textwrap import dedent + +from django.core.management.base import BaseCommand +from libcloud.storage.providers import get_driver +from libcloud.storage.types import Provider + +from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender + + +def transform_tracking_logs( + source, + source_container, + source_prefix, + sender +): + """ + Transform one or more tracking log files from the given source to the given destination. + """ + # Containers are effectively directories, this recursively tries to find files + # matching the given prefix in the given source. + container = source.get_container(container_name=source_container) + + display_path = os.path.join(source_container, source_prefix.lstrip("/")) + print(f"Looking for log files in {display_path}*") + + for file in source.iterate_container_objects(container, source_prefix): + # Download the file as a stream of characters to save on memory + print(f"Streaming file {file}...") + line = "" + for chunk in source.download_object_as_stream(file): + chunk = chunk.decode('utf-8') + + # Loop through this chunk, if we find a newline it's time to process + # otherwise just keep appending. + for char in chunk: + if char == "\n" and line: + sender.transform_and_queue(line) + line = "" + else: + line += char + + # Sometimes the file doesn't end with a newline, we try to use + # any remaining bytes as a final line. + if line: + sender.transform_and_queue(line) # pragma: no cover + + # Give the queue a chance to send any remaining events left in the queue + sender.finalize() + + +def get_source_config_from_options(source_config_options): + """ + Prepare our source configuration from the configuration JSON. + """ + source_config = json.loads(source_config_options) + try: + source_prefix = source_config.pop("prefix") + source_container = source_config.pop("container") + return source_config, source_container, source_prefix + except KeyError as e: + print("The following keys must be defined in source_config: 'prefix', 'container'") + raise e + + +def get_dest_config_from_options(destination_provider, dest_config_options): + """ + Prepare our destination configuration. + + All None's if these are being sent to an LRS, or use values from the destination_configuration JSON option. 
+ """ + if destination_provider != "LRS": + dest_config = json.loads(dest_config_options) + try: + dest_container = dest_config.pop("container") + dest_prefix = dest_config.pop("prefix") + except KeyError as e: + print("If not using the 'LRS' destination, the following keys must be defined in " + "destination_config: 'prefix', 'container'") + raise e + else: + dest_config = dest_container = dest_prefix = None + + return dest_config, dest_container, dest_prefix + + +def validate_source_and_files(driver, container_name, prefix): + """ + Validate that the given libcloud source exists and has files in it to read. + """ + container = driver.get_container(container_name) + objects = list(driver.iterate_container_objects(container, prefix)) + if not objects: + raise FileNotFoundError(f"No files found in {container_name}/{prefix}*") + print(f"Found {len(objects)} files in {container_name}/{prefix}*") + return [f"{obj.name} - {obj.size} bytes" for obj in objects] + + +def validate_destination(driver, container_name, prefix, source_objects): + """ + Validate that the given libcloud destination exists and can be written to. + """ + container = driver.get_container(container_name) + full_path = f"{prefix}/manifest.log" + file_list = "\n".join(source_objects) + driver.upload_object_via_stream( + iterator=BytesIO(file_list.encode()), + container=container, + object_name=full_path + ) + print(f"Wrote source file list to '{container_name}/{full_path}'") + + +def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config): + """ + Attempt to configure the libcloud drivers for source and destination. + """ + try: + source_provider = getattr(Provider, source_provider) + source_cls = get_driver(source_provider) + source_driver = source_cls(**source_config) + except AttributeError: + print(f"{source_provider} is not a valid source Libcloud provider.") + raise + + # There is no driver for LRS + destination_driver = "LRS" + if destination_provider != "LRS": + try: + destination_provider = getattr(Provider, destination_provider) + destination_cls = get_driver(destination_provider) + destination_driver = destination_cls(**destination_config) + except AttributeError: + print(f"{destination_provider} is not a valid destination Libcloud provider.") + raise + + return source_driver, destination_driver + + +class Command(BaseCommand): + """ + Transform tracking logs to an LRS or other output destination. + """ + help = dedent(__doc__).strip() + + def add_arguments(self, parser): + parser.add_argument( + '--source_provider', + type=str, + help="An Apache Libcloud 'provider constant' from: " + "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . " + "Ex: LOCAL for local storage or S3 for AWS S3.", + required=True, + ) + parser.add_argument( + '--source_config', + type=str, + help="A JSON dictionary of configuration for the source provider. Leave" + "blank the destination_provider is 'LRS'. See the Libcloud docs for the necessary options" + "for your destination. If your destination (S3, MinIO, etc) needs a 'bucket' or 'container' add them " + "to the config here under the key 'container'. If your source needs a prefix (ex: directory path, " + "or wildcard beginning of a filename), add it here under the key 'prefix'. 
If no prefix is given, " + "all files in the given location will be attempted!", + required=True, + ) + parser.add_argument( + '--destination_provider', + type=str, + default="LRS", + help="Either 'LRS' to use the default configured xAPI and/or Caliper servers" + "or an Apache Libcloud 'provider constant' from this list: " + "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . " + "Ex: LOCAL for local storage or S3 for AWS S3.", + ) + parser.add_argument( + '--destination_config', + type=str, + help="A JSON dictionary of configuration for the destination provider. Not needed for the 'LRS' " + "destination_provider. See the Libcloud docs for the necessary options for your destination. If your " + "destination (S3, MinIO, etc) needs a 'bucket' or 'container' add them to the config here under the " + "key 'container'. If your destination needs a prefix (ex: directory path), add it here under the key " + "'prefix'. If no prefix is given, the output file(s) will be written to the base path.", + ) + parser.add_argument( + '--transformer_type', + choices=["xapi", "caliper"], + required=True, + help="The type of transformation to do, only one can be done at a time.", + ) + parser.add_argument( + '--batch_size', + type=int, + default=10000, + help="How many events to send at a time. For the LRS destination this will be one POST per this many " + "events, for all other destinations a new file will be created containing up to this many events. " + "This helps reduce memory usage in the script and increases helps with LRS performance.", + ) + parser.add_argument( + '--sleep_between_batches_secs', + type=float, + default=10.0, + help="Fractional seconds to sleep between sending batches to a destination, used to reduce load on the LMS " + "and LRSs when performing large operations.", + ) + parser.add_argument( + '--dry_run', + action="store_true", + help="Attempt to transform all lines from all files, but do not send to the destination.", + ) + + def handle(self, *args, **options): + """ + Configure the command and start the transform process. 
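+
+        Steps: parse the source and destination configs, build the libcloud
+        drivers, validate the source (and the destination unless it is the LRS),
+        then stream every matching file through a QueuedSender.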
+ """ + source_config, source_container, source_prefix = get_source_config_from_options(options["source_config"]) + dest_config, dest_container, dest_prefix = get_dest_config_from_options( + options["destination_provider"], + options["destination_config"] + ) + + source_driver, dest_driver = get_libcloud_drivers( + options["source_provider"], + source_config, + options["destination_provider"], + dest_config + ) + + source_file_list = validate_source_and_files(source_driver, source_container, source_prefix) + if dest_driver != "LRS": + validate_destination(dest_driver, dest_container, dest_prefix, source_file_list) + else: + print(f"Source files found: {source_file_list}") + + sender = QueuedSender( + dest_driver, + dest_container, + dest_prefix, + options["transformer_type"], + max_queue_size=options["batch_size"], + sleep_between_batches_secs=options["sleep_between_batches_secs"], + dry_run=options["dry_run"] + ) + + transform_tracking_logs( + source_driver, + source_container, + source_prefix, + sender + ) diff --git a/event_routing_backends/processors/caliper/tests/test_caliper.py b/event_routing_backends/processors/caliper/tests/test_caliper.py index 17fcacf8..d144130f 100644 --- a/event_routing_backends/processors/caliper/tests/test_caliper.py +++ b/event_routing_backends/processors/caliper/tests/test_caliper.py @@ -58,7 +58,7 @@ def test_send_method_with_unknown_exception(self, mocked_logger, _): @patch( 'event_routing_backends.processors.caliper.transformer_processor.CaliperTransformersRegistry.get_transformer' ) - @patch('event_routing_backends.processors.caliper.transformer_processor.caliper_logger') + @patch('event_routing_backends.processors.caliper.transformer_processor.logger') def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_transformer): transformed_event = { 'transformed_key': 'transformed_value' @@ -76,7 +76,7 @@ def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_trans json.dumps(transformed_event) ) ), - mocked_logger.info.mock_calls + mocked_logger.debug.mock_calls ) @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') diff --git a/event_routing_backends/processors/caliper/transformer_processor.py b/event_routing_backends/processors/caliper/transformer_processor.py index 3bdc071a..9c48c078 100644 --- a/event_routing_backends/processors/caliper/transformer_processor.py +++ b/event_routing_backends/processors/caliper/transformer_processor.py @@ -10,6 +10,7 @@ from event_routing_backends.processors.caliper.registry import CaliperTransformersRegistry from event_routing_backends.processors.mixins.base_transformer_processor import BaseTransformerProcessorMixin +logger = getLogger(__name__) caliper_logger = getLogger('caliper_tracking') @@ -44,7 +45,11 @@ def transform_event(self, event): transformed_event = super().transform_event(event) if transformed_event: - caliper_logger.info('Caliper version of edx event "{}" is: {}'.format(event["name"], - json.dumps(transformed_event))) + json_event = json.dumps(transformed_event) + caliper_logger.info(json_event) + logger.debug('Caliper version of edx event "{}" is: {}'.format( + event["name"], + json_event + )) return transformed_event diff --git a/event_routing_backends/processors/mixins/base_transformer.py b/event_routing_backends/processors/mixins/base_transformer.py index b72e005e..78cf8398 100644 --- a/event_routing_backends/processors/mixins/base_transformer.py +++ b/event_routing_backends/processors/mixins/base_transformer.py @@ -126,9 +126,11 @@ 
def extract_username_or_userid(self): Returns: str """ - username_or_id = self.get_data('context.username') or self.get_data('context.user_id') + username_or_id = self.get_data('username') or self.get_data('user_id') if not username_or_id: username_or_id = self.get_data('data.username') or self.get_data('data.user_id') + if not username_or_id: + username_or_id = self.get_data('context.username') or self.get_data('context.user_id') return username_or_id def extract_sessionid(self): @@ -138,7 +140,7 @@ def extract_sessionid(self): Returns: str """ - return self.get_data('context.session') or self.get_data('data.session') + return self.get_data('session') or self.get_data('context.session') or self.get_data('data.session') def get_data(self, key, required=False): """ @@ -210,6 +212,7 @@ def get_object_iri(self, object_type, object_id): """ if object_id is None or object_type is None: return None + return '{root_url}/{object_type}/{object_id}'.format( root_url=settings.LMS_ROOT_URL, object_type=object_type, diff --git a/event_routing_backends/processors/xapi/event_transformers/navigation_events.py b/event_routing_backends/processors/xapi/event_transformers/navigation_events.py index 6c1ad68f..8402f1dc 100644 --- a/event_routing_backends/processors/xapi/event_transformers/navigation_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/navigation_events.py @@ -58,7 +58,7 @@ def get_object(self): `Activity` """ return Activity( - id=self.get_data('data.target_url'), + id=self.get_data('data.target_url', True), definition=ActivityDefinition( type=constants.XAPI_ACTIVITY_LINK ), diff --git a/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py b/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py index e390d6e7..6fa7cd88 100644 --- a/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/problem_interaction_events.py @@ -73,17 +73,16 @@ class BaseProblemsTransformer(XApiTransformer, XApiVerbTransformerMixin): def get_object(self): """ Get object for xAPI transformed event. - Returns: `Activity` """ object_id = None data = self.get_data('data') if data and isinstance(data, dict): - object_id = data.get('problem_id', data.get('module_id', None)) + object_id = self.get_data('data.problem_id') or self.get_data('data.module_id', True) event_name = self.get_data('name', True) - # TODO: Add definition[name] of problem once it is added in the event. + return Activity( id=object_id, definition=ActivityDefinition( @@ -133,6 +132,17 @@ class ProblemSubmittedTransformer(BaseProblemsTransformer): """ additional_fields = ('result', ) + def get_object(self): + """ + Get object for xAPI transformed event. + + Returns: + `Activity` + """ + xapi_object = super().get_object() + xapi_object.id = self.get_object_iri('xblock', xapi_object.id) + return xapi_object + def get_result(self): """ Get result for xAPI transformed event. @@ -170,9 +180,11 @@ def get_object(self): # If the event was generated from browser, there is no `problem_id` # or `module_id` field. Therefore we get block id from the referrer. 
- if self.get_data('context.event_source') == 'browser': + event_source = self.get_data('context.event_source') or self.get_data('event_source') + referer = self.get_data('referer') or self.get_data('context.referer', True) + if event_source == 'browser': block_id = get_problem_block_id( - self.get_data('context.referer', True), + referer, self.get_data('data'), self.get_data('context.course_id') ) @@ -235,7 +247,8 @@ def get_result(self): Result """ # Do not transform result if the event is generated from browser - if self.get_data('context.event_source') == 'browser': + source = self.get_data('event_source') or self.get_data('context.event_source') + if source == 'browser': return None event_data = self.get_data('data') diff --git a/event_routing_backends/processors/xapi/event_transformers/video_events.py b/event_routing_backends/processors/xapi/event_transformers/video_events.py index 2fbd77c7..412b3310 100644 --- a/event_routing_backends/processors/xapi/event_transformers/video_events.py +++ b/event_routing_backends/processors/xapi/event_transformers/video_events.py @@ -141,9 +141,7 @@ def get_object(self): `Activity` """ course_id = self.get_data('context.course_id', True) - video_id = self.get_data('data.id', True) - object_id = make_video_block_id(course_id=course_id, video_id=video_id) return Activity( @@ -200,9 +198,10 @@ def get_result(self): Returns: `Result` """ + current_time = self.get_data('data.current_time') or self.get_data('data.currentTime') return Result( extensions=Extensions({ - constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(self.get_data('data.currentTime')) + constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(current_time) }) ) @@ -237,11 +236,11 @@ def get_result(self): 'show_transcript', ] ) + current_time = self.get_data('data.current_time') or self.get_data('data.currentTime') + return Result( extensions=Extensions({ - constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float( - self.get_data('data.currentTime') or self.get_data('data.current_time') - ), + constants.XAPI_RESULT_VIDEO_TIME: convert_seconds_to_float(current_time), constants.XAPI_RESULT_VIDEO_CC_ENABLED: cc_enabled }) ) diff --git a/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json b/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json index 9a154e1b..32f81365 100644 --- a/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json +++ b/event_routing_backends/processors/xapi/tests/fixtures/expected/edx.grades.problem.submitted.json @@ -28,7 +28,7 @@ "definition": { "type": "http://adlnet.gov/expapi/activities/question" }, - "id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@3fc5461f86764ad7bdbdf6cbdde61e66", + "id": "http://localhost:18000/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@3fc5461f86764ad7bdbdf6cbdde61e66", "objectType": "Activity" }, "result": { diff --git a/event_routing_backends/processors/xapi/transformer.py b/event_routing_backends/processors/xapi/transformer.py index 482fab15..74f2ff0e 100644 --- a/event_routing_backends/processors/xapi/transformer.py +++ b/event_routing_backends/processors/xapi/transformer.py @@ -162,7 +162,8 @@ def verb(self): """ event_name = self.get_data('name', True) - if self.get_data('context.event_source') == 'browser' and event_name == 'problem_check': + event_source = self.get_data('event_source') or self.get_data('context.event_source') + if event_source == 'browser' and event_name == 
'problem_check':
             verb = self.verb_map['problem_check_browser']
         else:
             verb = self.verb_map[event_name]
diff --git a/event_routing_backends/processors/xapi/transformer_processor.py b/event_routing_backends/processors/xapi/transformer_processor.py
index 7aab55ef..b4ac63d0 100644
--- a/event_routing_backends/processors/xapi/transformer_processor.py
+++ b/event_routing_backends/processors/xapi/transformer_processor.py
@@ -10,6 +10,7 @@
 from event_routing_backends.processors.xapi import XAPI_EVENTS_ENABLED
 from event_routing_backends.processors.xapi.registry import XApiTransformersRegistry
 
+logger = getLogger(__name__)
 xapi_logger = getLogger('xapi_tracking')
 
 
@@ -46,7 +47,7 @@ def transform_event(self, event):
         if transformed_event:
             event_json = transformed_event.to_json()
             xapi_logger.info(event_json)
-            xapi_logger.info('xAPI statement of edx event "{}" is: {}'.format(event["name"], event_json))
+            logger.debug('xAPI statement of edx event "{}" is: {}'.format(event["name"], event_json))
 
             return json.loads(event_json)
 
         return transformed_event
diff --git a/event_routing_backends/tasks.py b/event_routing_backends/tasks.py
index 6f5e4f00..4cb2e05c 100644
--- a/event_routing_backends/tasks.py
+++ b/event_routing_backends/tasks.py
@@ -82,5 +82,57 @@ def send_event(task, event_name, event, router_type, host_config):
             ),
             exc_info=True
         )
+        raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30),
+                         max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3))
+
+
+@shared_task(bind=True, base=LoggedPersistOnFailureTask)
+def dispatch_bulk_events(self, events, router_type, host_config):
+    """
+    Send a batch of events to the same configured client.
+
+    Arguments:
+        self (object) : celery task object to perform celery actions
+        events (list[dict]) : list of event dictionaries to be delivered.
+        router_type (str) : decides the client to use for sending the event
+        host_config (dict) : contains configurations for the host.
+    """
+    bulk_send_events(self, events, router_type, host_config)
+
+
+def bulk_send_events(task, events, router_type, host_config):
+    """
+    Send a batch of events to the configured client.
+
+    Arguments:
+        task (object) : celery task object to perform celery actions
+        events (list[dict]) : list of event dictionaries to be delivered.
+        router_type (str) : decides the client to use for sending the event
+        host_config (dict) : contains configurations for the host.
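+
+    Note: the client class for router_type is looked up in
+    ROUTER_STRATEGY_MAPPING and the whole batch is handed to its bulk_send();
+    EventNotDispatched is converted into a celery retry on the calling task.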
+ """ + try: + client_class = ROUTER_STRATEGY_MAPPING[router_type] + except KeyError: + logger.error('Unsupported routing strategy detected: {}'.format(router_type)) + return + + try: + client = client_class(**host_config) + client.bulk_send(events) + logger.debug( + 'Successfully bulk dispatched transformed versions of {} events using client: {}'.format( + len(events), + client_class + ) + ) + except EventNotDispatched as exc: + logger.exception( + 'Exception occurred while trying to bulk dispatch {} events using client: {}'.format( + len(events), + client_class + ), + exc_info=True + ) raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30), max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3)) diff --git a/event_routing_backends/utils/http_client.py b/event_routing_backends/utils/http_client.py index 316e84a9..eead052c 100644 --- a/event_routing_backends/utils/http_client.py +++ b/event_routing_backends/utils/http_client.py @@ -49,6 +49,42 @@ def get_auth_header(self): } return {} + def bulk_send(self, events): + """ + Send the list of events to a configured remote. + + Arguments: + events (list[dict]) : list of event payloads to send to host. + + Returns: + requests.Response object + """ + headers = self.HEADERS.copy() + headers.update(self.get_auth_header()) + + options = self.options.copy() + options.update({ + 'url': self.URL, + 'json': events, + 'headers': headers, + }) + if self.AUTH_SCHEME == RouterConfiguration.AUTH_BASIC: + options.update({'auth': (self.username, self.password)}) + logger.debug('Sending caliper version of {} edx events to {}'.format(len(events), self.URL)) + response = requests.post(**options) # pylint: disable=missing-timeout + + if not 200 <= response.status_code < 300: + logger.warning( + '{} request failed for sending Caliper version of {} edx events to {}.Response code: {}. ' + 'Response: ' + '{}'.format( + response.request.method, + len(events), self.URL, + response.status_code, + response.text + )) + raise EventNotDispatched + def send(self, event, event_name): """ Send the event to configured remote. diff --git a/event_routing_backends/utils/xapi_lrs_client.py b/event_routing_backends/utils/xapi_lrs_client.py index 6b7ff57d..4e9f1216 100644 --- a/event_routing_backends/utils/xapi_lrs_client.py +++ b/event_routing_backends/utils/xapi_lrs_client.py @@ -60,6 +60,30 @@ def get_auth_header_value(self): return None + def bulk_send(self, statement_data): + """ + Send a batch of xAPI statements to configured remote. + + Arguments: + statement_data (List[Statement]) : a list of transformed xAPI statements + + Returns: + requests.Response object + """ + logger.debug('Sending {} xAPI statements to {}'.format(len(statement_data), self.URL)) + + response = self.lrs_client.save_statements(statement_data) + + if not response.success: + if response.response.code == 409: + logger.warning(f"Duplicate event id found in: {response.request.content}") + else: + logger.warning(f"Failed request: {response.request.content}") + logger.warning('{} request failed for sending xAPI statement of edx events to {}. ' + 'Response code: {}. Response: {}'.format(response.request.method, self.URL, + response.response.code, response.data)) + raise EventNotDispatched + def send(self, statement_data, event_name): """ Send the xAPI statement to configured remote. 
diff --git a/requirements/base.in b/requirements/base.in index c13359c7..17a1fb7c 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -1,7 +1,7 @@ # Core requirements for using this application -c constraints.txt -Django # Web application framework +Django # Web application framework isodate pytz requests @@ -13,3 +13,5 @@ edx-toggles # For SettingToggle class tincan event-tracking edx-celeryutils +apache-libcloud # For bulk event log loading +fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it diff --git a/requirements/base.txt b/requirements/base.txt index 74537197..741ec08b 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -8,8 +8,12 @@ amqp==5.1.1 # via kombu aniso8601==9.0.1 # via tincan -asgiref==3.6.0 +apache-libcloud==3.7.0 + # via -r requirements/base.in +asgiref==3.7.2 # via django +backports-zoneinfo[tzdata]==0.2.1 + # via kombu billiard==3.6.4.0 # via celery celery==5.2.7 @@ -40,7 +44,7 @@ click-repl==0.2.0 # via celery code-annotations==1.3.0 # via edx-toggles -cryptography==40.0.2 +cryptography==41.0.1 # via djfernet django==3.2.19 # via @@ -73,7 +77,7 @@ djfernet==0.8.1 # via -r requirements/base.in edx-celeryutils==1.2.2 # via -r requirements/base.in -edx-django-utils==5.4.0 +edx-django-utils==5.5.0 # via # django-config-models # edx-toggles @@ -82,6 +86,8 @@ edx-toggles==5.0.0 # via -r requirements/base.in event-tracking==2.1.0 # via -r requirements/base.in +fasteners==0.18 + # via -r requirements/base.in idna==3.4 # via requests isodate==0.6.1 @@ -92,9 +98,9 @@ jsonfield==3.1.0 # via # -r requirements/base.in # edx-celeryutils -kombu==5.2.4 +kombu==5.3.0 # via celery -markupsafe==2.1.2 +markupsafe==2.1.3 # via jinja2 newrelic==8.8.0 # via edx-django-utils @@ -124,8 +130,10 @@ pytz==2023.3 # tincan pyyaml==6.0 # via code-annotations -requests==2.30.0 - # via -r requirements/base.in +requests==2.31.0 + # via + # -r requirements/base.in + # apache-libcloud six==1.16.0 # via # click-repl @@ -134,7 +142,7 @@ six==1.16.0 # python-dateutil sqlparse==0.4.4 # via django -stevedore==5.0.0 +stevedore==5.1.0 # via # code-annotations # edx-django-utils @@ -142,6 +150,12 @@ text-unidecode==1.3 # via python-slugify tincan==1.0.0 # via -r requirements/base.in +typing-extensions==4.6.3 + # via + # asgiref + # kombu +tzdata==2023.3 + # via backports-zoneinfo urllib3==2.0.2 # via requests vine==5.0.0 diff --git a/requirements/ci.txt b/requirements/ci.txt index 1a2a8e90..041b22a6 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -12,7 +12,7 @@ filelock==3.12.0 # virtualenv packaging==23.1 # via tox -platformdirs==3.5.0 +platformdirs==3.5.1 # via virtualenv pluggy==1.0.0 # via tox diff --git a/requirements/dev.txt b/requirements/dev.txt index 677fa66a..8554ffa1 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -12,15 +12,21 @@ aniso8601==9.0.1 # via # -r requirements/quality.txt # tincan -asgiref==3.6.0 +apache-libcloud==3.7.0 + # via -r requirements/quality.txt +asgiref==3.7.2 # via # -r requirements/quality.txt # django -astroid==2.15.4 +astroid==2.15.5 # via # -r requirements/quality.txt # pylint # pylint-celery +backports-zoneinfo[tzdata]==0.2.1 + # via + # -r requirements/quality.txt + # kombu billiard==3.6.4.0 # via # -r requirements/quality.txt @@ -81,11 +87,11 @@ code-annotations==1.3.0 # -r requirements/quality.txt # edx-lint # edx-toggles -coverage[toml]==7.2.5 +coverage[toml]==7.2.7 # via # -r requirements/quality.txt # pytest-cov -cryptography==40.0.2 +cryptography==41.0.1 # via # -r 
requirements/quality.txt # djfernet @@ -141,7 +147,7 @@ djfernet==0.8.1 # via -r requirements/quality.txt edx-celeryutils==1.2.2 # via -r requirements/quality.txt -edx-django-utils==5.4.0 +edx-django-utils==5.5.0 # via # -r requirements/quality.txt # django-config-models @@ -161,10 +167,12 @@ exceptiongroup==1.1.1 # pytest factory-boy==3.2.1 # via -r requirements/quality.txt -faker==18.7.0 +faker==18.10.1 # via # -r requirements/quality.txt # factory-boy +fasteners==0.18 + # via -r requirements/quality.txt filelock==3.12.0 # via # -r requirements/ci.txt @@ -198,7 +206,7 @@ jsonfield==3.1.0 # via # -r requirements/quality.txt # edx-celeryutils -kombu==5.2.4 +kombu==5.3.0 # via # -r requirements/quality.txt # celery @@ -206,7 +214,7 @@ lazy-object-proxy==1.9.0 # via # -r requirements/quality.txt # astroid -markupsafe==2.1.2 +markupsafe==2.1.3 # via # -r requirements/quality.txt # jinja2 @@ -236,7 +244,7 @@ pbr==5.11.1 # stevedore pip-tools==6.13.0 # via -r requirements/pip-tools.txt -platformdirs==3.5.0 +platformdirs==3.5.1 # via # -r requirements/ci.txt # -r requirements/quality.txt @@ -269,7 +277,7 @@ pycparser==2.21 # via # -r requirements/quality.txt # cffi -pydantic==1.10.7 +pydantic==1.10.8 # via inflect pydocstyle==6.3.0 # via -r requirements/quality.txt @@ -290,7 +298,7 @@ pylint-django==2.5.3 # via # -r requirements/quality.txt # edx-lint -pylint-plugin-utils==0.7 +pylint-plugin-utils==0.8.2 # via # -r requirements/quality.txt # pylint-celery @@ -312,7 +320,7 @@ pytest==7.3.1 # -r requirements/quality.txt # pytest-cov # pytest-django -pytest-cov==4.0.0 +pytest-cov==4.1.0 # via -r requirements/quality.txt pytest-django==4.5.2 # via -r requirements/quality.txt @@ -337,8 +345,10 @@ pyyaml==6.0 # -r requirements/quality.txt # code-annotations # edx-i18n-tools -requests==2.30.0 - # via -r requirements/quality.txt +requests==2.31.0 + # via + # -r requirements/quality.txt + # apache-libcloud six==1.16.0 # via # -r requirements/ci.txt @@ -357,7 +367,7 @@ sqlparse==0.4.4 # via # -r requirements/quality.txt # django -stevedore==5.0.0 +stevedore==5.1.0 # via # -r requirements/quality.txt # code-annotations @@ -390,12 +400,18 @@ tox==3.28.0 # tox-battery tox-battery==0.6.1 # via -r requirements/dev.in -typing-extensions==4.5.0 +typing-extensions==4.6.3 # via # -r requirements/quality.txt + # asgiref # astroid + # kombu # pydantic # pylint +tzdata==2023.3 + # via + # -r requirements/quality.txt + # backports-zoneinfo urllib3==2.0.2 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 5222b862..8c9f688a 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -16,7 +16,9 @@ aniso8601==9.0.1 # via # -r requirements/test.txt # tincan -asgiref==3.6.0 +apache-libcloud==3.7.0 + # via -r requirements/test.txt +asgiref==3.7.2 # via # -r requirements/test.txt # django @@ -24,6 +26,10 @@ babel==2.12.1 # via # pydata-sphinx-theme # sphinx +backports-zoneinfo[tzdata]==0.2.1 + # via + # -r requirements/test.txt + # kombu beautifulsoup4==4.12.2 # via pydata-sphinx-theme billiard==3.6.4.0 @@ -77,15 +83,14 @@ code-annotations==1.3.0 # via # -r requirements/test.txt # edx-toggles -coverage[toml]==7.2.5 +coverage[toml]==7.2.7 # via # -r requirements/test.txt # pytest-cov -cryptography==40.0.2 +cryptography==41.0.1 # via # -r requirements/test.txt # djfernet - # secretstorage ddt==1.6.0 # via -r requirements/test.txt django==3.2.19 @@ -137,7 +142,7 @@ docutils==0.17.1 # sphinx edx-celeryutils==1.2.2 # via -r requirements/test.txt -edx-django-utils==5.4.0 
+edx-django-utils==5.5.0 # via # -r requirements/test.txt # django-config-models @@ -153,10 +158,12 @@ exceptiongroup==1.1.1 # pytest factory-boy==3.2.1 # via -r requirements/test.txt -faker==18.7.0 +faker==18.10.1 # via # -r requirements/test.txt # factory-boy +fasteners==0.18 + # via -r requirements/test.txt idna==3.4 # via # -r requirements/test.txt @@ -177,10 +184,6 @@ isodate==0.6.1 # via -r requirements/test.txt jaraco-classes==3.2.3 # via keyring -jeepney==0.8.0 - # via - # keyring - # secretstorage jinja2==3.1.2 # via # -r requirements/test.txt @@ -192,13 +195,13 @@ jsonfield==3.1.0 # edx-celeryutils keyring==23.13.1 # via twine -kombu==5.2.4 +kombu==5.3.0 # via # -r requirements/test.txt # celery markdown-it-py==2.2.0 # via rich -markupsafe==2.1.2 +markupsafe==2.1.3 # via # -r requirements/test.txt # jinja2 @@ -266,7 +269,7 @@ pytest==7.3.1 # -r requirements/test.txt # pytest-cov # pytest-django -pytest-cov==4.0.0 +pytest-cov==4.1.0 # via -r requirements/test.txt pytest-django==4.5.2 # via -r requirements/test.txt @@ -293,9 +296,10 @@ pyyaml==6.0 # code-annotations readme-renderer==37.3 # via twine -requests==2.30.0 +requests==2.31.0 # via # -r requirements/test.txt + # apache-libcloud # requests-toolbelt # sphinx # twine @@ -305,10 +309,8 @@ restructuredtext-lint==1.4.0 # via doc8 rfc3986==2.0.0 # via twine -rich==13.3.5 +rich==13.4.1 # via twine -secretstorage==3.3.3 - # via keyring six==1.16.0 # via # -r requirements/test.txt @@ -346,7 +348,7 @@ sqlparse==0.4.4 # via # -r requirements/test.txt # django -stevedore==5.0.0 +stevedore==5.1.0 # via # -r requirements/test.txt # code-annotations @@ -367,10 +369,17 @@ tomli==2.0.1 # pytest twine==4.0.2 # via -r requirements/doc.in -typing-extensions==4.5.0 +typing-extensions==4.6.3 # via + # -r requirements/test.txt + # asgiref + # kombu # pydata-sphinx-theme # rich +tzdata==2023.3 + # via + # -r requirements/test.txt + # backports-zoneinfo urllib3==2.0.2 # via # -r requirements/test.txt diff --git a/requirements/pip.txt b/requirements/pip.txt index e6827baa..5a5ce227 100644 --- a/requirements/pip.txt +++ b/requirements/pip.txt @@ -10,5 +10,5 @@ wheel==0.40.0 # The following packages are considered to be unsafe in a requirements file: pip==23.1.2 # via -r requirements/pip.in -setuptools==67.7.2 +setuptools==67.8.0 # via -r requirements/pip.in diff --git a/requirements/quality.txt b/requirements/quality.txt index de4b426b..7001ce8b 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -12,14 +12,20 @@ aniso8601==9.0.1 # via # -r requirements/test.txt # tincan -asgiref==3.6.0 +apache-libcloud==3.7.0 + # via -r requirements/test.txt +asgiref==3.7.2 # via # -r requirements/test.txt # django -astroid==2.15.4 +astroid==2.15.5 # via # pylint # pylint-celery +backports-zoneinfo[tzdata]==0.2.1 + # via + # -r requirements/test.txt + # kombu billiard==3.6.4.0 # via # -r requirements/test.txt @@ -72,11 +78,11 @@ code-annotations==1.3.0 # -r requirements/test.txt # edx-lint # edx-toggles -coverage[toml]==7.2.5 +coverage[toml]==7.2.7 # via # -r requirements/test.txt # pytest-cov -cryptography==40.0.2 +cryptography==41.0.1 # via # -r requirements/test.txt # djfernet @@ -121,7 +127,7 @@ djfernet==0.8.1 # via -r requirements/test.txt edx-celeryutils==1.2.2 # via -r requirements/test.txt -edx-django-utils==5.4.0 +edx-django-utils==5.5.0 # via # -r requirements/test.txt # django-config-models @@ -139,10 +145,12 @@ exceptiongroup==1.1.1 # pytest factory-boy==3.2.1 # via -r requirements/test.txt -faker==18.7.0 +faker==18.10.1 # via # -r 
requirements/test.txt # factory-boy +fasteners==0.18 + # via -r requirements/test.txt idna==3.4 # via # -r requirements/test.txt @@ -165,13 +173,13 @@ jsonfield==3.1.0 # via # -r requirements/test.txt # edx-celeryutils -kombu==5.2.4 +kombu==5.3.0 # via # -r requirements/test.txt # celery lazy-object-proxy==1.9.0 # via astroid -markupsafe==2.1.2 +markupsafe==2.1.3 # via # -r requirements/test.txt # jinja2 @@ -191,7 +199,7 @@ pbr==5.11.1 # via # -r requirements/test.txt # stevedore -platformdirs==3.5.0 +platformdirs==3.5.1 # via pylint pluggy==1.0.0 # via @@ -223,7 +231,7 @@ pylint-celery==0.3 # via edx-lint pylint-django==2.5.3 # via edx-lint -pylint-plugin-utils==0.7 +pylint-plugin-utils==0.8.2 # via # pylint-celery # pylint-django @@ -240,7 +248,7 @@ pytest==7.3.1 # -r requirements/test.txt # pytest-cov # pytest-django -pytest-cov==4.0.0 +pytest-cov==4.1.0 # via -r requirements/test.txt pytest-django==4.5.2 # via -r requirements/test.txt @@ -264,8 +272,10 @@ pyyaml==6.0 # via # -r requirements/test.txt # code-annotations -requests==2.30.0 - # via -r requirements/test.txt +requests==2.31.0 + # via + # -r requirements/test.txt + # apache-libcloud six==1.16.0 # via # -r requirements/test.txt @@ -280,7 +290,7 @@ sqlparse==0.4.4 # via # -r requirements/test.txt # django -stevedore==5.0.0 +stevedore==5.1.0 # via # -r requirements/test.txt # code-annotations @@ -299,10 +309,17 @@ tomli==2.0.1 # pytest tomlkit==0.11.8 # via pylint -typing-extensions==4.5.0 +typing-extensions==4.6.3 # via + # -r requirements/test.txt + # asgiref # astroid + # kombu # pylint +tzdata==2023.3 + # via + # -r requirements/test.txt + # backports-zoneinfo urllib3==2.0.2 # via # -r requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index b4f74fa8..0cee9444 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -12,10 +12,16 @@ aniso8601==9.0.1 # via # -r requirements/base.txt # tincan -asgiref==3.6.0 +apache-libcloud==3.7.0 + # via -r requirements/base.txt +asgiref==3.7.2 # via # -r requirements/base.txt # django +backports-zoneinfo[tzdata]==0.2.1 + # via + # -r requirements/base.txt + # kombu billiard==3.6.4.0 # via # -r requirements/base.txt @@ -64,9 +70,9 @@ code-annotations==1.3.0 # -r requirements/base.txt # -r requirements/test.in # edx-toggles -coverage[toml]==7.2.5 +coverage[toml]==7.2.7 # via pytest-cov -cryptography==40.0.2 +cryptography==41.0.1 # via # -r requirements/base.txt # djfernet @@ -108,7 +114,7 @@ djfernet==0.8.1 # via -r requirements/base.txt edx-celeryutils==1.2.2 # via -r requirements/base.txt -edx-django-utils==5.4.0 +edx-django-utils==5.5.0 # via # -r requirements/base.txt # django-config-models @@ -122,8 +128,10 @@ exceptiongroup==1.1.1 # via pytest factory-boy==3.2.1 # via -r requirements/test.in -faker==18.7.0 +faker==18.10.1 # via factory-boy +fasteners==0.18 + # via -r requirements/base.txt idna==3.4 # via # -r requirements/base.txt @@ -140,11 +148,11 @@ jsonfield==3.1.0 # via # -r requirements/base.txt # edx-celeryutils -kombu==5.2.4 +kombu==5.3.0 # via # -r requirements/base.txt # celery -markupsafe==2.1.2 +markupsafe==2.1.3 # via # -r requirements/base.txt # jinja2 @@ -186,7 +194,7 @@ pytest==7.3.1 # via # pytest-cov # pytest-django -pytest-cov==4.0.0 +pytest-cov==4.1.0 # via -r requirements/test.in pytest-django==4.5.2 # via -r requirements/test.in @@ -210,8 +218,10 @@ pyyaml==6.0 # via # -r requirements/base.txt # code-annotations -requests==2.30.0 - # via -r requirements/base.txt +requests==2.31.0 + # via + # -r requirements/base.txt + # 
apache-libcloud six==1.16.0 # via # -r requirements/base.txt @@ -223,7 +233,7 @@ sqlparse==0.4.4 # via # -r requirements/base.txt # django -stevedore==5.0.0 +stevedore==5.1.0 # via # -r requirements/base.txt # code-annotations @@ -238,6 +248,15 @@ tomli==2.0.1 # via # coverage # pytest +typing-extensions==4.6.3 + # via + # -r requirements/base.txt + # asgiref + # kombu +tzdata==2023.3 + # via + # -r requirements/base.txt + # backports-zoneinfo urllib3==2.0.2 # via # -r requirements/base.txt
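
As an end-to-end smoke test of the new command, a dry run over a single local log file mirrors the ``call_command`` usage in the tests above; the paths in ``source_config`` are illustrative::

    from django.core.management import call_command

    # Dry run: find and transform "tracking.log" under /var/log/openedx/tracking/,
    # but skip the final send/store step.
    call_command(
        "transform_tracking_logs",
        transformer_type="xapi",
        source_provider="LOCAL",
        source_config='{"key": "/var/log/openedx", "container": "tracking", "prefix": "tracking.log"}',
        dry_run=True,
    )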