Issue 30.ant.1 (#31)
* Update file-loader.py

#30

* Update CHANGELOG.md

#30
antaenc authored Feb 20, 2023
1 parent fc56e53 commit afd5817
Showing 2 changed files with 80 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[markdownlint](https://dlaa.me/markdownlint/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.1] - 2023-02-20

### Fixed in 1.2.1

- Fixed draining of threads for Postgres governor condition

## [1.2.0] - 2023-01-26

### Fixed in 1.2.0
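
For context on the changelog entry above: the fix revolves around a drain-and-refill pattern driven by the Postgres governor. When `gov.govern()` returns -1 (transaction ID age at its high watermark, vacuum required), the loop stops submitting new futures so in-flight work drains, then polls until the governor clears and tops the pool back up. A minimal sketch of that pattern, using hypothetical `gov`, `work`, and `records` stand-ins rather than the actual Senzing objects:

import concurrent.futures
import time

def run_with_governor(gov, work, records, max_workers):
    """Sketch: drain in-flight futures while the governor halts (-1),
    throttle on a positive pause, and refill the pool once it clears."""
    records = iter(records)
    exhausted = False
    with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
        futures = set()
        while True:
            pause = gov.govern()
            if pause < 0:
                # XID high watermark: submit nothing so the pool drains,
                # then poll until a Postgres vacuum brings the age back down
                time.sleep(1)
                continue
            if pause > 0:
                time.sleep(pause)  # governor asked us to slow down
            # Top the pool back up to max_workers in-flight futures
            while not exhausted and len(futures) < max_workers:
                record = next(records, None)
                if record is None:
                    exhausted = True
                else:
                    futures.add(pool.submit(work, record))
            if not futures:
                break  # nothing in flight and no input left
            done, futures = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                future.result()  # surface any worker exception
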
96 changes: 74 additions & 22 deletions file-loader.py
@@ -20,9 +20,9 @@
import json

__all__ = []
__version__ = '1.2.0' # See https://www.python.org/dev/peps/pep-0396/
__version__ = '1.2.1' # See https://www.python.org/dev/peps/pep-0396/
__date__ = '2022-11-29'
__updated__ = '2023-01-26'
__updated__ = '2023-02-20'


# Custom actions for argparse. Enables checking if an arg "was specified" on the CLI to check if CLI args should take
@@ -230,16 +230,21 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
start_time = long_check_time = work_stats_time = prev_time = time.time()
success_recs = error_recs = load_records = redo_records = redo_error_recs = redo_success_recs = in_file_count = 0
add_future = True
in_file_eof = no_redo_record = False

# Test the max number of workers ThreadPoolExecutor allocates to use in sizing actual workers to request
with concurrent.futures.ThreadPoolExecutor() as test:
test_max_workers = test._max_workers

# Test number of lines in input file to size max workers, input file could be smaller than calculated max workers
for _ in (True,):
with open(file_input, 'r') as in_file:
for _ in in_file:
in_file_count += 1
# or num_workers if specified with --numThreads
with open(file_input, 'r') as in_file:
for _ in in_file:
in_file_count += 1
if num_workers:
if in_file_count == num_workers:
break
else:
if in_file_count == test_max_workers:
break
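
In effect, the new sizing logic caps the thread count at the smaller of the requested worker count (or the executor's default) and the number of records actually in the file. A condensed sketch of the same computation, as a hypothetical helper:

import concurrent.futures
import itertools

def size_workers(file_input, num_workers=None):
    """Sketch: never ask for more threads than there are input records."""
    with concurrent.futures.ThreadPoolExecutor() as test:
        default_max = test._max_workers  # executor's own default sizing
    cap = num_workers if num_workers else default_max
    with open(file_input, 'r') as in_file:
        # Stop counting once the cap is reached; no need to scan the rest
        count = sum(1 for _ in itertools.islice(in_file, cap))
    return min(cap, count)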

@@ -257,7 +262,7 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
futures = {loader.submit(add_record, engine, record, with_info): (record.strip(), time.time()) for record in itertools.islice(in_file, loader._max_workers)}
logger.info(f'Starting to load with {loader._max_workers} threads...')
load_records = loader._max_workers
while futures:
while True:
for f in concurrent.futures.as_completed(futures.keys()):
try:
result = f.result()
@@ -268,10 +273,13 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
logger.critical(f'Exception: {ex} - Operation: addRecord - Record: {futures[f][PAYLOAD_RECORD]}')
do_shutdown = True
else:
record = in_file.readline()
if record and add_future and not do_shutdown:
load_records += 1
futures[loader.submit(add_record, engine, record.strip(), with_info)] = (record.strip(), time.time())
if add_future and not do_shutdown:
record = in_file.readline()
if record:
load_records += 1
futures[loader.submit(add_record, engine, record.strip(), with_info)] = (record.strip(), time.time())
else:
in_file_eof = True

if result:
out_file.write(result + '\n')
@@ -281,18 +289,39 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
prev_time = record_stats(success_recs, error_recs, prev_time, 'adds')
finally:
futures.pop(f)

if call_governor:
if do_shutdown and len(futures) == 0 or \
in_file_eof and len(futures) == 0:
break

# Only used for Postgres
if call_governor and not do_shutdown:
gov_pause_secs = gov.govern()
# -1 returned, halt all processing due to transaction ID age (XID) high watermark
# Postgres vacuum required
if gov_pause_secs < 0:
time.sleep(1)
add_future = False
continue
add_future = True

# Slow down processing
if gov_pause_secs > 0:
time.sleep(gov_pause_secs)

# If processing was halted, the futures will have drained; once processing can
# continue, create new futures
if add_future and not in_file_eof:
while len(futures) < max_load_workers:
record = in_file.readline()
if record:
load_records += 1
futures[loader.submit(add_record, engine, record.strip(), with_info)] = (record.strip(), time.time())
else:
in_file_eof = True
break

time_now = time.time()
if time_now > work_stats_time + WORK_STATS_INTERVAL:
work_stats_time = time_now
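
Stepping back, the reworked loop is an instance of the common bounded-in-flight pattern: seed the executor with one future per worker, then replace each completed future with the next input line until EOF. A stripped-down sketch of just that pattern, without the governor, stats, or Senzing calls:

import concurrent.futures
import itertools

def process_bounded(work, in_file, max_workers):
    """Sketch: keep at most max_workers futures in flight, one per line."""
    with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
        # Seed one future per worker from the head of the file
        futures = {pool.submit(work, line.strip())
                   for line in itertools.islice(in_file, max_workers)}
        while futures:
            for future in concurrent.futures.as_completed(futures.copy()):
                futures.discard(future)
                future.result()  # propagate worker exceptions
                line = in_file.readline()
                if line:  # refill the slot that just freed up
                    futures.add(pool.submit(work, line.strip()))
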
@@ -324,22 +353,24 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
with concurrent.futures.ThreadPoolExecutor(max_redo_workers) as redoer:
futures = {redoer.submit(process_redo_record, engine, record, with_info): (record, time.time()) for record in get_redo_records(engine, redoer._max_workers)}
redo_records = redoer._max_workers
while futures:

while True:
for f in concurrent.futures.as_completed(futures.keys()):
try:
result = f.result()
redo_record = get_redo_records(engine, 1)
except (G2BadInputException, G2RetryableException, json.JSONDecodeError) as ex:
logger.error(f'Exception: {ex} - Operation: getRedoRecord - Record: {futures[f][PAYLOAD_RECORD]}')
logger.error(f'Exception: {ex} - Operation: processRedoRecord - Record: {futures[f][PAYLOAD_RECORD]}')
redo_error_recs += 1
except (G2Exception, G2UnrecoverableException) as ex:
logger.critical(f'Exception: {ex} - Operation: getRedoRecord - Record: {futures[f][PAYLOAD_RECORD]}')
logger.critical(f'Exception: {ex} - Operation: processRedoRecord - Record: {futures[f][PAYLOAD_RECORD]}')
do_shutdown = True
else:
if redo_record and add_future and not do_shutdown:
redo_records += 1
futures[redoer.submit(process_redo_record, engine, redo_record, with_info)] = (redo_record, time.time())
if add_future and not do_shutdown:
redo_record = get_redo_records(engine, 1)
if redo_record:
redo_records += 1
futures[redoer.submit(process_redo_record, engine, redo_record, with_info)] = (redo_record, time.time())
else:
no_redo_record = True

if result:
out_file.write(result + '\n')
@@ -350,17 +381,37 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit
finally:
futures.pop(f)

if call_governor:
if do_shutdown and len(futures) == 0 or \
no_redo_record and len(futures) == 0:
break

# Only used for Postgres
if call_governor and not do_shutdown:
gov_pause_secs = gov.govern()
# -1 returned, halt all processing due to transaction ID age (XID) high watermark
# Postgres vacuum required
if gov_pause_secs < 0:
time.sleep(1)
add_future = False
continue
add_future = True

# Slow down processing
if gov_pause_secs > 0:
time.sleep(gov_pause_secs)

# If processing was halted, the futures will have drained; once processing can
# continue, create new futures
if add_future and not no_redo_record:
while len(futures) < max_redo_workers:
redo_record = get_redo_records(engine, 1)
if redo_record:
redo_records += 1
futures[redoer.submit(process_redo_record, engine, redo_record, with_info)] = (redo_record, time.time())
else:
no_redo_record = True
break

time_now = time.time()
if time_now > work_stats_time + WORK_STATS_INTERVAL:
work_stats_time = time_now
@@ -386,6 +437,7 @@ def load_and_redo(engine, file_input, file_output, file_errors, num_workers, wit

signal.signal(signal.SIGINT, signal_int)
signal.signal(signal.SIGTERM, signal_int)
signal.signal(signal.SIGQUIT, signal_int)

LONG_RECORD = 300
PAYLOAD_RECORD = 0
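
The last hunk simply registers SIGQUIT alongside the existing SIGINT and SIGTERM handlers. A minimal sketch of the handler pattern this implies, with the handler body assumed to be a shutdown flag (the actual `signal_int` body isn't shown in this diff); note that SIGQUIT is not available on Windows:

import signal

do_shutdown = False

def signal_int(signum, frame):
    """Assumed behavior: flag a graceful shutdown so the worker loops
    stop submitting new futures and drain what's already in flight."""
    global do_shutdown
    do_shutdown = True

for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
    signal.signal(sig, signal_int)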
