From a90dc2849fc79e2e0e834451c0953a95b3b88230 Mon Sep 17 00:00:00 2001 From: Tom Reitz Date: Thu, 8 Jun 2023 17:22:24 -0500 Subject: [PATCH 1/3] adding structured output option for easier downstream error processing; see description and sample output added to README --- README.md | 62 ++++++++++++++++++++++++++++++++++++++ lightbeam/__main__.py | 10 +++++-- lightbeam/lightbeam.py | 3 +- lightbeam/send.py | 68 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c43d946..2e295e1 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,68 @@ lightbeam send -c path/to/config.yaml -w lightbeam send -c path/to/config.yaml --wipe ``` +## Structured output of run results +To produce a JSON file with metadata about the run, invoke lightbeam with +```bash +lightbeam send -c path/to/config.yaml --results-file ./results.json +``` +A sample results file could be: + +```json +{ + "started_at": "2023-06-08T17:18:25.053207", + "working_dir": "/home/someuser/code/sandbox/testing_lightbeam", + "config_file": "lightbeam.yml", + "data_dir": "./", + "api_url": "https://some-ed-fi-api.edu/api", + "namespace": "ed-fi", + "resources": { + "studentSchoolAssociations": { + "failed_statuses": { + "400": { + "400: { \"message\": \"The request is invalid.\", \"modelState\": { \"request.schoolReference.schoolId\": [ \"JSON integer 1234567899999 is too large or small for an Int32. Path 'schoolReference.schoolId', line 1, position 328.\" ] } }": { + "files": { + "./studentSchoolAssociations.jsonl": { + "line_numbers": "6,4,5,7,8", + "count": 5 + } + } + }, + "400: { \"message\": \"Validation of 'StudentSchoolAssociation' failed.\\n\\tStudent reference could not be resolved.\\n\" }": { + "files": { + "./studentSchoolAssociations.jsonl": { + "line_numbers": "1,3,2", + "count": 3 + } + } + }, + "count": 8 + }, + "409": { + "409: { \"message\": \"The value supplied for the related 'studentschoolassociation' resource does not exist.\" }": { + "files": { + "./studentSchoolAssociations.jsonl": { + "line_numbers": "9,10,12,14,16,13,11,15,17,18,19,21,22,20", + "count": 14 + } + } + }, + "count": 14 + } + }, + "records_processed": 22, + "records_skipped": 0, + "records_failed": 22 + } + }, + "completed_at": "2023-06-08T17:18:26.724699", + "runtime_sec": 1.671492, + "total_records_processed": 22, + "total_records_skipped": 0, + "total_records_failed": 22 +} +``` + # Design Some details of the design of this tool are discussed below. diff --git a/lightbeam/__main__.py b/lightbeam/__main__.py index 173eb5d..90c319b 100644 --- a/lightbeam/__main__.py +++ b/lightbeam/__main__.py @@ -75,7 +75,12 @@ def main(argv=None): type=str, help='only payloads that returned one of these comma-delimited HTTP status codes on last send will be processed' ) - defaults = { "selector":"*", "params": "", "older_than": "", "newer_than": "", "resend_status_codes": "" } + parser.add_argument("--results-file", + type=str, + help='produces a JSON output file with structured information about run results' + ) + + defaults = { "selector":"*", "params": "", "older_than": "", "newer_than": "", "resend_status_codes": "", "results_file": "" } parser.set_defaults(**defaults) args, remaining_argv = parser.parse_known_args() @@ -110,7 +115,8 @@ def main(argv=None): force=args.force, older_than=args.older_than, newer_than=args.newer_than, - resend_status_codes=args.resend_status_codes + resend_status_codes=args.resend_status_codes, + results_file=args.results_file ) try: logger.info("starting...") diff --git a/lightbeam/lightbeam.py b/lightbeam/lightbeam.py index 1f428f9..2bc9a63 100644 --- a/lightbeam/lightbeam.py +++ b/lightbeam/lightbeam.py @@ -42,7 +42,7 @@ class Lightbeam: MAX_STATUS_REASONS_TO_DISPLAY = 10 DATA_FILE_EXTENSIONS = ['json', 'jsonl', 'ndjson'] - def __init__(self, config_file, logger=None, selector="*", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes=""): + def __init__(self, config_file, logger=None, selector="*", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes="", results_file=""): self.config_file = config_file self.logger = logger self.errors = 0 @@ -58,6 +58,7 @@ def __init__(self, config_file, logger=None, selector="*", params="", wipe=False self.deleter = Deleter(self) self.api = EdFiAPI(self) self.is_locked = False + self.results_file = results_file # load params and/or env vars for config YAML interpolation self.params = json.loads(params) if params else {} diff --git a/lightbeam/send.py b/lightbeam/send.py index 316a984..bbbca23 100644 --- a/lightbeam/send.py +++ b/lightbeam/send.py @@ -1,6 +1,8 @@ import os import time +import json import asyncio +import datetime from lightbeam import util from lightbeam import hashlog @@ -13,9 +15,22 @@ def __init__(self, lightbeam=None): self.lightbeam.reset_counters() self.logger = self.lightbeam.logger self.hashlog_data = {} + self.start_timestamp = datetime.datetime.now() # Sends all (selected) endpoints def send(self): + + # Initialize a dictionary for tracking run metadata (for structured output) + self.metadata = { + "started_at": self.start_timestamp.isoformat(timespec='microseconds'), + "working_dir": os.getcwd(), + "config_file": self.lightbeam.config_file, + "data_dir": self.lightbeam.config["data_dir"], + "api_url": self.lightbeam.config["edfi_api"]["base_url"], + "namespace": self.lightbeam.config["namespace"], + "resources": {} + } + # get token with which to send requests self.lightbeam.api.do_oauth() @@ -26,6 +41,31 @@ def send(self): self.logger.info("finished processing endpoint {0}!".format(endpoint)) self.logger.info(" (final status counts: {0}) ".format(self.lightbeam.status_counts)) self.lightbeam.log_status_reasons() + + ### Create structured output results_file if necessary + if self.lightbeam.results_file: + self.end_timestamp = datetime.datetime.now() + self.metadata.update({"completed_at": self.end_timestamp.isoformat(timespec='microseconds')}) + self.metadata.update({"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds()}) + self.metadata.update({"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values())}) + self.metadata.update({"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values())}) + self.metadata.update({"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())}) + # total up counts by message and status + for resource in self.metadata["resources"].keys(): + if "failed_statuses" in self.metadata["resources"][resource].keys(): + for status in self.metadata["resources"][resource]["failed_statuses"].keys(): + total_num_errs = 0 + for message in self.metadata["resources"][resource]["failed_statuses"][status].keys(): + for file in self.metadata["resources"][resource]["failed_statuses"][status][message]["files"].keys(): + num_errs = len(self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"]) + self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file].update({"count": num_errs}) + self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"] = ",".join(str(x) for x in self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"]) + total_num_errs += num_errs + self.metadata["resources"][resource]["failed_statuses"][status].update({"count": total_num_errs}) + with open(self.lightbeam.results_file, 'w') as fp: + fp.write(json.dumps(self.metadata, indent=4)) + fp.close() + # Sends a single endpoint async def do_send(self, endpoint): @@ -40,6 +80,8 @@ async def do_send(self, endpoint): hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat") self.hashlog_data = hashlog.load(hashlog_file) + self.metadata["resources"].update({endpoint: {}}) + self.lightbeam.reset_counters() # here we set up a smart retry client with exponential backoff and a connection pool async with self.lightbeam.api.get_retry_client() as client: @@ -83,7 +125,12 @@ async def do_send(self, endpoint): # any task may have updated the hashlog, so we need to re-save it out to disk if self.lightbeam.track_state: hashlog.save(hashlog_file, self.hashlog_data) - + + # update metadata counts for this endpoint + self.metadata["resources"][endpoint].update({"records_processed": total_counter}) + self.metadata["resources"][endpoint].update({"records_skipped": self.lightbeam.num_skipped}) + self.metadata["resources"][endpoint].update({"records_failed": self.lightbeam.num_errors}) + # Posts a single data payload to a single endpoint using the client async def do_post(self, endpoint, file_name, data, client, line, hash): @@ -109,10 +156,27 @@ async def do_post(self, endpoint, file_name, data, client, line, hash): # warn about errors if response.status not in [ 200, 201 ]: message = str(response.status) + ": " + util.linearize(body) + + # update run metadata... + if "failed_statuses" not in self.metadata["resources"][endpoint].keys(): + self.metadata["resources"][endpoint].update({"failed_statuses": {}}) + if response.status not in self.metadata["resources"][endpoint]["failed_statuses"].keys(): + self.metadata["resources"][endpoint]["failed_statuses"].update({response.status: {}}) + if message not in self.metadata["resources"][endpoint]["failed_statuses"][response.status].keys(): + self.metadata["resources"][endpoint]["failed_statuses"][response.status].update({message: {}}) + if "files" not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message].keys(): + self.metadata["resources"][endpoint]["failed_statuses"][response.status][message].update({"files": {}}) + if file_name not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"].keys(): + self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"].update({file_name: {}}) + if "line_numbers" not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name].keys(): + self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name].update({"line_numbers": []}) + self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name]["line_numbers"].append(line) + + # update output and counters self.lightbeam.increment_status_reason(message) - self.lightbeam.num_errors += 1 if response.status==400: raise Exception(message) + else: self.lightbeam.num_errors += 1 # update hashlog From eb388ab9fa00d00f160e0a6c8301c93de3d2e713 Mon Sep 17 00:00:00 2001 From: Tom Reitz Date: Tue, 13 Jun 2023 13:19:01 -0500 Subject: [PATCH 2/3] updates per PR feedback --- lightbeam/send.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/lightbeam/send.py b/lightbeam/send.py index bbbca23..66627d7 100644 --- a/lightbeam/send.py +++ b/lightbeam/send.py @@ -45,26 +45,29 @@ def send(self): ### Create structured output results_file if necessary if self.lightbeam.results_file: self.end_timestamp = datetime.datetime.now() - self.metadata.update({"completed_at": self.end_timestamp.isoformat(timespec='microseconds')}) - self.metadata.update({"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds()}) - self.metadata.update({"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values())}) - self.metadata.update({"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values())}) - self.metadata.update({"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())}) + self.metadata.update({ + "completed_at": self.end_timestamp.isoformat(timespec='microseconds'), + "runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(), + "total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()), + "total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()), + "total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values()) + }) # total up counts by message and status - for resource in self.metadata["resources"].keys(): - if "failed_statuses" in self.metadata["resources"][resource].keys(): - for status in self.metadata["resources"][resource]["failed_statuses"].keys(): + for resource, resource_metadata in self.metadata["resources"].items(): + if "failed_statuses" in resource_metadata.keys(): + for status, status_metadata in resource_metadata["failed_statuses"].items(): total_num_errs = 0 - for message in self.metadata["resources"][resource]["failed_statuses"][status].keys(): - for file in self.metadata["resources"][resource]["failed_statuses"][status][message]["files"].keys(): - num_errs = len(self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"]) - self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file].update({"count": num_errs}) - self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"] = ",".join(str(x) for x in self.metadata["resources"][resource]["failed_statuses"][status][message]["files"][file]["line_numbers"]) + for message, message_metadata in status_metadata.items(): + for file, file_metadata in message_metadata["files"].items(): + num_errs = len(file_metadata["line_numbers"]) + file_metadata.update({ + "count": num_errs, + "line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"]) + }) total_num_errs += num_errs - self.metadata["resources"][resource]["failed_statuses"][status].update({"count": total_num_errs}) + status_metadata.update({"count": total_num_errs}) with open(self.lightbeam.results_file, 'w') as fp: fp.write(json.dumps(self.metadata, indent=4)) - fp.close() # Sends a single endpoint @@ -127,9 +130,11 @@ async def do_send(self, endpoint): hashlog.save(hashlog_file, self.hashlog_data) # update metadata counts for this endpoint - self.metadata["resources"][endpoint].update({"records_processed": total_counter}) - self.metadata["resources"][endpoint].update({"records_skipped": self.lightbeam.num_skipped}) - self.metadata["resources"][endpoint].update({"records_failed": self.lightbeam.num_errors}) + self.metadata["resources"][endpoint].update({ + "records_processed": total_counter, + "records_skipped": self.lightbeam.num_skipped, + "records_failed": self.lightbeam.num_errors + }) # Posts a single data payload to a single endpoint using the client From 4c85b22dac6fa4692f8c85807c47eb5a24c7ef3a Mon Sep 17 00:00:00 2001 From: Tom Reitz Date: Tue, 13 Jun 2023 13:27:42 -0500 Subject: [PATCH 3/3] update per PR feedback --- lightbeam/send.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/lightbeam/send.py b/lightbeam/send.py index 66627d7..1592c84 100644 --- a/lightbeam/send.py +++ b/lightbeam/send.py @@ -163,20 +163,20 @@ async def do_post(self, endpoint, file_name, data, client, line, hash): message = str(response.status) + ": " + util.linearize(body) # update run metadata... - if "failed_statuses" not in self.metadata["resources"][endpoint].keys(): - self.metadata["resources"][endpoint].update({"failed_statuses": {}}) - if response.status not in self.metadata["resources"][endpoint]["failed_statuses"].keys(): - self.metadata["resources"][endpoint]["failed_statuses"].update({response.status: {}}) - if message not in self.metadata["resources"][endpoint]["failed_statuses"][response.status].keys(): - self.metadata["resources"][endpoint]["failed_statuses"][response.status].update({message: {}}) - if "files" not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message].keys(): - self.metadata["resources"][endpoint]["failed_statuses"][response.status][message].update({"files": {}}) - if file_name not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"].keys(): - self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"].update({file_name: {}}) - if "line_numbers" not in self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name].keys(): - self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name].update({"line_numbers": []}) - self.metadata["resources"][endpoint]["failed_statuses"][response.status][message]["files"][file_name]["line_numbers"].append(line) - + failed_statuses_dict = self.metadata["resources"][endpoint].get("failed_statuses", {}) + if response.status not in failed_statuses_dict.keys(): + failed_statuses_dict.update({response.status: {}}) + if message not in failed_statuses_dict[response.status].keys(): + failed_statuses_dict[response.status].update({message: {}}) + if "files" not in failed_statuses_dict[response.status][message].keys(): + failed_statuses_dict[response.status][message].update({"files": {}}) + if file_name not in failed_statuses_dict[response.status][message]["files"].keys(): + failed_statuses_dict[response.status][message]["files"].update({file_name: {}}) + if "line_numbers" not in failed_statuses_dict[response.status][message]["files"][file_name].keys(): + failed_statuses_dict[response.status][message]["files"][file_name].update({"line_numbers": []}) + failed_statuses_dict[response.status][message]["files"][file_name]["line_numbers"].append(line) + self.metadata["resources"][endpoint]["failed_statuses"] = failed_statuses_dict + # update output and counters self.lightbeam.increment_status_reason(message) if response.status==400: