Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding skip exit code for lightbeam #12

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions lightbeam/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,41 @@ def send(self):
self.lightbeam.log_status_reasons()

### Create structured output results_file if necessary
self.end_timestamp = datetime.datetime.now()
self.metadata.update({
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
})
# total up counts by message and status
for resource, resource_metadata in self.metadata["resources"].items():
if "failed_statuses" in resource_metadata.keys():
for status, status_metadata in resource_metadata["failed_statuses"].items():
total_num_errs = 0
for message, message_metadata in status_metadata.items():
for file, file_metadata in message_metadata["files"].items():
num_errs = len(file_metadata["line_numbers"])
file_metadata.update({
"count": num_errs,
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
})
total_num_errs += num_errs
status_metadata.update({"count": total_num_errs})

if self.lightbeam.results_file:
self.end_timestamp = datetime.datetime.now()
self.metadata.update({
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
})
# total up counts by message and status
for resource, resource_metadata in self.metadata["resources"].items():
if "failed_statuses" in resource_metadata.keys():
for status, status_metadata in resource_metadata["failed_statuses"].items():
total_num_errs = 0
for message, message_metadata in status_metadata.items():
for file, file_metadata in message_metadata["files"].items():
num_errs = len(file_metadata["line_numbers"])
file_metadata.update({
"count": num_errs,
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
})
total_num_errs += num_errs
status_metadata.update({"count": total_num_errs})
with open(self.lightbeam.results_file, 'w') as fp:
fp.write(json.dumps(self.metadata, indent=4))

if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will work, assuming that rows are added to both lists at the same time (and hence are in the same order).

self.logger.info("all payloads skipped")
exit(99) # signal to downstream tasks (in Airflow) all payloads skipped

if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
self.logger.info("all payloads failed")
exit(1) # signal to downstream tasks (in Airflow) all payloads failed


# Sends a single endpoint
async def do_send(self, endpoint):
Expand Down