diff --git a/scripts/data/client.py b/scripts/data/client.py
index ff04a349..ed66766f 100755
--- a/scripts/data/client.py
+++ b/scripts/data/client.py
@@ -1,21 +1,22 @@
 #!/usr/bin/env python3
-from dataclasses import dataclass
+import argparse
 import json
-import re
+import logging
 import os
-import threading
 import queue
-import argparse
+import random
+import re
+import signal
 import subprocess
-import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
 from pathlib import Path
-import random
+from logging.handlers import TimedRotatingFileHandler
+
 from generate_data import generate_data
 from format_args import format_args
-import logging
-from logging.handlers import TimedRotatingFileHandler
 
 
 logger = logging.getLogger(__name__)
 
@@ -23,25 +24,35 @@ MAX_WEIGHT_LIMIT = 8000  # Total weight limit for all jobs
 THREAD_POOL_SIZE = os.cpu_count()  # Number of threads for processing
 QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2  # Maximum size of the job queue
-
-BASE_DIR = Path(".client_cache")
+BASE_DIR = Path(".client_cache")
 
 # Shared state variables
 # Shared state variables
 current_weight = 0
 weight_lock = threading.Condition()
 job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
 
+# Create an event for graceful shutdown
+shutdown_event = threading.Event()
+
+
+def handle_sigterm(signum, frame):
+    logger.info("Received SIGTERM signal. Shutting down gracefully...")
+    shutdown_event.set()  # Set the event to signal threads to exit
+
+
+# Register the SIGTERM handler
+signal.signal(signal.SIGTERM, handle_sigterm)
+
 
 # Function to calculate weight of a block
 def calculate_batch_weight(block_data, mode):
     if mode == "light":
         return len(block_data["blocks"])
-    else:
-        return sum(
-            len(tx["inputs"]) + len(tx["outputs"])
-            for block in block_data["blocks"]
-            for tx in block["data"]["transactions"]
-        )
+    return sum(
+        len(tx["inputs"]) + len(tx["outputs"])
+        for block in block_data["blocks"]
+        for tx in block["data"]["transactions"]
+    )
 
 
 @dataclass
@@ -70,15 +81,13 @@
     for height in height_range:
         try:
             batch_file = BASE_DIR / f"{mode}_{height}_{step}.json"
-
            batch_data = generate_data(
                 mode=mode, initial_height=height, num_blocks=step, fast=True
             )
-
             Path(batch_file).write_text(json.dumps(batch_data, indent=2))
-
             batch_weight = calculate_batch_weight(batch_data, mode)
             yield Job(height, step, mode, batch_weight, batch_file), batch_weight
+
         except Exception as e:
             logger.error(f"Error while generating data for: {height}:\n{e}")
 
@@ -113,22 +122,26 @@
         or "panicked" in result.stdout
     ):
         error = result.stdout or result.stderr
+
         if result.returncode == -9:
             match = re.search(r"gas_spent=(\d+)", result.stdout)
             gas_info = (
                 ...
             )
             error = f"Return code -9, killed by OOM?{gas_info}"
             message = error
+
         else:
             error_match = re.search(r"error='([^']*)'", error)
             message = error_match.group(1) if error_match else ""
 
         logger.error(f"{job} error: {message}")
         logger.debug(f"Full error while processing: {job}:\n{error}")
+
     else:
         match = re.search(r"gas_spent=(\d+)", result.stdout)
         gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
         logger.info(f"{job} done, {gas_info}")
+
         if not match:
             logger.warning(f"{job}: no gas info found")
 
@@ -142,76 +155,80 @@ def job_producer(job_gen):
 
     try:
         for job, weight in job_gen:
-            # Wait until there is enough weight capacity to add the new block
+            # Check for shutdown
+            if shutdown_event.is_set():
+                logger.info("Shutdown event detected in producer. Exiting...")
+                break
+
             with weight_lock:
                 logger.debug(
                     f"Adding job: {job}, current total weight: {current_weight}..."
                 )
+
                 while (
                     (current_weight + weight > MAX_WEIGHT_LIMIT)
                     and current_weight != 0
                     or job_queue.full()
                 ):
 
+                    if shutdown_event.is_set():
+                        logger.info("Shutdown event detected while waiting. Exiting...")
+                        return
+
                     logger.debug("Producer is waiting for weight to be released.")
-                    weight_lock.wait()  # Wait for the condition to be met
+                    weight_lock.wait()
 
                 if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
                     logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")
 
-                # Add the job to the queue and update the weight
                 job_queue.put((job, weight))
                 current_weight += weight
-                logger.debug(
-                    f"Produced job: {job}, current total weight: {current_weight}"
-                )
 
-                # Notify consumers that a new job is available
                 weight_lock.notify_all()
+
     finally:
         logger.debug("Producer is exiting...")
+
         for _ in range(THREAD_POOL_SIZE):
             job_queue.put(None)
 
         with weight_lock:
             weight_lock.notify_all()
-            logger.debug("Consumers notified")
 
 
-# Consumer function: Processes blocks from the queue
 def job_consumer(process_job):
     global current_weight
 
     while True:
+        if shutdown_event.is_set():
+            logger.info("Shutdown event detected in consumer. Exiting...")
+            break
+
         try:
             logger.debug(
                 f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
             )
-            # Get a job from the queue
+
             work_to_do = job_queue.get(block=True)
 
             if work_to_do is None:
-                logger.debug("No more work to do, consumer is exiting.")
                 job_queue.task_done()
                 break
 
             (job, weight) = work_to_do
 
-            # Process the block
             try:
                 logger.debug(f"Executing job: {job}...")
                 process_job(job)
+
             except Exception as e:
                 logger.error(f"Error while processing job: {job}:\n{e}")
 
-            with weight_lock:
-                current_weight -= weight
-                logger.debug(
-                    f"Finished processing job, current total weight: {current_weight}"
-                )
-                weight_lock.notify_all()  # Notify producer to add more jobs
+            with weight_lock:
+                current_weight -= weight
+
+                weight_lock.notify_all()
 
-            # Mark job as done
             job_queue.task_done()
 
         except Exception as e:
@@ -220,7 +237,6 @@ def job_consumer(process_job):
 
 
 def main(start, blocks, step, mode, strategy):
-
     logger.info(
         "Starting client, initial height: %d, blocks: %d, step: %d, mode: %s, strategy: %s",
         start,
@@ -229,6 +245,7 @@
         mode,
         strategy,
     )
+
     logger.info(
         "Max weight limit: %d, Thread pool size: %d, Queue max size: %d",
         MAX_WEIGHT_LIMIT,
@@ -260,14 +277,11 @@
 if __name__ == "__main__":
-
     parser = argparse.ArgumentParser(description="Run client script")
+
     parser.add_argument("--start", type=int, required=True, help="Start block height")
     parser.add_argument(
-        "--blocks",
-        type=int,
-        default=1,
-        help="Number of blocks to process",
+        "--blocks", type=int, default=1, help="Number of blocks to process"
     )
     parser.add_argument(
         "--step", type=int, default=1, help="Step size for block processing"
     )
 
@@ -292,7 +306,6 @@ def main(start, blocks, step, mode, strategy):
 
     MAX_WEIGHT_LIMIT = args.maxweight
 
-    # file_handler = logging.FileHandler("client.log")
     file_handler = TimedRotatingFileHandler(
         filename="client.log",
         when="midnight",
@@ -300,26 +313,35 @@ def main(start, blocks, step, mode, strategy):
         backupCount=14,
         encoding="utf8",
     )
+
     file_handler.setLevel(logging.INFO)
+
     file_handler.setFormatter(
         logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s")
     )
 
     console_handler = logging.StreamHandler()
+
     console_handler.setLevel(logging.DEBUG)
+
     console_handler.setFormatter(
         logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )
+
    root_logger = logging.getLogger()
+
    root_logger.addHandler(console_handler)
+
    root_logger.addHandler(file_handler)
 
    if args.verbose:
        root_logger.setLevel(logging.DEBUG)
+
    else:
        root_logger.setLevel(logging.INFO)
 
    logging.getLogger("urllib3").setLevel(logging.WARNING)
+
    logging.getLogger("generate_data").setLevel(logging.WARNING)
 
    main(args.start, args.blocks, args.step, args.mode, args.strategy)
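
Reviewer note: the patch combines two mechanisms — a SIGTERM handler that sets a shared threading.Event which producer and consumers poll cooperatively, and one None sentinel per worker enqueued in the producer's finally block so every consumer wakes up and exits. The standalone sketch below reproduces just that shutdown pattern so it can be exercised in isolation; it is illustrative only, and NUM_CONSUMERS, the 100-job range, and the time.sleep stand-in for process_batch are placeholder assumptions, not part of the patch.

#!/usr/bin/env python3
# Minimal sketch (assumptions noted above): SIGTERM -> Event -> sentinel drain.
import queue
import signal
import threading
import time

shutdown_event = threading.Event()
job_queue = queue.Queue(maxsize=4)
NUM_CONSUMERS = 2  # placeholder pool size; the patch uses os.cpu_count()


def handle_sigterm(signum, frame):
    # As in the patch: only set a flag; threads notice it cooperatively.
    shutdown_event.set()


signal.signal(signal.SIGTERM, handle_sigterm)


def producer():
    try:
        for job_id in range(100):  # placeholder for job_generator(...)
            if shutdown_event.is_set():
                break  # stop generating new work once shutdown is requested
            job_queue.put(job_id)
    finally:
        # One sentinel per consumer, mirroring the patch's finally block.
        for _ in range(NUM_CONSUMERS):
            job_queue.put(None)


def consumer():
    while True:
        job = job_queue.get()
        job_queue.task_done()
        if job is None:  # sentinel: no more work
            break
        time.sleep(0.01)  # placeholder for process_batch(job)


if __name__ == "__main__":
    threads = [threading.Thread(target=producer)]
    threads += [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Running this and sending SIGTERM mid-run (e.g. kill <pid>) should make the producer stop early while already-queued jobs still drain through task_done; without a signal it simply processes all 100 jobs and exits. The Condition-based weight capping is deliberately omitted here — this exercises only the shutdown path.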