Fix how hashlog entries are handled during deletes #34

Merged · 1 commit · Jun 3, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+### Unreleased
+* bugfix: Fix how hashlog entries are removed during `lightbeam delete`
+
 ### v0.1.2
 <details>
 <summary>Released 2024-04-19</summary>
8 changes: 4 additions & 4 deletions lightbeam/api.py
@@ -187,8 +187,8 @@ def load_swagger_docs(self):
             if endpoint_type=="descriptors" or endpoint_type=="resources":
                 swagger_url = endpoint["endpointUri"]
                 if self.lightbeam.track_state:
-                    hash = hashlog.get_hash_string(swagger_url)
-                    file = os.path.join(cache_dir, f"swagger-{endpoint_type}-{hash}.json")
+                    url_hash = hashlog.get_hash_string(swagger_url)
+                    file = os.path.join(cache_dir, f"swagger-{endpoint_type}-{url_hash}.json")
                 if (
                     self.lightbeam.track_state # we have a state_dir in which to store
                     and not self.lightbeam.wipe # we aren't clearing the cache
@@ -234,8 +234,8 @@ async def load_descriptors_values(self):
             os.mkdir(cache_dir)

         # check for cached descriptor values
-        hash = hashlog.get_hash_string(self.config["base_url"])
-        cache_file = os.path.join(cache_dir, f"descriptor-values-{hash}.csv")
+        url_hash = hashlog.get_hash_string(self.config["base_url"])
+        cache_file = os.path.join(cache_dir, f"descriptor-values-{url_hash}.csv")

         self.lightbeam.reset_counters()
         if (
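Both api.py hunks are a rename of the local `hash` variable (which shadows Python's builtin) to `url_hash` in the code that derives cache file names from a hashed URL. A rough sketch of that naming pattern in isolation (`cache_path_for` is a made-up helper, and `hashlib.md5` stands in for lightbeam's `hashlog.get_hash_string`, whose digest may differ):

```python
# Illustration only: deriving a cache file name from a URL hash, mirroring the
# url_hash rename above. hashlib stands in for hashlog.get_hash_string.
import hashlib
import os

def cache_path_for(cache_dir: str, endpoint_type: str, swagger_url: str) -> str:
    # Hash the URL so the cache file name is stable and filesystem-safe,
    # and so the name no longer shadows Python's builtin hash().
    url_hash = hashlib.md5(swagger_url.encode("utf-8")).hexdigest()
    return os.path.join(cache_dir, f"swagger-{endpoint_type}-{url_hash}.json")

print(cache_path_for("/tmp/cache", "resources", "https://api.example.com/metadata"))
```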
26 changes: 13 additions & 13 deletions lightbeam/delete.py
@@ -91,16 +91,13 @@ async def do_deletes(self, endpoint):
                 params = util.interpolate_params(params_structure, data)

                 # check if we've posted this data before
-                hash = hashlog.get_hash(data)
-                if self.lightbeam.track_state and hash in self.hashlog_data.keys():
+                data_hash = hashlog.get_hash(data)
+                if self.lightbeam.track_state and data_hash in self.hashlog_data.keys():
                     # check if the last post meets criteria for a delete
-                    if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
+                    if self.lightbeam.meets_process_criteria(self.hashlog_data[data_hash]):
                         # yes, we need to delete it; append to task queue
                         tasks.append(asyncio.create_task(
-                            self.do_delete(endpoint, file_name, params, counter)))
-
-                        # remove the payload from the hashlog
-                        del self.hashlog_data[hash]
+                            self.do_delete(endpoint, file_name, params, counter, data_hash)))
                     else:
                         # no, do not delete
                         self.lightbeam.num_skipped += 1
@@ -128,7 +125,7 @@ async def do_deletes(self, endpoint):
             hashlog.save(hashlog_file, self.hashlog_data)

     # Deletes a single payload for a single endpoint
-    async def do_delete(self, endpoint, file_name, params, line):
+    async def do_delete(self, endpoint, file_name, params, line, data_hash=None):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -163,7 +160,7 @@ async def do_delete(self, endpoint, file_name, params, line):
                         if type(j)==list and len(j)==1:
                             the_id = j[0]['id']
                             # now we can delete by `id`
-                            await self.do_delete_id(endpoint, the_id, file_name, line)
+                            await self.do_delete_id(endpoint, the_id, file_name, line, data_hash)
                             break

                         elif type(j)==list and len(j)==0: skip_reason = "payload not found in API"
@@ -195,7 +192,7 @@ async def do_delete(self, endpoint, file_name, params, line):
                     self.logger.error(" (at line {0} of {1}; ID: {2} )".format(line, file_name, id))
                 break

-    async def do_delete_id(self, endpoint, id, file_name=None, line=None):
+    async def do_delete_id(self, endpoint, id, file_name=None, line=None, data_hash=None):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -209,12 +206,15 @@ async def do_delete_id(self, endpoint, id, file_name=None, line=None):
                     if status!=401:
                         self.lightbeam.num_finished += 1
                         self.lightbeam.increment_status_counts(status)
-                        if self.lightbeam.track_state:
-                            del self.hashlog_data[hash]
                         if status not in [ 204 ]:
                             message = str(status) + ": " + util.linearize(body)
                             self.lightbeam.increment_status_reason(message)
                             self.lightbeam.num_errors += 1
+                        else:
+                            if self.lightbeam.track_state and data_hash is not None:
+                                # if we're certain delete was successful, remove this
+                                # line of data from internal tracking
+                                del self.hashlog_data[data_hash]
                         break # (out of while loop)
                     else:
                         # this could be broken out to a separate function call,
@@ -233,4 +233,4 @@ async def do_delete_id(self, endpoint, id, file_name=None, line=None):
             self.logger.exception(e, exc_info=self.lightbeam.config["show_stacktrace"])
             if line and file_name:
                 self.logger.error(" (at line {0} of {1}; ID: {2} )".format(line, file_name, id))
-            break
+            break
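Taken together, the delete.py changes defer hashlog cleanup until the API has actually confirmed the delete: the payload's hash now rides along with the queued task, and the entry is removed only on a 204. A minimal, self-contained sketch of that pattern (illustrative names only; `fake_delete_request` and this free-standing `do_delete_id` are not lightbeam's Deleter methods):

```python
# Sketch of the deferred-removal pattern: the payload hash travels with the
# delete task, and the hashlog entry is dropped only after a confirmed 204.
import asyncio

async def fake_delete_request(record_id: str) -> int:
    await asyncio.sleep(0)  # stand-in for the real HTTP DELETE call
    return 204 if record_id != "missing" else 404

async def do_delete_id(hashlog_data: dict, record_id: str, data_hash=None) -> None:
    status = await fake_delete_request(record_id)
    if status == 204 and data_hash is not None:
        # only forget the payload once the delete definitely succeeded
        del hashlog_data[data_hash]

async def main() -> None:
    hashlog_data = {"abc123": (1717000000, 201), "def456": (1717000001, 201)}
    await asyncio.gather(
        do_delete_id(hashlog_data, "ok-record", "abc123"),
        do_delete_id(hashlog_data, "missing", "def456"),  # 404: entry is kept
    )
    print(hashlog_data)  # {'def456': (1717000001, 201)}

asyncio.run(main())
```

The practical effect is that a payload whose delete fails (not found, server error, expired token) stays in the hashlog and can be retried on a later `lightbeam delete` run, instead of silently disappearing from state tracking the moment the task was queued.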
83 changes: 51 additions & 32 deletions lightbeam/send.py
@@ -17,7 +17,7 @@ def __init__(self, lightbeam=None):
         self.logger = self.lightbeam.logger
         self.hashlog_data = {}
         self.start_timestamp = datetime.datetime.now()

     # Sends all (selected) endpoints
     def send(self):

@@ -39,15 +39,15 @@ def send(self):
         endpoints = self.lightbeam.get_endpoints_with_data(self.lightbeam.endpoints)
         if len(endpoints)==0:
             self.logger.critical("`data_dir` {0} has no *.jsonl files".format(self.lightbeam.config["data_dir"]) + " for selected endpoints")

         # send each endpoint
         for endpoint in endpoints:
             self.logger.info("sending endpoint {0} ...".format(endpoint))
             asyncio.run(self.do_send(endpoint))
             self.logger.info("finished processing endpoint {0}!".format(endpoint))
             self.logger.info(" (final status counts: {0}) ".format(self.lightbeam.status_counts))
             self.lightbeam.log_status_reasons()

         ### Create structured output results_file if necessary
         self.end_timestamp = datetime.datetime.now()
         self.metadata.update({
@@ -62,18 +62,17 @@ def send(self):
             if "failures" in self.metadata["resources"][resource].keys():
                 for idx, _ in enumerate(self.metadata["resources"][resource]["failures"]):
                     self.metadata["resources"][resource]["failures"][idx]["line_numbers"].sort()



         # helper function used below
         def repl(m):
             return re.sub(r"\s+", '', m.group(0))

         ### Create structured output results_file if necessary
         if self.lightbeam.results_file:

             # create directory if not exists
             os.makedirs(os.path.dirname(self.lightbeam.results_file), exist_ok=True)

             with open(self.lightbeam.results_file, 'w') as fp:
                 content = json.dumps(self.metadata, indent=4)
                 # failures.line_numbers are split each on their own line; here we remove those line breaks
Expand All @@ -83,12 +82,11 @@ def repl(m):
if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
self.logger.info("all payloads skipped")
exit(99) # signal to downstream tasks (in Airflow) all payloads skipped

if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
self.logger.info("all payloads failed")
exit(1) # signal to downstream tasks (in Airflow) all payloads failed


# Sends a single endpoint
async def do_send(self, endpoint):
# We try to avoid re-POSTing JSON we've already (successfully) sent.
@@ -101,7 +99,7 @@ async def do_send(self, endpoint):
         if self.lightbeam.track_state:
             hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat")
             self.hashlog_data = hashlog.load(hashlog_file)

         self.metadata["resources"].update({endpoint: {}})
         self.lightbeam.reset_counters()

@@ -116,45 +114,63 @@ async def do_send(self, endpoint):
                     total_counter += 1
                     data = line.strip()
                     # compute hash of current row
-                    hash = hashlog.get_hash(data)
+                    data_hash = hashlog.get_hash(data)
                     # check if we've posted this data before
-                    if self.lightbeam.track_state and hash in self.hashlog_data.keys():
+                    if (
+                        self.lightbeam.track_state
+                        and data_hash in self.hashlog_data.keys()
+                    ):
                         # check if the last post meets criteria for a resend
-                        if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
+                        if self.lightbeam.meets_process_criteria(
+                            self.hashlog_data[data_hash]
+                        ):
                             # yes, we need to (re)post it; append to task queue
-                            tasks.append(asyncio.create_task(
-                                self.do_post(endpoint, file_name, data, line_counter, hash)))
+                            tasks.append(
+                                asyncio.create_task(
+                                    self.do_post(
+                                        endpoint,
+                                        file_name,
+                                        data,
+                                        line_counter,
+                                        data_hash,
+                                    )
+                                )
+                            )
                         else:
                             # no, do not (re)post
                             self.lightbeam.num_skipped += 1
                             continue
                     else:
                         # new, never-before-seen payload! append it to task queue
-                        tasks.append(asyncio.create_task(
-                            self.do_post(endpoint, file_name, data, line_counter, hash)))
-
+                        tasks.append(
+                            asyncio.create_task(
+                                self.do_post(
+                                    endpoint, file_name, data, line_counter, data_hash
+                                )
+                            )
+                        )

                     if total_counter%self.lightbeam.MAX_TASK_QUEUE_SIZE==0:
                         await self.lightbeam.do_tasks(tasks, total_counter)
                         tasks = []

         if self.lightbeam.num_skipped>0:
             self.logger.info("skipped {0} of {1} payloads because they were previously processed and did not match any resend criteria".format(self.lightbeam.num_skipped, total_counter))
         if len(tasks)>0: await self.lightbeam.do_tasks(tasks, total_counter)

         # any task may have updated the hashlog, so we need to re-save it out to disk
         if self.lightbeam.track_state:
             hashlog.save(hashlog_file, self.hashlog_data)

         # update metadata counts for this endpoint
         self.metadata["resources"][endpoint].update({
             "records_processed": total_counter,
             "records_skipped": self.lightbeam.num_skipped,
             "records_failed": self.lightbeam.num_errors
         })



     # Posts a single data payload to a single endpoint
-    async def do_post(self, endpoint, file_name, data, line, hash):
+    async def do_post(self, endpoint, file_name, data, line, data_hash):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
@@ -170,7 +186,7 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                     # update status_counts (for every-second status update)
                     self.lightbeam.increment_status_counts(status)
                     self.lightbeam.num_finished += 1

                     # warn about errors
                     if response.status not in [ 200, 201 ]:
                         message = str(response.status) + ": " + util.linearize(json.loads(body).get("message"))
@@ -198,12 +214,16 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                         self.lightbeam.increment_status_reason(message)
                         if response.status==400:
                             raise Exception(message)
-                        else: self.lightbeam.num_errors += 1
-
+                        else:
+                            self.lightbeam.num_errors += 1
+
                     # update hashlog
                     if self.lightbeam.track_state:
-                        self.hashlog_data[hash] = (round(time.time()), response.status)
-
+                        self.hashlog_data[data_hash] = (
+                            round(time.time()),
+                            response.status,
+                        )
+
                     break # (out of while loop)

                 else: # 401 status
@@ -216,12 +236,11 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                     else:
                         await asyncio.sleep(1)
                         curr_token_version = int(str(self.lightbeam.token_version))

             except RuntimeError as e:
                 await asyncio.sleep(1)
             except Exception as e:
                 status = 400
                 self.lightbeam.num_errors += 1
                 self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line, file_name))
                 break

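The send.py changes are mostly a reformat plus the same `hash` to `data_hash` rename; the bookkeeping itself is unchanged: each payload's hash is looked up before posting, and `(timestamp, status)` is written back to the hashlog after the response. A simplified sketch of that logic (`should_send`, `record_result`, and the `retry_statuses` set are hypothetical stand-ins; the real resend decision comes from `meets_process_criteria` and lightbeam's config):

```python
# Sketch of send-side hashlog bookkeeping: skip payloads already sent
# successfully, resend only those whose last status qualifies.
import hashlib
import time

hashlog_data = {}  # data_hash -> (timestamp, last_status)

def get_hash(data: str) -> str:
    return hashlib.md5(data.encode("utf-8")).hexdigest()

def should_send(data: str, retry_statuses={400, 500}) -> bool:
    """Send new payloads; resend known ones only if their last status qualifies."""
    data_hash = get_hash(data)
    if data_hash in hashlog_data:
        _timestamp, last_status = hashlog_data[data_hash]
        return last_status in retry_statuses
    return True

def record_result(data: str, status: int) -> None:
    # mirrors: self.hashlog_data[data_hash] = (round(time.time()), response.status)
    hashlog_data[get_hash(data)] = (round(time.time()), status)

record_result('{"id": 1}', 201)
print(should_send('{"id": 1}'))  # False: already sent successfully
print(should_send('{"id": 2}'))  # True: never seen before
```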
8 changes: 4 additions & 4 deletions lightbeam/validate.py
@@ -82,13 +82,13 @@ def validate_endpoint(self, swagger, endpoint, local_descriptors=[]):

                     # check natural keys are unique
                     params = json.dumps(util.interpolate_params(params_structure, line))
-                    hash = hashlog.get_hash(params)
-                    if hash in distinct_params:
+                    params_hash = hashlog.get_hash(params)
+                    if params_hash in distinct_params:
                         if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
                             self.logger.warning(f"... VALIDATION ERROR (line {counter}): duplicate value(s) for natural key(s): {params}")
                         self.lightbeam.num_errors += 1
                         continue
-                    else: distinct_params.append(hash)
+                    else: distinct_params.append(params_hash)

         if self.lightbeam.num_errors==0: self.logger.info(f"... all lines validate ok!")
         else:
@@ -137,4 +137,4 @@ def is_valid_descriptor_value(self, namespace, codeValue):
             if row[1]==namespace and row[2]==codeValue:
                 return True
-        return False
+        return False
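validate.py gets the same rename, `hash` to `params_hash`, in the duplicate natural-key check. A toy version of that check (the `find_duplicate_lines` helper and `key_fields` argument are invented for illustration, and it uses a set where the diff keeps lightbeam's existing `distinct_params` list):

```python
# Toy duplicate natural-key check: hash the JSON-serialized key values and
# flag any line whose key hash has been seen before.
import hashlib
import json

def find_duplicate_lines(rows, key_fields):
    seen = set()
    duplicates = []
    for line_number, row in enumerate(rows, start=1):
        params = json.dumps({k: row.get(k) for k in key_fields}, sort_keys=True)
        params_hash = hashlib.md5(params.encode("utf-8")).hexdigest()
        if params_hash in seen:
            duplicates.append(line_number)  # same natural key seen earlier
        else:
            seen.add(params_hash)
    return duplicates

rows = [{"studentId": 1, "name": "a"}, {"studentId": 2}, {"studentId": 1, "name": "b"}]
print(find_duplicate_lines(rows, ["studentId"]))  # [3]
```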