Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Management command to batch load tracking logs #301

Merged
merged 10 commits into from
Jun 20, 2023
4 changes: 4 additions & 0 deletions docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ Two types of configuration are needed for the plugin:

By default, both ``xapi`` and ``caliper`` backends are already configured along with filters that allow all the supported events. ``caliper`` backend is disabled by default and can be enabled by setting ``CALIPER_EVENTS_ENABLED`` to ``True`` in plugin settings.

Additionally separate log streams for xAPI and Caliper are generated as events are transformed and can be configured to be saved or ignored. These can be configured as described in the `Django docs <https://docs.djangoproject.com/en/4.2/topics/logging/>`_ for the ``xapi_tracking`` and ``caliper_tracking`` loggers.



Router configuration
--------------------

Expand Down
121 changes: 121 additions & 0 deletions docs/howto/how_to_bulk_transform.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
How To Bulk Transform Tracking Logs
bmtcril marked this conversation as resolved.
Show resolved Hide resolved
===================================

This is a rough guide of how to transform existing tracking log files into the formats supported by event-routing-backends using the ``transform_tracking_logs`` Django management command inside a running LMS installation. Because the transformations perform database access, looking up user, course, and block data, you will need to run this command on the same install of Open edX that created the tracking log files.

.. warning:: This also means that doing large amounts of transformations can cause performance issues on the LMS and downstream learning record stores. Make sure to use the ``--batch_size`` and ``--sleep_between_batches_secs`` options to balance system performance vs load time.

Sources and Destinations
------------------------

For most sources and destinations we use `Apache Libcloud Object storage <https://libcloud.readthedocs.io/en/stable/supported_providers.html>`__ . This should cover casees from local storage to Amazon S3, MinIO, and many other cloud storage solutions. The ``--source_provider`` and ``--destination_provider`` options are the Libcloud Provider names, in all caps (ex: ``S3``, ``LOCAL``, ``MINIO``). The ``--source_config`` and ``--destination_config`` options are JSON strings passed directly to the Libcloud constructor as keyword args.

The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.

For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory, a ``container`` which is a single subdirectory name inside the ``key`` directory, and a ``prefix`` (if provided) will be appended to the container to determine the final path.

::

# This will attempt to recursively read all files in the "/openedx/data/logs/" directory
{"key": "/openedx/data", "container": "logs"}

# This will attempt to recursively read all files in "/openedx/data/logs/tracking/" as
# well as any files in "/openedx/data/logs/" that begin with "tracking"
{"key": "/openedx/data", "container": "logs", "prefix": "tracking"}

# This will attempt to read a single file named "/openedx/data/logs/tracking.log"
{"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}


For other providers ``key`` and ``secret`` are authentication credentials and ``container`` is roughly synonymous with an S3 bucket. Configuration for each provider is different, please consult the libcloud docs for your provider to learn about other options you may need to pass in to the ``--source_config`` and ``--destination_config`` JSON structures.


Modes Of Operation
------------------

The command can work in a few distinct ways.

**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.

**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.

Additionally all generated statements are written to a Python logger which can be configured to be ignored, save to a file, write standard out, or a log forwarder like `Vector <https://vector.dev/>`__ for more statement handling options. The two loggers are named ``xapi_tracking`` and ``caliper_tracking``, and are always running.

**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with loggers mentioned above, you can use Python log forwarding without the additional overhead of storing full files.

.. warning::
Events may be filtered differently in this command than in normal operation. Normally events pass through two layers of filters as described in `getting started <docs/getting_started.rst>`_.

First are the eventtracking AsyncRoutingBackend can have processor filters, which will be ignored when running this script (since these events have already passed through the eventtracking process).

Second are the router configuration filters which work on a per-LRS basis. These are respected when the destination is LRS, but ignored when writing to a file and the loggers.


Examples
--------

**Files to LRS**

::

# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
--destination_provider LRS \
--transformer_type xapi

::

# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
# using a smaller batch size and long sleep for LMS performance
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
--destination_provider LRS \
--transformer_type caliper \
--batch_size 1000 \
--sleep_between_batches_secs 2.5

::

# Recursively transform any files whose names start with "tracking" from a "logs" directory in the
# MINIO bucket "logs" to all configured LRSs
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking"}' \
--destination_provider MINIO --destination_config '{"key": "openedx", "secret": "minio secret key", "container": "openedx", "prefix": "transformed_logs/2023-06-01/", "host": "files.local.overhang.io", "secure": false}' \
--transformer_type xapi

python manage.py lms transform_tracking_logs \
--transformer_type xapi
--source_provider S3 \
--source_config '{"key": "AWS key", "secret": "AWS secret", "container": "logs", "prefix":"tracking"}' \
--destination_provider LRS

**Files to Files**

::

# Transform the entire local file /openedx/data/tracking.log to a new file in the local directory
# /openedx/data/logs/transformed_events/ the file will be named with the current timestamp.
# Note: The "container" directory must exist!
python manage.py lms transform_tracking_logs \
--transformer_type caliper \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
--destination_provider LOCAL \
--destination_config '{"key": "/openedx/data/", "container": "transformed_logs", "prefix": "2023-06-01"}'
bmtcril marked this conversation as resolved.
Show resolved Hide resolved

::

# Recursively transform any files whose names start with "tracking" from a "tracking_logs" directory in the
# MinIO bucket "openedx" to a single file in a MinIO storage's "transformed_logs" bucket.
# ie: http://files.local.overhang.io/openedx/tracking_logs/tracking* to http://files.local.overhang.io/openedx/transformed_logs/2023-06-02/23-06-02_20-29-20_xapi.log
# This is the configuration for a tutor local environment with MinIO enabled.
python manage.py lms transform_tracking_logs \
--source_provider MINIO \
--source_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "/tracking_logs", "host": "files.local.overhang.io", "secure": false}' \
--destination_provider MINIO \
--destination_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "transformed_logs/2023-06-02/", "host": "files.local.overhang.io", "secure": false}' --transformer_type xapi

