diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a19645..4837c38 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+### Unreleased
+* bugfix: Fix how hashlog entries are removed during `lightbeam delete`
+
 ### v0.1.2
 Released 2024-04-19
 
diff --git a/lightbeam/api.py b/lightbeam/api.py
index 1092865..d12bb05 100644
--- a/lightbeam/api.py
+++ b/lightbeam/api.py
@@ -187,8 +187,8 @@ def load_swagger_docs(self):
             if endpoint_type=="descriptors" or endpoint_type=="resources":
                 swagger_url = endpoint["endpointUri"]
                 if self.lightbeam.track_state:
-                    hash = hashlog.get_hash_string(swagger_url)
-                    file = os.path.join(cache_dir, f"swagger-{endpoint_type}-{hash}.json")
+                    url_hash = hashlog.get_hash_string(swagger_url)
+                    file = os.path.join(cache_dir, f"swagger-{endpoint_type}-{url_hash}.json")
                 if (
                     self.lightbeam.track_state # we have a state_dir in which to store
                     and not self.lightbeam.wipe # we aren't clearing the cache
@@ -234,8 +234,8 @@ async def load_descriptors_values(self):
             os.mkdir(cache_dir)
 
         # check for cached descriptor values
-        hash = hashlog.get_hash_string(self.config["base_url"])
-        cache_file = os.path.join(cache_dir, f"descriptor-values-{hash}.csv")
+        url_hash = hashlog.get_hash_string(self.config["base_url"])
+        cache_file = os.path.join(cache_dir, f"descriptor-values-{url_hash}.csv")
 
         self.lightbeam.reset_counters()
         if (
diff --git a/lightbeam/delete.py b/lightbeam/delete.py
index bf76c4c..e0f1161 100644
--- a/lightbeam/delete.py
+++ b/lightbeam/delete.py
@@ -91,16 +91,13 @@ async def do_deletes(self, endpoint):
                     params = util.interpolate_params(params_structure, data)
 
                     # check if we've posted this data before
-                    hash = hashlog.get_hash(data)
-                    if self.lightbeam.track_state and hash in self.hashlog_data.keys():
+                    data_hash = hashlog.get_hash(data)
+                    if self.lightbeam.track_state and data_hash in self.hashlog_data.keys():
                         # check if the last post meets criteria for a delete
-                        if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
+                        if self.lightbeam.meets_process_criteria(self.hashlog_data[data_hash]):
                             # yes, we need to delete it; append to task queue
                             tasks.append(asyncio.create_task(
-                                self.do_delete(endpoint, file_name, params, counter)))
-
-                            # remove the payload from the hashlog
-                            del self.hashlog_data[hash]
+                                self.do_delete(endpoint, file_name, params, counter, data_hash)))
                         else:
                             # no, do not delete
                             self.lightbeam.num_skipped += 1
@@ -128,7 +125,7 @@ async def do_deletes(self, endpoint):
             hashlog.save(hashlog_file, self.hashlog_data)
 
     # Deletes a single payload for a single endpoint
-    async def do_delete(self, endpoint, file_name, params, line):
+    async def do_delete(self, endpoint, file_name, params, line, data_hash=None):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -163,7 +160,7 @@ async def do_delete(self, endpoint, file_name, params, line):
                         if type(j)==list and len(j)==1:
                             the_id = j[0]['id']
                             # now we can delete by `id`
-                            await self.do_delete_id(endpoint, the_id, file_name, line)
+                            await self.do_delete_id(endpoint, the_id, file_name, line, data_hash)
                             break
                         elif type(j)==list and len(j)==0:
                             skip_reason = "payload not found in API"
@@ -195,7 +192,7 @@ async def do_delete(self, endpoint, file_name, params, line):
                     self.logger.error(" (at line {0} of {1}; ID: {2} )".format(line, file_name, id))
                 break
 
-    async def do_delete_id(self, endpoint, id, file_name=None, line=None):
+    async def do_delete_id(self, endpoint, id, file_name=None, line=None, data_hash=None):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -209,12 +206,15 @@ async def do_delete_id(self, endpoint, id, file_name=None, line=None):
                     if status!=401:
                         self.lightbeam.num_finished += 1
                         self.lightbeam.increment_status_counts(status)
-                        if self.lightbeam.track_state:
-                            del self.hashlog_data[hash]
                         if status not in [ 204 ]:
                             message = str(status) + ": " + util.linearize(body)
                             self.lightbeam.increment_status_reason(message)
                             self.lightbeam.num_errors += 1
+                        else:
+                            if self.lightbeam.track_state and data_hash is not None:
+                                # if we're certain delete was successful, remove this
+                                # line of data from internal tracking
+                                del self.hashlog_data[data_hash]
                         break # (out of while loop)
                     else:
                         # this could be broken out to a separate function call,
@@ -233,4 +233,4 @@ async def do_delete_id(self, endpoint, id, file_name=None, line=None):
                 self.logger.exception(e, exc_info=self.lightbeam.config["show_stacktrace"])
                 if line and file_name:
                     self.logger.error(" (at line {0} of {1}; ID: {2} )".format(line, file_name, id))
-                break
\ No newline at end of file
+                break
diff --git a/lightbeam/send.py b/lightbeam/send.py
index bc91a9a..7577c4c 100644
--- a/lightbeam/send.py
+++ b/lightbeam/send.py
@@ -17,7 +17,7 @@ def __init__(self, lightbeam=None):
         self.logger = self.lightbeam.logger
         self.hashlog_data = {}
         self.start_timestamp = datetime.datetime.now()
-    
+
     # Sends all (selected) endpoints
     def send(self):
 
@@ -39,7 +39,7 @@ def send(self):
         endpoints = self.lightbeam.get_endpoints_with_data(self.lightbeam.endpoints)
         if len(endpoints)==0:
             self.logger.critical("`data_dir` {0} has no *.jsonl files".format(self.lightbeam.config["data_dir"]) + " for selected endpoints")
-        
+
         # send each endpoint
         for endpoint in endpoints:
             self.logger.info("sending endpoint {0} ...".format(endpoint))
@@ -47,7 +47,7 @@ def send(self):
            self.logger.info("finished processing endpoint {0}!".format(endpoint))
            self.logger.info("  (final status counts: {0}) ".format(self.lightbeam.status_counts))
            self.lightbeam.log_status_reasons()
-        
+
         ### Create structured output results_file if necessary
         self.end_timestamp = datetime.datetime.now()
         self.metadata.update({
@@ -62,18 +62,17 @@ def send(self):
             if "failures" in self.metadata["resources"][resource].keys():
                 for idx, _ in enumerate(self.metadata["resources"][resource]["failures"]):
                     self.metadata["resources"][resource]["failures"][idx]["line_numbers"].sort()
-        
-        
+
         # helper function used below
         def repl(m):
             return re.sub(r"\s+", '', m.group(0))
-        
+
         ### Create structured output results_file if necessary
         if self.lightbeam.results_file:
-            
+
             # create directory if not exists
             os.makedirs(os.path.dirname(self.lightbeam.results_file), exist_ok=True)
-            
+
             with open(self.lightbeam.results_file, 'w') as fp:
                 content = json.dumps(self.metadata, indent=4)
                 # failures.line_numbers are split each on their own line; here we remove those line breaks
@@ -83,12 +82,11 @@ def repl(m):
         if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
             self.logger.info("all payloads skipped")
             exit(99) # signal to downstream tasks (in Airflow) all payloads skipped
-        
+
         if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
             self.logger.info("all payloads failed")
             exit(1) # signal to downstream tasks (in Airflow) all payloads failed
-    
 
     # Sends a single endpoint
     async def do_send(self, endpoint):
         # We try to avoid re-POSTing JSON we've already (successfully) sent.
@@ -101,7 +99,7 @@ async def do_send(self, endpoint):
         if self.lightbeam.track_state:
             hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat")
             self.hashlog_data = hashlog.load(hashlog_file)
-        
+
         self.metadata["resources"].update({endpoint: {}})
 
         self.lightbeam.reset_counters()
@@ -116,45 +114,63 @@ async def do_send(self, endpoint):
                     total_counter += 1
                     data = line.strip()
                     # compute hash of current row
-                    hash = hashlog.get_hash(data)
+                    data_hash = hashlog.get_hash(data)
                     # check if we've posted this data before
-                    if self.lightbeam.track_state and hash in self.hashlog_data.keys():
+                    if (
+                        self.lightbeam.track_state
+                        and data_hash in self.hashlog_data.keys()
+                    ):
                         # check if the last post meets criteria for a resend
-                        if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
+                        if self.lightbeam.meets_process_criteria(
+                            self.hashlog_data[data_hash]
+                        ):
                             # yes, we need to (re)post it; append to task queue
-                            tasks.append(asyncio.create_task(
-                                self.do_post(endpoint, file_name, data, line_counter, hash)))
+                            tasks.append(
+                                asyncio.create_task(
+                                    self.do_post(
+                                        endpoint,
+                                        file_name,
+                                        data,
+                                        line_counter,
+                                        data_hash,
+                                    )
+                                )
+                            )
                         else:
                             # no, do not (re)post
                             self.lightbeam.num_skipped += 1
                             continue
                     else:
                         # new, never-before-seen payload! append it to task queue
-                        tasks.append(asyncio.create_task(
-                            self.do_post(endpoint, file_name, data, line_counter, hash)))
-                    
+                        tasks.append(
+                            asyncio.create_task(
+                                self.do_post(
+                                    endpoint, file_name, data, line_counter, data_hash
+                                )
+                            )
+                        )
+
                     if total_counter%self.lightbeam.MAX_TASK_QUEUE_SIZE==0:
                         await self.lightbeam.do_tasks(tasks, total_counter)
                         tasks = []
-            
+
             if self.lightbeam.num_skipped>0:
                 self.logger.info("skipped {0} of {1} payloads because they were previously processed and did not match any resend criteria".format(self.lightbeam.num_skipped, total_counter))
 
             if len(tasks)>0: await self.lightbeam.do_tasks(tasks, total_counter)
-        
+
         # any task may have updated the hashlog, so we need to re-save it out to disk
         if self.lightbeam.track_state:
             hashlog.save(hashlog_file, self.hashlog_data)
-        
+
         # update metadata counts for this endpoint
         self.metadata["resources"][endpoint].update({
             "records_processed": total_counter,
             "records_skipped": self.lightbeam.num_skipped,
             "records_failed": self.lightbeam.num_errors
         })
-    
-    
+
     # Posts a single data payload to a single endpoint
-    async def do_post(self, endpoint, file_name, data, line, hash):
+    async def do_post(self, endpoint, file_name, data, line, data_hash):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -170,7 +186,7 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                     # update status_counts (for every-second status update)
                     self.lightbeam.increment_status_counts(status)
                     self.lightbeam.num_finished += 1
-                    
+
                     # warn about errors
                     if response.status not in [ 200, 201 ]:
                         message = str(response.status) + ": " + util.linearize(json.loads(body).get("message"))
@@ -198,12 +214,16 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                             self.lightbeam.increment_status_reason(message)
                             if response.status==400:
                                 raise Exception(message)
-                            else: self.lightbeam.num_errors += 1
-                    
+                            else:
+                                self.lightbeam.num_errors += 1
+
                     # update hashlog
                     if self.lightbeam.track_state:
-                        self.hashlog_data[hash] = (round(time.time()), response.status)
-                    
+                        self.hashlog_data[data_hash] = (
+                            round(time.time()),
+                            response.status,
+                        )
+
                     break # (out of while loop)
                 else: # 401 status
@@ -216,7 +236,7 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                     else:
                         await asyncio.sleep(1)
                         curr_token_version = int(str(self.lightbeam.token_version))
-            
+
             except RuntimeError as e:
                 await asyncio.sleep(1)
             except Exception as e:
@@ -224,4 +244,3 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                 self.lightbeam.num_errors += 1
                 self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line, file_name))
                 break
-
diff --git a/lightbeam/validate.py b/lightbeam/validate.py
index b6ffb7f..1d6bbab 100644
--- a/lightbeam/validate.py
+++ b/lightbeam/validate.py
@@ -82,13 +82,13 @@ def validate_endpoint(self, swagger, endpoint, local_descriptors=[]):
 
                 # check natural keys are unique
                 params = json.dumps(util.interpolate_params(params_structure, line))
-                hash = hashlog.get_hash(params)
-                if hash in distinct_params:
+                params_hash = hashlog.get_hash(params)
+                if params_hash in distinct_params:
                     if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
                         self.logger.warning(f"... VALIDATION ERROR (line {counter}): duplicate value(s) for natural key(s): {params}")
                     self.lightbeam.num_errors += 1
                     continue
-                else: distinct_params.append(hash)
+                else: distinct_params.append(params_hash)
         if self.lightbeam.num_errors==0:
             self.logger.info(f"... all lines validate ok!")
         else:
@@ -137,4 +137,4 @@ def is_valid_descriptor_value(self, namespace, codeValue):
             if row[1]==namespace and row[2]==codeValue:
                 return True
         return False
-        
\ No newline at end of file
+
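
For reviewers, a minimal sketch of the ordering this patch enforces, written with simplified, hypothetical names (do_delete, send_delete, a plain dict for the hashlog) rather than lightbeam's real signatures: the payload's hashlog entry is removed only after its DELETE comes back 204, instead of at the moment the delete task is queued, so a failed delete leaves the entry in place and stays eligible for retry on the next run.

    # Illustrative only -- assumed, simplified names; not lightbeam's actual API.
    import time
    from typing import Callable, Dict, Tuple

    Hashlog = Dict[str, Tuple[int, int]]  # payload hash -> (timestamp, last status)

    def do_delete(data_hash: str, send_delete: Callable[[], int], hashlog: Hashlog) -> int:
        """Issue the DELETE (via the injected callable) and update tracking state."""
        status = send_delete()
        if status == 204:
            # Success confirmed: stop tracking this payload so a later
            # `lightbeam delete` run does not try to delete it again.
            hashlog.pop(data_hash, None)
        # Any other status leaves the entry in place, keeping the payload
        # eligible for deletion on a subsequent run.
        return status

    # Tiny usage example with stubbed API calls:
    hashlog: Hashlog = {"abc123": (round(time.time()), 200)}
    assert do_delete("abc123", lambda: 204, hashlog) == 204
    assert "abc123" not in hashlog
    assert do_delete("missing", lambda: 500, hashlog) == 500  # nothing removed on failure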