Skip to content

Commit

Permalink
Merge pull request #12 from edanalytics/feature/nonzero_exit_code_for…
Browse files Browse the repository at this point in the history
…_airflow_skip

adding skip exit code for lightbeam
  • Loading branch information
tomreitz authored Jun 13, 2023
2 parents 15de2a0 + 68c19b8 commit 8dcf182
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions lightbeam/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,41 @@ def send(self):
self.lightbeam.log_status_reasons()

### Create structured output results_file if necessary
self.end_timestamp = datetime.datetime.now()
self.metadata.update({
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
})
# total up counts by message and status
for resource, resource_metadata in self.metadata["resources"].items():
if "failed_statuses" in resource_metadata.keys():
for status, status_metadata in resource_metadata["failed_statuses"].items():
total_num_errs = 0
for message, message_metadata in status_metadata.items():
for file, file_metadata in message_metadata["files"].items():
num_errs = len(file_metadata["line_numbers"])
file_metadata.update({
"count": num_errs,
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
})
total_num_errs += num_errs
status_metadata.update({"count": total_num_errs})

if self.lightbeam.results_file:
self.end_timestamp = datetime.datetime.now()
self.metadata.update({
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
})
# total up counts by message and status
for resource, resource_metadata in self.metadata["resources"].items():
if "failed_statuses" in resource_metadata.keys():
for status, status_metadata in resource_metadata["failed_statuses"].items():
total_num_errs = 0
for message, message_metadata in status_metadata.items():
for file, file_metadata in message_metadata["files"].items():
num_errs = len(file_metadata["line_numbers"])
file_metadata.update({
"count": num_errs,
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
})
total_num_errs += num_errs
status_metadata.update({"count": total_num_errs})
with open(self.lightbeam.results_file, 'w') as fp:
fp.write(json.dumps(self.metadata, indent=4))

if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
self.logger.info("all payloads skipped")
exit(99) # signal to downstream tasks (in Airflow) all payloads skipped

if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
self.logger.info("all payloads failed")
exit(1) # signal to downstream tasks (in Airflow) all payloads failed


# Sends a single endpoint
async def do_send(self, endpoint):
Expand Down

0 comments on commit 8dcf182

Please sign in to comment.