180 changes: 118 additions & 62 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_event, dispatch_event_persistent
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand All @@ -28,7 +28,95 @@ def __init__(self, processors=None, backend_name=None):
self.processors = processors if processors else []
self.backend_name = backend_name

def send(self, event):
def configure_host(self, host, router):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is just broken out from the original send() method so both send() and bulk_send() can use it, I don't believe anything changed here.

"""
Create host_configurations for the given host and router.
"""
host['host_configurations'] = {}
host['host_configurations'].update({'url': router.route_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})
else:
host.update({'router_type': 'INVALID_TYPE'})

return host

def prepare_to_send(self, events):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This functionality was also broken out of send() and adapted to handle multiple events.

"""
Prepare a list of events to be sent and create a processed, filtered batch for each router.
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)
business_critical_events = get_business_critical_events()
route_events = {}

# We continue even without routers here to allow logging of statements to happen.
# If operators do not wish to log and have no enabled routers they should set XAPI_EVENTS_ENABLED
# or CALIPER_EVENTS_ENABLED to false.
if not routers:
logger.debug('Could not find any enabled router configuration for backend %s', self.backend_name)
routers = []

for event in events:
try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc

try:
logger.debug(
'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)
)

processed_event = self.process_event(event)
except (EventEmissionExit, ValueError):
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
event_name,
self.backend_name,
exc_info=True
)
continue

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
processed_event
)

for router in routers:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we batch events into a dictionary of lists, where the key is the primary key of the RouterConfiguration. Later we use these lists as the batches to send to each downstream LRS.

host = router.get_allowed_host(event)
router_pk = router.pk

if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"',
event_name, router_pk, self.backend_name
)
else:
host = self.configure_host(host, router)
updated_event = self.overwrite_event_data(processed_event, host, event_name)
is_business_critical = event_name in business_critical_events
if router_pk not in route_events:
route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),]
else:
route_events[router_pk].append((event_name, updated_event, host, is_business_critical))

return route_events

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.

Expand All @@ -37,71 +125,39 @@ def send(self, event):
the list of hosts to which the event is required to be delivered to.

Arguments:
event (dict): original event dictionary
events (list[dict]): list of original event dictionaries
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)

if not routers:
logger.info('Could not find any enabled router configuration for backend %s', self.backend_name)
return
event_routes = self.prepare_to_send(events)

for events_for_route in event_routes.values():
prepared_events = []
host = None
for _, updated_event, host, _ in events_for_route:
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
prepared_events,
host['router_type'],
host['host_configurations']
)

try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc
try:
logger.debug('Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name))
def send(self, event):
"""
Send the event to configured routers after processing it.

processed_event = self.process_event(event)
Event is processed through the configured processors. A router config
object matching the backend_name and other match params is used to get
the list of hosts to which the event is required to be delivered to.

except EventEmissionExit:
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
event_name,
self.backend_name,
exc_info=True
)
return

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
processed_event
)

for router in routers:
host = router.get_allowed_host(event)

router_url = router.route_url
if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router %s with backend "%s"',
event_name, router_url, self.backend_name
)
else:
updated_event = self.overwrite_event_data(processed_event, host, event_name)
host['host_configurations'] = {}
host['host_configurations'].update({'url': router_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})
else:
host.update({'router_type': 'INVALID_TYPE'})
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])

business_critical_events = get_business_critical_events()
if event_name in business_critical_events:
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
event_name,
updated_event,
Expand Down
Loading