feat: Management command to batch load tracking logs #301
Conversation
@pomegranited @Ian2012 apologies for the size of the PR, but this is the work to support tracking log bulk transform, backfill, and repair. It should also unlock making the LRS optional and saving transformed statements to cold storage via log forwarding.
@bmtcril I know this will require iteration, since tracking log parsing is fraught, and some users will need to be able to import logs even when the courses or users have since disappeared, but it's a great start!
I had one puzzling issue with file-to-file, but file-to-LRS worked as expected. The rest of my comments are just nits and whinges :)
👍
- I tested this on my Tutor dev lms shell by trying the example commands in the docs with various tracking log inputs.
- I read through the code
- I checked for accessibility issues: N/A
- Includes great documentation
- Commit structure follows OEP-0051
docs/howto/how_to_bulk_transform.rst
Outdated
**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.

.. note:: Note that NO event filters from the usual ERB configuration will impact these events, since they happen on a per-destination basis.
I didn't understand this at first, but got there eventually. You mean that the router configuration filters won't be used for file-to-file, but backend configuration filters will still be used, right? That's a bit confusing, and worth clarifying.
Actually backend filters are also skipped, since they happen at the tracking log emission level and we're working above that level at this point. I'll try to clarify it all.
Perfect, thank you for clarifying @bmtcril !
self.event_queue.append(event)
if len(self.event_queue) == self.max_queue_size:
    if self.dry_run:
        print("Dry run, skipping, but still clearing the queue.")
nit: Could you use logging instead of print statements throughout here?
Since this is a management command, using logging will make the output dependent on the Django config, which a lot of people run at WARNING level. It seems weird to emit these as warnings, but they're important, so I went with print statements. I tried it the other way first, but kept having to force the log level to INFO, which also felt weird. I think I'd rather leave it this way for that reason.
@bmtcril Ah in that case, you can use Django management BaseCommand's stdout and stderr attributes, ref https://docs.djangoproject.com/en/4.2/howto/custom-management-commands/
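For illustration, a minimal sketch of that suggestion (a hypothetical command body, not code from this PR), assuming the helpers could accept the output wrapper as an argument:

from django.core.management.base import BaseCommand


def flush_queue(out, dry_run=True):
    # Hypothetical helper: writes through whatever wrapper the command passes in.
    if dry_run:
        out.write("Dry run, skipping, but still clearing the queue.")


class Command(BaseCommand):
    help = "Illustrative only: route progress output through self.stdout."

    def handle(self, *args, **options):
        # self.stdout / self.stderr are OutputWrappers that respect output
        # redirection and styling, unlike bare print().
        flush_queue(self.stdout, dry_run=options.get("dry_run", True))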
Looking into it further, that works ok from the command itself (which only has one print statement), but since it's a member of the command class it would be awkward to pass it around to the several other functions and classes that need to do output. It seems like a lot of refactoring work and I'm not really sure what the benefit would be.
No worries at all @bmtcril , Ok to leave the print statements as is. 👍
@@ -82,5 +82,57 @@ def send_event(task, event_name, event, router_type, host_config):
            ),
            exc_info=True
        )
        raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30),
                         max_retries=getattr(settings, ''
                                             'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3))
Uck.. this is more of a whinge about Ralph, but.. if even one of the events in the batch we send is a duplicate, then the whole request fails with a 400 Bad Request error. For a command that will likely need to be re-entrant, this is really annoying.
I actually wrote that code based on my understanding of the xAPI spec at the time. I think it was a flawed understanding, though, and I'll see if they will take an update to that logic to not fail on duplicate statements unless the same event id is in the same batch (which is a bad thing and should fail). Spec is here: https://github.com/adlnet/xAPI-Spec/blob/1.0.3/xAPI-Communication.md#212-post-statements
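For reference, a small hypothetical sketch (not Ralph's code) of the distinction being made: a POST batch should only be rejected when the same statement id appears more than once within that batch, not merely because an id already exists in the LRS.

def has_intra_batch_duplicates(statements):
    # Return True only if a statement id repeats inside this one batch.
    seen = set()
    for statement in statements:
        statement_id = statement.get("id")
        if statement_id is not None and statement_id in seen:
            return True
        seen.add(statement_id)
    return False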
That would be fantastic.
Created an issue for it. It turns out I didn't write it, just copied it to the PUT implementation where it is probably more appropriate: openfun/ralph#345
Thank you!
event_routing_backends/management/commands/helpers/event_log_parser.py
Outdated
I believe I've addressed all of the initial feedback and it's ready for re-review.
LGTM! 👍
@@ -28,7 +28,95 @@ def __init__(self, processors=None, backend_name=None):
        self.processors = processors if processors else []
        self.backend_name = backend_name

    def send(self, event):
    def configure_host(self, host, router):
This function is just broken out from the original send() method so both send() and bulk_send() can use it, I don't believe anything changed here.
        return host

    def prepare_to_send(self, events):
This functionality was also broken out of send() and adapted to handle multiple events.
                    processed_event
                )

        for router in routers:
Here we batch events into a dictionary of lists, where the key is the primary key of the RouterConfiguration. Later we use these lists as the batches to send to each downstream LRS.
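As a rough sketch of that batching shape (function and variable names here are illustrative, not the exact code in this diff):

from collections import defaultdict

def batch_events_by_router(routers, processed_events):
    # Key each batch by the RouterConfiguration primary key; each list later
    # becomes one bulk send to that router's downstream LRS.
    route_events = defaultdict(list)
    for router in routers:
        for processed_event in processed_events:
            route_events[router.pk].append(processed_event)
    return route_events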
@bmtcril When running against an S3 bucket with a tracking log for courses that don't exist in the instance (perhaps deleted courses), there is an error because the course doesn't exist.
The command was run with: python manage.py lms transform_tracking_logs --transformer_type xapi --source_provider S3 --source_config '{"key": "...", "secret": "...", "region": "us-west-2", "container": "...", "prefix": "..."}' --destination_provider LRS
This one fixes it: #308
@bmtcril There is another issue with the code at Line 150 in ca377d2.
Would this be a valid solution:
score = {
    'min': 0,
    'max': event_data['weighted_possible'],
    'raw': event_data['weighted_earned'],
    'scaled': event_data['weighted_earned'] / event_data['weighted_possible'] if event_data['weighted_possible'] else 0
}
Another issue is when the
There is too much verbosity when sending events in batch, is this needed or can it be disabled so one can just verify the actual progress? Oh, it's Celery that logs that. BTW: the process has to finish the printing before it can send all the events, which adds a 5-10 second delay.
I did find another issue: a TypeError if the dotted-path object is a str. The fixes will be in the mentioned PR. BTW: it seems there is confusion around the data field, it's not a dictionary but a string, so this never works: event-routing-backends/event_routing_backends/processors/mixins/base_transformer.py Line 131 in ca377d2
@@ -10,6 +10,7 @@
from event_routing_backends.processors.caliper.registry import CaliperTransformersRegistry
from event_routing_backends.processors.mixins.base_transformer_processor import BaseTransformerProcessorMixin

logger = getLogger(__name__)
caliper_logger = getLogger('caliper_tracking')
The changes in here and the xapi processor are just to keep the statement logging separate from debug output.
@@ -126,9 +126,11 @@ def extract_username_or_userid(self):
        Returns:
            str
        """
        username_or_id = self.get_data('context.username') or self.get_data('context.user_id')
        username_or_id = self.get_data('username') or self.get_data('user_id')
These changes are because of some differences in format between tracking log files and what comes in via the async handler.
@@ -58,7 +58,7 @@ def get_object(self):
            `Activity`
        """
        return Activity(
            id=self.get_data('data.target_url'),
            id=self.get_data('data.target_url', True),
I made this required since it's a required attribute and will fail downstream without it.
I was debugging this one hahah. Keep in mind that if the error is too big (like for these tasks with 10000 arguments) MySQL cannot save all that information:
_mysql.connection.query(self, query)
django.db.utils.OperationalError: (1153, "Got a packet bigger than 'max_allowed_packet' bytes")
@@ -133,6 +132,17 @@ class ProblemSubmittedTransformer(BaseProblemsTransformer):
    """
    additional_fields = ('result', )

    def get_object(self):
I needed to make this change because these events were failing at the LRS level for not being IRIs; you can see what it's doing in the change to the test.
I'm not very familiar with this event, so I'm not sure what the underlying meaning of those failures is. I think it should probably be tracked as a separate issue from this one, though, so we can see if there's someone more familiar we can ask about it.
@bmtcril I'm getting: POST request failed for sending xAPI statement of edx events to http://ralph:8100/xAPI/. Response code: 422. Response: {"detail":[{"loc":["body"],"msg":"value is not a valid dict","type":"type_error.dict"},{"loc":["body",988,"object","id"],"msg":"field required","type":"value_error.missing"},{"loc":["body",7485,"object","id"],"msg":"field required","type":"value_error.missing"}]}
@bmtcril There is another error, is there a way to retry when those errors happen?
@Ian2012 it's really hard to reply to these here, can we tackle them in Slack and then figure out where in the PR an issue is happening and comment it there?
LGTM, the errors mentioned above are not related to the functionality of this PR but to the underlying mechanisms. I will create proper issues to track them.
These can be too big for MySQL to log; we shouldn't be swamping the database with them anyway.
We found a few things testing with production data that I will tackle Monday:
- An error was observed downloading bulk events from S3 where AWS was closing the connection mid-download. This seems likely to be because we are doing the transform and send operations in the download loop to save memory. The current change downloads a discrete range of bytes in each call so there is no open file handle between requests. We download in 2MB chunks so the overhead should be negligible unless the log files are huge (see the sketch after this list).
- Added new feature toggles for logging xapi and caliper statements to help performance when they're not needed.
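A rough sketch of the ranged-download idea mentioned above (a hypothetical helper; assumes libcloud >= 3.0, which provides download_object_range_as_stream, and treats the end offset as exclusive):

CHUNK_SIZE = 1024 * 1024 * 2  # 2 MB per request, as described above

def iterate_object_in_ranges(obj):
    # Issue one ranged request per chunk so no file handle stays open between
    # requests while the transform/send work happens in the download loop.
    start = 0
    while start < obj.size:
        end = min(start + CHUNK_SIZE, obj.size)
        for chunk in obj.driver.download_object_range_as_stream(
            obj, start_bytes=start, end_bytes=end
        ):
            yield chunk
        start = end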
Ok, I believe all of the above issues are covered and test coverage is back to 100%. I'm hoping for one last pass from @ziafazal to make sure we're doing the right things here before merging.
@bmtcril @Ian2012 overall looks good and I tested the FILE-to-LRS and FILE-to-FILE flows. One thing I noticed: when --transformer_type is caliper, it breaks while transforming data from tracking.log with this error:
"edx.video.closed_captions.hidden" using CaliperProcessor processor. Error: unsupported type for timedelta seconds component: NoneType
Traceback (most recent call last):
File "/openedx/requirements/event-routing-backends/event_routing_backends/processors/mixins/base_transformer_processor.py", line 58, in transform_event
transformed_event = self.get_transformed_event(event)
File "/openedx/requirements/event-routing-backends/event_routing_backends/processors/mixins/base_transformer_processor.py", line 100, in get_transformed_event
return self.registry.get_transformer(event).transform()
File "/openedx/requirements/event-routing-backends/event_routing_backends/processors/mixins/base_transformer.py", line 105, in transform
value = getattr(self, f'get_{key}')()
File "/openedx/requirements/event-routing-backends/event_routing_backends/processors/caliper/event_transformers/video_events.py", line 98, in get_object
'duration': duration_isoformat(timedelta(
TypeError: unsupported type for timedelta seconds component: NoneType
Reason: some video events have no duration attribute.
I had to replace this
'duration': duration_isoformat(timedelta(
    seconds=data.get('duration', 0)
))
with this
'duration': convert_seconds_to_iso(
    seconds=data.get('duration', 0)
)
in event_routing_backends/processors/caliper/event_transformers/video_events.py
to get rid of the error.
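For illustration, a None-tolerant helper along those lines might look like the sketch below (the actual convert_seconds_to_iso in the repo may differ):

from datetime import timedelta

from isodate import duration_isoformat


def convert_seconds_to_iso(seconds):
    # Return an ISO 8601 duration string, or None when the event has no duration.
    if seconds is None:
        return None
    return duration_isoformat(timedelta(seconds=seconds))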
@bmtcril Here is my approval. Let's not forget to create proper issues to fix the errors that we got during our testing scenarios.
Description:
This resolves #260
This PR adds a Django management command to transform and load tracking log files, and refactors some core parts of the project to work with batch sending. It provides a lot of capability at the cost of some complexity.
Notably there are some format differences between what is stored on disk and what is passed to event-routing-backends over the async router. This PR attempts to consolidate on the async format, but there is some branching that has to happen to locate data in different places.
Testing instructions:
I've been testing this in a Tutor nightly environment with the OARS plugins enabled and initialized. From there you can install this branch in the running LMS or LMS Worker container by:
Then try the examples listed in the new document to transform data to files or to the LRS, such as:
python manage.py lms transform_tracking_logs --source_provider LOCAL --source_config '{"key": "/openedx/data", "container": "logs", "prefix":"tracking.log"}' --destination_provider LOCAL --destination_config '{"key": "/openedx/", "container": "data", "prefix": "logs"}' --transformer_type xapi
and
python manage.py lms transform_tracking_logs --source_provider LOCAL --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' --destination_provider LRS --transformer_type xapi
Merge checklist:
Post merge:
finished.
Author concerns: The only current functionality that is untested is forwarding the logged statements along via Vector without any configured LRS. I hope to have this working soon, but it's a notable deficiency.