Skip to content

Commit

Permalink
feat: Add command to bulk transform event log files
Browse files Browse the repository at this point in the history
  • Loading branch information
bmtcril committed Jun 5, 2023
1 parent 47b4001 commit d5999f3
Show file tree
Hide file tree
Showing 31 changed files with 1,787 additions and 157 deletions.
118 changes: 118 additions & 0 deletions docs/howto/how_to_bulk_transform.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
How To Bulk Transform Tracking Logs
===================================

This is a rough guide of how to transform existing tracking log files into the formats supported by event-routing-backends using the ``transform_tracking_logs`` Django management command inside a running LMS installation. Because the transformations perform database access, looking up user, course, and block data, you will need to run this command on the same install of Open edX that created the tracking log files.

.. warning:: This also means that doing large amounts of transformations can cause performance issues on the LMS and downstream learning record stores. Make sure to use the ``--batch_size`` and ``--sleep_between_batches_secs`` options to balance system performance vs load time.

Sources and Destinations
------------------------

For most sources and destinations we use `Apache Libcloud Object storage <https://libcloud.readthedocs.io/en/stable/supported_providers.html>`__ . This should cover casees from local storage to Amazon S3, MinIO, and many other cloud storage solutions. The ``--source_provider`` and ``--destination_provider`` options are the Libcloud Provider names, in all caps (ex: ``S3``, ``LOCAL``, ``MINIO``). The ``--source_config`` and ``--destination_config`` options are JSON strings passed directly to the Libcloud constructor as keyword args.

The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.

For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory, a ``container`` which is a single subdirectory name inside the ``key`` directory, and a ``prefix`` (if provided) will be appended to the container to determine the final path.

::

# This will attempt to recursively read all files in the "/openedx/data/logs/" directory
{"key": "/openedx/data", "container": "logs"}

# This will attempt to recursively read all files in "/openedx/data/logs/tracking/" as
# well as any files in "/openedx/data/logs/" that begin with "tracking"
{"key": "/openedx/data", "container": "logs", "prefix": "tracking"}

# This will attempt to read a single file named "/openedx/data/logs/tracking.log"
{"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}


For other providers ``key`` and ``secret`` are authentication credentials and ``container`` is roughly synonymous with an S3 bucket. Configuration for each provider is different, please consult the libcloud docs for your provider to learn about other options you may need to pass in to the ``--source_config`` and ``--destination_config`` JSON structures.


Modes Of Operation
------------------

The command can work in a few distinct ways.

**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.

.. note:: Note that any event filters in the ERB configuration will work as usual on these events.

**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.

.. note:: Note that NO event filters from the usual ERB configuration will impact these events, since they happen on a per-destination basis.

Additionally you can run with the ``--log_statements`` option in either mode to send all generated statements to a Python logger which can be configured to save to a file, standard out, or a log forwarder like `Vector <https://vector.dev/>`__ for more statement handling options. These logs are not filtered.

**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with ``--log_statements`` you can use Python log forwarding without the additional overhead of storing full files.


Examples
--------

**Files to LRS**

::

# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
--destination_provider LRS \
--transformer_type xapi

::

# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
# using a smaller batch size and long sleep for LMS performance
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
--destination_provider LRS \
--transformer_type caliper \
--batch_size 1000 \
--sleep_between_batches_secs 2.5

::

# Recursively transform any files whose names start with "tracking" from a "logs" directory in the
# MINIO bucket "logs" to all configured LRSs
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking"}' \
--destination_provider MINIO --destination_config '{"key": "openedx", "secret": "minio secret key", "container": "openedx", "prefix": "transformed_logs/2023-06-01/", "host": "files.local.overhang.io", "secure": false}' \
--transformer_type xapi

python manage.py lms transform_tracking_logs \
--transformer_type xapi
--source_provider S3 \
--source_config '{"key": "AWS key", "secret": "AWS secret", "container": "logs", "prefix":"tracking"}' \
--destination_provider LRS

**Files to Files**

::

# Transform the entire local file /openedx/data/tracking.log to a new file in the local directory
# /openedx/data/logs/transformed_events/ the file will be named with the current timestamp.
# Note: The "container" directory must exist!
python manage.py lms transform_tracking_logs \
--transformer_type caliper \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "container": "logs", "prefix": "tracking.log"}' \
--destination_provider LOCAL \
--destination_config '{"key": "/openedx/data/", "container": "transformed_logs", "prefix": "2023-06-01"}'

::

