diff --git a/lightbeam/send.py b/lightbeam/send.py index 9668149..47c8808 100644 --- a/lightbeam/send.py +++ b/lightbeam/send.py @@ -43,32 +43,41 @@ def send(self): self.lightbeam.log_status_reasons() ### Create structured output results_file if necessary + self.end_timestamp = datetime.datetime.now() + self.metadata.update({ + "completed_at": self.end_timestamp.isoformat(timespec='microseconds'), + "runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(), + "total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()), + "total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()), + "total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values()) + }) + # total up counts by message and status + for resource, resource_metadata in self.metadata["resources"].items(): + if "failed_statuses" in resource_metadata.keys(): + for status, status_metadata in resource_metadata["failed_statuses"].items(): + total_num_errs = 0 + for message, message_metadata in status_metadata.items(): + for file, file_metadata in message_metadata["files"].items(): + num_errs = len(file_metadata["line_numbers"]) + file_metadata.update({ + "count": num_errs, + "line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"]) + }) + total_num_errs += num_errs + status_metadata.update({"count": total_num_errs}) + if self.lightbeam.results_file: - self.end_timestamp = datetime.datetime.now() - self.metadata.update({ - "completed_at": self.end_timestamp.isoformat(timespec='microseconds'), - "runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(), - "total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()), - "total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()), - "total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values()) - }) - # total up counts by message and status - for resource, resource_metadata in self.metadata["resources"].items(): - if "failed_statuses" in resource_metadata.keys(): - for status, status_metadata in resource_metadata["failed_statuses"].items(): - total_num_errs = 0 - for message, message_metadata in status_metadata.items(): - for file, file_metadata in message_metadata["files"].items(): - num_errs = len(file_metadata["line_numbers"]) - file_metadata.update({ - "count": num_errs, - "line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"]) - }) - total_num_errs += num_errs - status_metadata.update({"count": total_num_errs}) with open(self.lightbeam.results_file, 'w') as fp: fp.write(json.dumps(self.metadata, indent=4)) + if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]: + self.logger.info("all payloads skipped") + exit(99) # signal to downstream tasks (in Airflow) all payloads skipped + + if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]: + self.logger.info("all payloads failed") + exit(1) # signal to downstream tasks (in Airflow) all payloads failed + # Sends a single endpoint async def do_send(self, endpoint):