Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding structured output option #10

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,68 @@ lightbeam send -c path/to/config.yaml -w
lightbeam send -c path/to/config.yaml --wipe
```

## Structured output of run results
To produce a JSON file with metadata about the run, invoke lightbeam with the `--results-file` flag:
```bash
lightbeam send -c path/to/config.yaml --results-file ./results.json
```
A sample results file looks like this:

```json
{
"started_at": "2023-06-08T17:18:25.053207",
"working_dir": "/home/someuser/code/sandbox/testing_lightbeam",
"config_file": "lightbeam.yml",
"data_dir": "./",
"api_url": "https://some-ed-fi-api.edu/api",
"namespace": "ed-fi",
"resources": {
"studentSchoolAssociations": {
"failed_statuses": {
"400": {
"400: { \"message\": \"The request is invalid.\", \"modelState\": { \"request.schoolReference.schoolId\": [ \"JSON integer 1234567899999 is too large or small for an Int32. Path 'schoolReference.schoolId', line 1, position 328.\" ] } }": {
"files": {
"./studentSchoolAssociations.jsonl": {
"line_numbers": "6,4,5,7,8",
"count": 5
}
}
},
"400: { \"message\": \"Validation of 'StudentSchoolAssociation' failed.\\n\\tStudent reference could not be resolved.\\n\" }": {
"files": {
"./studentSchoolAssociations.jsonl": {
"line_numbers": "1,3,2",
"count": 3
}
}
},
"count": 8
},
"409": {
"409: { \"message\": \"The value supplied for the related 'studentschoolassociation' resource does not exist.\" }": {
"files": {
"./studentSchoolAssociations.jsonl": {
"line_numbers": "9,10,12,14,16,13,11,15,17,18,19,21,22,20",
"count": 14
}
}
},
"count": 14
}
},
"records_processed": 22,
"records_skipped": 0,
"records_failed": 22
}
},
"completed_at": "2023-06-08T17:18:26.724699",
"runtime_sec": 1.671492,
"total_records_processed": 22,
"total_records_skipped": 0,
"total_records_failed": 22
}
```


# Design
Some details of the design of this tool are discussed below.
Expand Down
10 changes: 8 additions & 2 deletions lightbeam/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ def main(argv=None):
type=str,
help='only payloads that returned one of these comma-delimited HTTP status codes on last send will be processed'
)
defaults = { "selector":"*", "params": "", "older_than": "", "newer_than": "", "resend_status_codes": "" }
parser.add_argument("--results-file",
type=str,
help='produces a JSON output file with structured information about run results'
)

defaults = { "selector":"*", "params": "", "older_than": "", "newer_than": "", "resend_status_codes": "", "results_file": "" }
parser.set_defaults(**defaults)
args, remaining_argv = parser.parse_known_args()

Expand Down Expand Up @@ -110,7 +115,8 @@ def main(argv=None):
force=args.force,
older_than=args.older_than,
newer_than=args.newer_than,
resend_status_codes=args.resend_status_codes
resend_status_codes=args.resend_status_codes,
results_file=args.results_file
)
try:
logger.info("starting...")
Expand Down
3 changes: 2 additions & 1 deletion lightbeam/lightbeam.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Lightbeam:
MAX_STATUS_REASONS_TO_DISPLAY = 10
DATA_FILE_EXTENSIONS = ['json', 'jsonl', 'ndjson']

def __init__(self, config_file, logger=None, selector="*", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes=""):
def __init__(self, config_file, logger=None, selector="*", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes="", results_file=""):
self.config_file = config_file
self.logger = logger
self.errors = 0
Expand All @@ -58,6 +58,7 @@ def __init__(self, config_file, logger=None, selector="*", params="", wipe=False
self.deleter = Deleter(self)
self.api = EdFiAPI(self)
self.is_locked = False
self.results_file = results_file

# load params and/or env vars for config YAML interpolation
self.params = json.loads(params) if params else {}
Expand Down
73 changes: 71 additions & 2 deletions lightbeam/send.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import time
import json
import asyncio
import datetime

from lightbeam import util
from lightbeam import hashlog
Expand All @@ -13,9 +15,22 @@ def __init__(self, lightbeam=None):
self.lightbeam.reset_counters()
self.logger = self.lightbeam.logger
self.hashlog_data = {}
self.start_timestamp = datetime.datetime.now()

# Sends all (selected) endpoints
def send(self):

# Initialize a dictionary for tracking run metadata (for structured output)
self.metadata = {
"started_at": self.start_timestamp.isoformat(timespec='microseconds'),
"working_dir": os.getcwd(),
"config_file": self.lightbeam.config_file,
"data_dir": self.lightbeam.config["data_dir"],
"api_url": self.lightbeam.config["edfi_api"]["base_url"],
"namespace": self.lightbeam.config["namespace"],
"resources": {}
}

# get token with which to send requests
self.lightbeam.api.do_oauth()

Expand All @@ -26,6 +41,34 @@ def send(self):
self.logger.info("finished processing endpoint {0}!".format(endpoint))
self.logger.info(" (final status counts: {0}) ".format(self.lightbeam.status_counts))
self.lightbeam.log_status_reasons()

### Create structured output results_file if necessary
if self.lightbeam.results_file:
self.end_timestamp = datetime.datetime.now()
self.metadata.update({
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
})
# total up counts by message and status
for resource, resource_metadata in self.metadata["resources"].items():
if "failed_statuses" in resource_metadata.keys():
for status, status_metadata in resource_metadata["failed_statuses"].items():
total_num_errs = 0
for message, message_metadata in status_metadata.items():
for file, file_metadata in message_metadata["files"].items():
num_errs = len(file_metadata["line_numbers"])
file_metadata.update({
"count": num_errs,
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
})
total_num_errs += num_errs
status_metadata.update({"count": total_num_errs})
with open(self.lightbeam.results_file, 'w') as fp:
fp.write(json.dumps(self.metadata, indent=4))


# Sends a single endpoint
async def do_send(self, endpoint):
Expand All @@ -40,6 +83,8 @@ async def do_send(self, endpoint):
hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat")
self.hashlog_data = hashlog.load(hashlog_file)

self.metadata["resources"].update({endpoint: {}})

self.lightbeam.reset_counters()
# here we set up a smart retry client with exponential backoff and a connection pool
async with self.lightbeam.api.get_retry_client() as client:
Expand Down Expand Up @@ -83,7 +128,14 @@ async def do_send(self, endpoint):
# any task may have updated the hashlog, so we need to re-save it out to disk
if self.lightbeam.track_state:
hashlog.save(hashlog_file, self.hashlog_data)


# update metadata counts for this endpoint
self.metadata["resources"][endpoint].update({
"records_processed": total_counter,
"records_skipped": self.lightbeam.num_skipped,
"records_failed": self.lightbeam.num_errors
})


# Posts a single data payload to a single endpoint using the client
async def do_post(self, endpoint, file_name, data, client, line, hash):
Expand All @@ -109,10 +161,27 @@ async def do_post(self, endpoint, file_name, data, client, line, hash):
# warn about errors
if response.status not in [ 200, 201 ]:
message = str(response.status) + ": " + util.linearize(body)

# update run metadata...
failed_statuses_dict = self.metadata["resources"][endpoint].get("failed_statuses", {})
if response.status not in failed_statuses_dict.keys():
failed_statuses_dict.update({response.status: {}})
if message not in failed_statuses_dict[response.status].keys():
failed_statuses_dict[response.status].update({message: {}})
if "files" not in failed_statuses_dict[response.status][message].keys():
failed_statuses_dict[response.status][message].update({"files": {}})
if file_name not in failed_statuses_dict[response.status][message]["files"].keys():
failed_statuses_dict[response.status][message]["files"].update({file_name: {}})
if "line_numbers" not in failed_statuses_dict[response.status][message]["files"][file_name].keys():
failed_statuses_dict[response.status][message]["files"][file_name].update({"line_numbers": []})
failed_statuses_dict[response.status][message]["files"][file_name]["line_numbers"].append(line)
self.metadata["resources"][endpoint]["failed_statuses"] = failed_statuses_dict

# update output and counters
self.lightbeam.increment_status_reason(message)
self.lightbeam.num_errors += 1
if response.status==400:
raise Exception(message)
else: self.lightbeam.num_errors += 1


# update hashlog
Expand Down