# Recursively transform any files whose names start with "tracking" from a "tracking_logs" directory in the
# MinIO bucket "openedx" to a single file in a MinIO storage's "transformed_logs" bucket.
# ie: http://files.local.overhang.io/openedx/tracking_logs/tracking* to http://files.local.overhang.io/openedx/transformed_logs/2023-06-02/23-06-02_20-29-20_xapi.log
# This is the configuration for a tutor local environment with MinIO enabled.
python manage.py lms transform_tracking_logs \
--source_provider MINIO \
--source_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "/tracking_logs", "host": "files.local.overhang.io", "secure": false}' \
--destination_provider MINIO \
--destination_config '{"key": "openedx", "secret": "minio secret", "container": "openedx", "prefix": "transformed_logs/2023-06-02/", "host": "files.local.overhang.io", "secure": false}' --transformer_type xapi

175 changes: 114 additions & 61 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_event, dispatch_event_persistent
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand All @@ -28,80 +28,133 @@ def __init__(self, processors=None, backend_name=None):
self.processors = processors if processors else []
self.backend_name = backend_name

def send(self, event):
def configure_host(self, host, router):
"""
Send the event to configured routers after processing it.
Event is processed through the configured processors. A router config
object matching the backend_name and other match params is used to get
the list of hosts to which the event is required to be delivered to.
Arguments:
event (dict): original event dictionary
Create host_configurations for the given host and router.
"""
host['host_configurations'] = {}
host['host_configurations'].update({'url': router.route_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})
else:
host.update({'router_type': 'INVALID_TYPE'})

return host

def prepare_to_send(self, events):
"""
Prepare a list of events to be sent and create a processed, filtered batch for each router.
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)
business_critical_events = get_business_critical_events()
route_events = {}

if not routers:
logger.info('Could not find any enabled router configuration for backend %s', self.backend_name)
return
return route_events

try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc
try:
logger.debug('Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name))
for event in events:
try:
event_name = event['name']
except TypeError as exc:
raise ValueError('Expected event as dict but {type} was given.'.format(type=type(event))) from exc

processed_event = self.process_event(event)
try:
logger.debug(
'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)
)

processed_event = self.process_event(event)
except (EventEmissionExit, ValueError):
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
event_name,
self.backend_name,
exc_info=True
)
continue

except EventEmissionExit:
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
exc_info=True
processed_event
)
return

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
event_name,
self.backend_name,
processed_event
)

for router in routers:
host = router.get_allowed_host(event)

router_url = router.route_url
if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router %s with backend "%s"',
event_name, router_url, self.backend_name
)
else:
updated_event = self.overwrite_event_data(processed_event, host, event_name)
host['host_configurations'] = {}
host['host_configurations'].update({'url': router_url})
host['host_configurations'].update({'auth_scheme': router.auth_scheme})

if router.auth_scheme == RouterConfiguration.AUTH_BASIC:
host['host_configurations'].update({'username': router.username})
host['host_configurations'].update({'password': router.password})
elif router.auth_scheme == RouterConfiguration.AUTH_BEARER:
host['host_configurations'].update({'auth_key': router.auth_key})

if router.backend_name == RouterConfiguration.CALIPER_BACKEND:
host.update({'router_type': 'AUTH_HEADERS'})
if 'headers' in host:
host['host_configurations'].update({'headers': host['headers']})
elif router.backend_name == RouterConfiguration.XAPI_BACKEND:
host.update({'router_type': 'XAPI_LRS'})

for router in routers:
host = router.get_allowed_host(event)
router_pk = router.pk

if not host:
logger.info(
'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"',
event_name, router_pk, self.backend_name
)
else:
host.update({'router_type': 'INVALID_TYPE'})
host = self.configure_host(host, router)
updated_event = self.overwrite_event_data(processed_event, host, event_name)
is_business_critical = event_name in business_critical_events
if router_pk not in route_events:
route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),]
else:
route_events[router_pk].append((event_name, updated_event, host, is_business_critical))

return route_events

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.
Event is processed through the configured processors. A router config
object matching the backend_name and other match params is used to get
the list of hosts to which the event is required to be delivered to.
Arguments:
events (list[dict]): list of original event dictionaries
"""
event_routes = self.prepare_to_send(events)

for events_for_route in event_routes.values():
prepared_events = []
host = None
for _, updated_event, host, _ in events_for_route:
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
prepared_events,
host['router_type'],
host['host_configurations']
)

def send(self, event):
"""
Send the event to configured routers after processing it.
Event is processed through the configured processors. A router config
object matching the backend_name and other match params is used to get
the list of hosts to which the event is required to be delivered to.
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])

business_critical_events = get_business_critical_events()
if event_name in business_critical_events:
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
event_name,
updated_event,
Expand Down
Loading

0 comments on commit d5999f3

Please sign in to comment.