From 3fb3cfbda4a0fa4ed16f9bbf0709bd141c34c8ca Mon Sep 17 00:00:00 2001
From: Darshan Chauhan <46072647+GradleD@users.noreply.github.com>
Date: Tue, 5 Nov 2024 18:56:17 +0530
Subject: [PATCH 1/4] Update client.py

SIGTERM handled
---
 scripts/data/client.py | 56 ++++++++++++++++++++++++------------------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/scripts/data/client.py b/scripts/data/client.py
index ff04a349..a4cd1855 100755
--- a/scripts/data/client.py
+++ b/scripts/data/client.py
@@ -16,6 +16,7 @@
 from format_args import format_args
 import logging
 from logging.handlers import TimedRotatingFileHandler
+import signal
 
 logger = logging.getLogger(__name__)
 
@@ -31,6 +32,15 @@
 weight_lock = threading.Condition()
 job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
 
+# Create an event for graceful shutdown
+shutdown_event = threading.Event()
+
+def handle_sigterm(signum, frame):
+    logger.info("Received SIGTERM signal. Shutting down gracefully...")
+    shutdown_event.set()  # Set the event to signal threads to exit
+
+# Register the SIGTERM handler
+signal.signal(signal.SIGTERM, handle_sigterm)
 
 # Function to calculate weight of a block
 def calculate_batch_weight(block_data, mode):
@@ -137,12 +147,17 @@ def process_batch(job):
 
 
 # Producer function: Generates data and adds jobs to the queue
+# Update job_producer function
 def job_producer(job_gen):
     global current_weight
 
     try:
         for job, weight in job_gen:
-            # Wait until there is enough weight capacity to add the new block
+            # Check for shutdown
+            if shutdown_event.is_set():
+                logger.info("Shutdown event detected in producer. Exiting...")
+                break
+
             with weight_lock:
                 logger.debug(
                     f"Adding job: {job}, current total weight: {current_weight}..."
@@ -152,21 +167,20 @@ def job_producer(job_gen):
                     and current_weight != 0
                     or job_queue.full()
                 ):
+                    if shutdown_event.is_set():
+                        logger.info("Shutdown event detected while waiting. Exiting...")
+                        return
+
                     logger.debug("Producer is waiting for weight to be released.")
-                    weight_lock.wait()  # Wait for the condition to be met
-
+                    weight_lock.wait()
+
                 if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
                     logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")
-
-                # Add the job to the queue and update the weight
+
                 job_queue.put((job, weight))
                 current_weight += weight
-                logger.debug(
-                    f"Produced job: {job}, current total weight: {current_weight}"
-                )
-
-                # Notify consumers that a new job is available
                 weight_lock.notify_all()
+
     finally:
         logger.debug("Producer is exiting...")
         for _ in range(THREAD_POOL_SIZE):
@@ -175,29 +189,28 @@ def job_producer(job_gen):
             job_queue.put(None)
 
         with weight_lock:
             weight_lock.notify_all()
-            logger.debug("Consumers notified")
-
-
 # Consumer function: Processes blocks from the queue
+# Update job_consumer function
 def job_consumer(process_job):
     global current_weight
 
     while True:
+        if shutdown_event.is_set():
+            logger.info("Shutdown event detected in consumer. Exiting...")
+            break
+
         try:
             logger.debug(
                 f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
             )
 
-            # Get a job from the queue
             work_to_do = job_queue.get(block=True)
-
+
             if work_to_do is None:
-                logger.debug("No more work to do, consumer is exiting.")
                 job_queue.task_done()
                 break
 
             (job, weight) = work_to_do
 
-            # Process the block
             try:
                 logger.debug(f"Executing job: {job}...")
                 process_job(job)
@@ -206,19 +219,14 @@ def job_consumer(process_job):
 
             with weight_lock:
                 current_weight -= weight
-                logger.debug(
-                    f"Finished processing job, current total weight: {current_weight}"
-                )
-                weight_lock.notify_all()  # Notify producer to add more jobs
+                weight_lock.notify_all()
 
-            # Mark job as done
             job_queue.task_done()
-
+
         except Exception as e:
             logger.error("Error in the consumer: %s", e)
             break
 
-
 def main(start, blocks, step, mode, strategy):
 
     logger.info(
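Patch 1's core mechanism is easier to see in isolation: a SIGTERM handler flips a shared threading.Event, and each worker loop polls that event between units of work instead of being killed mid-job. Below is a minimal runnable sketch of the same pattern; the sleep loop and timings are illustrative stand-ins, not code from this repository:

    import signal
    import threading
    import time

    shutdown_event = threading.Event()

    def handle_sigterm(signum, frame):
        # Runs in the main thread: flag the workers and return quickly.
        shutdown_event.set()

    signal.signal(signal.SIGTERM, handle_sigterm)

    def worker():
        while not shutdown_event.is_set():
            time.sleep(0.1)  # stands in for one unit of real work
        print("worker exiting cleanly")

    t = threading.Thread(target=worker)
    t.start()
    time.sleep(0.3)                      # let the worker spin a few times
    signal.raise_signal(signal.SIGTERM)  # simulate `kill <pid>` (Python 3.8+)
    t.join()

Because signal handlers only ever run in the main thread, setting an Event is the conventional way to fan a signal out to worker threads; the patch applies exactly this idea to both the producer and the consumers.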
From fe2dd0fb1a2ad2719deff95fe42ed61c52f05a5a Mon Sep 17 00:00:00 2001
From: Darshan Chauhan <46072647+GradleD@users.noreply.github.com>
Date: Wed, 6 Nov 2024 09:02:13 +0000
Subject: [PATCH 2/4] Formatting

Lint issue resolved
---
 scripts/data/client.py | 216 +++++++++++++++++------------------------
 1 file changed, 88 insertions(+), 128 deletions(-)

diff --git a/scripts/data/client.py b/scripts/data/client.py
index a4cd1855..1ddde272 100755
--- a/scripts/data/client.py
+++ b/scripts/data/client.py
@@ -1,22 +1,22 @@
 #!/usr/bin/env python3
 
-from dataclasses import dataclass
+import argparse
 import json
-import re
+import logging
 import os
-import threading
 import queue
-import argparse
+import random
+import re
+import signal
 import subprocess
-import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
 from pathlib import Path
-import random
+from logging.handlers import TimedRotatingFileHandler
+
 from generate_data import generate_data
 from format_args import format_args
-import logging
-from logging.handlers import TimedRotatingFileHandler
-import signal
 
 logger = logging.getLogger(__name__)
 
@@ -24,8 +24,7 @@
 MAX_WEIGHT_LIMIT = 8000  # Total weight limit for all jobs
 THREAD_POOL_SIZE = os.cpu_count()  # Number of threads for processing
 QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2  # Maximum size of the job queue
-
-BASE_DIR = Path(".client_cache")
+BASE_DIR = Path(".client_cache")
 # Shared state variables
 # Shared state variables
 current_weight = 0
@@ -53,7 +52,6 @@ def calculate_batch_weight(block_data, mode):
         for tx in block["data"]["transactions"]
     )
 
-
 @dataclass
 class Job:
     height: int
@@ -65,162 +63,140 @@ class Job:
     def __str__(self):
         return f"Job(height='{self.height}', step={self.step}, weight='{self.weight}')"
 
-
 # Generator function to create jobs
 def job_generator(start, blocks, step, mode, strategy):
     BASE_DIR.mkdir(exist_ok=True)
     end = start + blocks
-
+
     height_range, step = (
         ([random.randint(start, end) for _ in range(start, end)], 1)
-        if strategy == "random"
-        else (range(start, end, step), step)
+        if strategy == "random" else (range(start, end, step), step)
     )
-
+
     for height in height_range:
         try:
             batch_file = BASE_DIR / f"{mode}_{height}_{step}.json"
-
-            batch_data = generate_data(
-                mode=mode, initial_height=height, num_blocks=step, fast=True
-            )
-
+            batch_data = generate_data(mode=mode, initial_height=height, num_blocks=step, fast=True)
             Path(batch_file).write_text(json.dumps(batch_data, indent=2))
-
             batch_weight = calculate_batch_weight(batch_data, mode)
             yield Job(height, step, mode, batch_weight, batch_file), batch_weight
+
         except Exception as e:
             logger.error(f"Error while generating data for: {height}:\n{e}")
 
-
 # Function to process a batch
 def process_batch(job):
     arguments_file = job.batch_file.as_posix().replace(".json", "-arguments.json")
-
+
     with open(arguments_file, "w") as af:
         af.write(str(format_args(job.batch_file, False, False)))
-
+
     result = subprocess.run(
         [
-            "scarb",
-            "cairo-run",
-            "--no-build",
-            "--package",
-            "client",
-            "--function",
-            "main",
-            "--arguments-file",
-            str(arguments_file),
+            "scarb", "cairo-run", "--no-build", "--package", "client",
+            "--function", "main", "--arguments-file", str(arguments_file),
         ],
         capture_output=True,
         text=True,
     )
-
-    if (
-        result.returncode != 0
-        or "FAIL" in result.stdout
-        or "error" in result.stdout
-        or "panicked" in result.stdout
-    ):
+
+    if (result.returncode != 0 or "FAIL" in result.stdout or
+        "error" in result.stdout or "panicked" in result.stdout):
+
         error = result.stdout or result.stderr
+
         if result.returncode == -9:
             match = re.search(r"gas_spent=(\d+)", result.stdout)
-            gas_info = (
-                f", gas spent: {int(match.group(1))}"
-                if match
-                else ", no gas info found"
-            )
+            gas_info = (f", gas spent: {int(match.group(1))}" if match else ", no gas info found")
             error = f"Return code -9, killed by OOM?{gas_info}"
             message = error
+
         else:
             error_match = re.search(r"error='([^']*)'", error)
             message = error_match.group(1) if error_match else ""
-
+
         logger.error(f"{job} error: {message}")
         logger.debug(f"Full error while processing: {job}:\n{error}")
+
     else:
         match = re.search(r"gas_spent=(\d+)", result.stdout)
         gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
         logger.info(f"{job} done, {gas_info}")
+
         if not match:
             logger.warning(f"{job}: no gas info found")
 
-
-# Producer function: Generates data and adds jobs to the queue
-# Update job_producer function
+# Producer function: Generates data and adds jobs to the queue
 def job_producer(job_gen):
     global current_weight
-
+
     try:
         for job, weight in job_gen:
-            # Check for shutdown
+            # Check for shutdown
             if shutdown_event.is_set():
                 logger.info("Shutdown event detected in producer. Exiting...")
                 break
 
             with weight_lock:
-                logger.debug(
-                    f"Adding job: {job}, current total weight: {current_weight}..."
-                )
-                while (
-                    (current_weight + weight > MAX_WEIGHT_LIMIT)
-                    and current_weight != 0
-                    or job_queue.full()
-                ):
+                logger.debug(f"Adding job: {job}, current total weight: {current_weight}...")
+
+                while ((current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight != 0 or job_queue.full()):
                     if shutdown_event.is_set():
                         logger.info("Shutdown event detected while waiting. Exiting...")
                         return
 
                     logger.debug("Producer is waiting for weight to be released.")
                     weight_lock.wait()
-
+
                 if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
                     logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")
-
+
                 job_queue.put((job, weight))
                 current_weight += weight
+
                 weight_lock.notify_all()
 
     finally:
         logger.debug("Producer is exiting...")
+
         for _ in range(THREAD_POOL_SIZE):
             job_queue.put(None)
-
+
         with weight_lock:
             weight_lock.notify_all()
 
-# Consumer function: Processes blocks from the queue
-# Update job_consumer function
+# Consumer function: Processes blocks from the queue
 def job_consumer(process_job):
     global current_weight
-
+
     while True:
         if shutdown_event.is_set():
             logger.info("Shutdown event detected in consumer. Exiting...")
             break
 
         try:
-            logger.debug(
-                f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
-            )
+            logger.debug(f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}")
 
             work_to_do = job_queue.get(block=True)
-
+
             if work_to_do is None:
                 job_queue.task_done()
                 break
 
             (job, weight) = work_to_do
-
+
             try:
                 logger.debug(f"Executing job: {job}...")
                 process_job(job)
+
             except Exception as e:
                 logger.error(f"Error while processing job: {job}:\n{e}")
-
-            with weight_lock:
-                current_weight -= weight
+
+            with weight_lock:
+                current_weight -= weight
+
                 weight_lock.notify_all()
-
+
             job_queue.task_done()
 
         except Exception as e:
@@ -228,7 +204,6 @@ def job_consumer(process_job):
             break
 
 def main(start, blocks, step, mode, strategy):
-
     logger.info(
         "Starting client, initial height: %d, blocks: %d, step: %d, mode: %s, strategy: %s",
        start,
@@ -237,70 +212,50 @@ def main(start, blocks, step, mode, strategy):
         mode,
         strategy,
     )
+
     logger.info(
         "Max weight limit: %d, Thread pool size: %d, Queue max size: %d",
         MAX_WEIGHT_LIMIT,
         THREAD_POOL_SIZE,
         QUEUE_MAX_SIZE,
     )
-
-    # Create the job generator
+
+    # Create the job generator
     job_gen = job_generator(start, blocks, step, mode, strategy)
-
-    # Start the job producer thread
+
+    # Start the job producer thread
     producer_thread = threading.Thread(target=job_producer, args=(job_gen,))
     producer_thread.start()
-
-    # Start the consumer threads using ThreadPoolExecutor
+
+    # Start the consumer threads using ThreadPoolExecutor
     with ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE) as executor:
-        futures = [
-            executor.submit(job_consumer, process_batch)
-            for _ in range(THREAD_POOL_SIZE)
-        ]
-
-        # Wait for producer to finish
+        futures = [executor.submit(job_consumer, process_batch) for _ in range(THREAD_POOL_SIZE)]
+
+        # Wait for producer to finish
         producer_thread.join()
-
-        # Wait for all items in the queue to be processed
+
+        # Wait for all items in the queue to be processed
         job_queue.join()
-
+
     logger.info("All jobs have been processed.")
 
-
 if __name__ == "__main__":
-
     parser = argparse.ArgumentParser(description="Run client script")
+
     parser.add_argument("--start", type=int, required=True, help="Start block height")
-    parser.add_argument(
-        "--blocks",
-        type=int,
-        default=1,
-        help="Number of blocks to process",
-    )
-    parser.add_argument(
-        "--step", type=int, default=1, help="Step size for block processing"
-    )
-    parser.add_argument(
-        "--mode", default="light", choices=["light", "full"], help="Client mode"
-    )
-    parser.add_argument(
-        "--strategy",
-        default="sequential",
-        choices=["sequential", "random"],
-        help="Processing strategy",
-    )
-
-    parser.add_argument(
-        "--maxweight", type=int, default=MAX_WEIGHT_LIMIT, help="Max weight limit"
-    )
-
+    parser.add_argument("--blocks", type=int, default=1, help="Number of blocks to process")
+    parser.add_argument("--step", type=int, default=1, help="Step size for block processing")
+    parser.add_argument("--mode", default="light", choices=["light", "full"], help="Client mode")
+    parser.add_argument("--strategy", default="sequential", choices=["sequential", "random"], help="Processing strategy")
+
+    parser.add_argument("--maxweight", type=int, default=MAX_WEIGHT_LIMIT, help="Max weight limit")
+
     parser.add_argument("--verbose", action="store_true", help="Verbose")
-
+
     args = parser.parse_args()
-
+
     MAX_WEIGHT_LIMIT = args.maxweight
 
-    # file_handler = logging.FileHandler("client.log")
     file_handler = TimedRotatingFileHandler(
         filename="client.log",
         when="midnight",
@@ -308,26 +263,31 @@
         interval=1,
         backupCount=14,
         encoding="utf8",
     )
+
     file_handler.setLevel(logging.INFO)
-    file_handler.setFormatter(
-        logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s")
-    )
-
+
+    file_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s"))
+
     console_handler = logging.StreamHandler()
+
     console_handler.setLevel(logging.DEBUG)
-    console_handler.setFormatter(
-        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
-    )
+
+    console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
+
     root_logger = logging.getLogger()
+
     root_logger.addHandler(console_handler)
+
     root_logger.addHandler(file_handler)
 
     if args.verbose:
         root_logger.setLevel(logging.DEBUG)
+
     else:
         root_logger.setLevel(logging.INFO)
 
     logging.getLogger("urllib3").setLevel(logging.WARNING)
+
     logging.getLogger("generate_data").setLevel(logging.WARNING)
 
-    main(args.start, args.blocks, args.step, args.mode, args.strategy)
+    main(args.start, args.blocks, args.step, args.mode, args.strategy)
\ No newline at end of file
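The reformatting above leaves the scheduling logic untouched, so it is worth spelling out what that logic is: a Condition object guards a shared weight counter, the producer blocks whenever admitting the next job would exceed MAX_WEIGHT_LIMIT, and consumers release weight (and notify) once a job is done. A stripped-down sketch of that contract, under the simplifying assumptions of a single consumer, uniform weights, and no shutdown handling:

    import queue
    import threading

    MAX_WEIGHT = 10
    current_weight = 0
    weight_lock = threading.Condition()
    jobs = queue.Queue(maxsize=4)

    def produce(items):
        global current_weight
        for job, weight in items:
            with weight_lock:
                # Block while admitting this job would exceed the budget.
                while current_weight + weight > MAX_WEIGHT and current_weight != 0:
                    weight_lock.wait()
                jobs.put((job, weight))
                current_weight += weight
                weight_lock.notify_all()
        jobs.put(None)  # sentinel: tells the consumer to exit

    def consume():
        global current_weight
        while (item := jobs.get()) is not None:
            job, weight = item
            print("processed", job)
            with weight_lock:
                current_weight -= weight  # release budget...
                weight_lock.notify_all()  # ...and wake a waiting producer

    t = threading.Thread(target=consume)
    t.start()
    produce([(f"job-{i}", 3) for i in range(8)])
    t.join()

The `current_weight != 0` clause matters: it lets one oversized job through when the pipeline is empty, which is why the real script logs a warning for such jobs instead of refusing them.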
From 9ca2d398fa2a69f1998b244d086a1c6ccc2bc3b1 Mon Sep 17 00:00:00 2001
From: Darshan Chauhan <46072647+GradleD@users.noreply.github.com>
Date: Wed, 6 Nov 2024 09:13:06 +0000
Subject: [PATCH 3/4] Re-formatting

Lint issue
---
 scripts/data/client.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/scripts/data/client.py b/scripts/data/client.py
index 1ddde272..3ea853f2 100755
--- a/scripts/data/client.py
+++ b/scripts/data/client.py
@@ -45,12 +45,11 @@ def handle_sigterm(signum, frame):
 def calculate_batch_weight(block_data, mode):
     if mode == "light":
         return len(block_data["blocks"])
-    else:
-        return sum(
-            len(tx["inputs"]) + len(tx["outputs"])
-            for block in block_data["blocks"]
-            for tx in block["data"]["transactions"]
-        )
+    return sum(
+        len(tx["inputs"]) + len(tx["outputs"])
+        for block in block_data["blocks"]
+        for tx in block["data"]["transactions"]
+    )
 
 @dataclass
 class Job:
@@ -100,9 +99,12 @@ def process_batch(job):
         text=True,
     )
 
-    if (result.returncode != 0 or "FAIL" in result.stdout or
-        "error" in result.stdout or "panicked" in result.stdout):
-
+    if (
+        result.returncode != 0 or
+        "FAIL" in result.stdout or
+        "error" in result.stdout or
+        "panicked" in result.stdout
+    ):
         error = result.stdout or result.stderr
 
         if result.returncode == -9:
@@ -140,7 +142,11 @@ def job_producer(job_gen):
             with weight_lock:
                 logger.debug(f"Adding job: {job}, current total weight: {current_weight}...")
 
-                while ((current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight != 0 or job_queue.full()):
+                while (
+                    (current_weight + weight > MAX_WEIGHT_LIMIT) and
+                    current_weight != 0 or
+                    job_queue.full()
+                ):
                     if shutdown_event.is_set():
                         logger.info("Shutdown event detected while waiting. Exiting...")
                         return
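The condition patch 3 re-wraps is the script's only failure oracle for `scarb cairo-run`: it inspects the return code, greps stdout for FAIL/error/panicked, treats -9 (SIGKILL) as a probable OOM kill, and scrapes `gas_spent=N` for reporting. A self-contained approximation of that flow; the `echo` command merely fakes one line of output and is not scarb's real output format:

    import re
    import subprocess

    def run_and_report(cmd):
        result = subprocess.run(cmd, capture_output=True, text=True)
        failed = (
            result.returncode != 0
            or "FAIL" in result.stdout
            or "error" in result.stdout
            or "panicked" in result.stdout
        )
        match = re.search(r"gas_spent=(\d+)", result.stdout)
        gas = f"gas spent: {match.group(1)}" if match else "no gas info found"
        if failed:
            # -9 means the child died on SIGKILL -- on Linux, usually the
            # kernel OOM killer rather than an error raised by the tool itself.
            return f"killed by OOM? {gas}" if result.returncode == -9 else f"failed, {gas}"
        return f"ok, {gas}"

    print(run_and_report(["echo", "gas_spent=42"]))  # -> ok, gas spent: 42

Substring-matching "error" in stdout is deliberately coarse; it errs toward false failures, which is the safer direction for a test harness.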
From 57f9c980978c42d30fa79bcde5dfff44fba09c39 Mon Sep 17 00:00:00 2001
From: Darshan Chauhan <46072647+GradleD@users.noreply.github.com>
Date: Wed, 6 Nov 2024 09:31:23 +0000
Subject: [PATCH 4/4] Reformatted with black

---
 scripts/data/client.py | 218 +++++++++++++++++++++++++----------------
 1 file changed, 133 insertions(+), 85 deletions(-)

diff --git a/scripts/data/client.py b/scripts/data/client.py
index 3ea853f2..ed66766f 100755
--- a/scripts/data/client.py
+++ b/scripts/data/client.py
@@ -34,13 +34,16 @@
 # Create an event for graceful shutdown
 shutdown_event = threading.Event()
 
+
 def handle_sigterm(signum, frame):
     logger.info("Received SIGTERM signal. Shutting down gracefully...")
     shutdown_event.set()  # Set the event to signal threads to exit
 
+
 # Register the SIGTERM handler
 signal.signal(signal.SIGTERM, handle_sigterm)
 
+
 # Function to calculate weight of a block
 def calculate_batch_weight(block_data, mode):
     if mode == "light":
@@ -51,6 +54,7 @@ def calculate_batch_weight(block_data, mode):
         for tx in block["data"]["transactions"]
     )
 
+
 @dataclass
 class Job:
     height: int
@@ -62,153 +66,176 @@ class Job:
     def __str__(self):
         return f"Job(height='{self.height}', step={self.step}, weight='{self.weight}')"
 
+
 # Generator function to create jobs
 def job_generator(start, blocks, step, mode, strategy):
     BASE_DIR.mkdir(exist_ok=True)
     end = start + blocks
-
+
     height_range, step = (
         ([random.randint(start, end) for _ in range(start, end)], 1)
-        if strategy == "random" else (range(start, end, step), step)
+        if strategy == "random"
+        else (range(start, end, step), step)
     )
-
+
     for height in height_range:
         try:
             batch_file = BASE_DIR / f"{mode}_{height}_{step}.json"
-            batch_data = generate_data(mode=mode, initial_height=height, num_blocks=step, fast=True)
+            batch_data = generate_data(
+                mode=mode, initial_height=height, num_blocks=step, fast=True
+            )
             Path(batch_file).write_text(json.dumps(batch_data, indent=2))
             batch_weight = calculate_batch_weight(batch_data, mode)
             yield Job(height, step, mode, batch_weight, batch_file), batch_weight
-
+
         except Exception as e:
             logger.error(f"Error while generating data for: {height}:\n{e}")
 
+
 # Function to process a batch
 def process_batch(job):
     arguments_file = job.batch_file.as_posix().replace(".json", "-arguments.json")
-
+
     with open(arguments_file, "w") as af:
         af.write(str(format_args(job.batch_file, False, False)))
-
+
     result = subprocess.run(
         [
-            "scarb", "cairo-run", "--no-build", "--package", "client",
-            "--function", "main", "--arguments-file", str(arguments_file),
+            "scarb",
+            "cairo-run",
+            "--no-build",
+            "--package",
+            "client",
+            "--function",
+            "main",
+            "--arguments-file",
+            str(arguments_file),
         ],
         capture_output=True,
         text=True,
     )
-
+
     if (
-        result.returncode != 0 or
-        "FAIL" in result.stdout or
-        "error" in result.stdout or
-        "panicked" in result.stdout
+        result.returncode != 0
+        or "FAIL" in result.stdout
+        or "error" in result.stdout
+        or "panicked" in result.stdout
     ):
         error = result.stdout or result.stderr
-
+
         if result.returncode == -9:
             match = re.search(r"gas_spent=(\d+)", result.stdout)
-            gas_info = (f", gas spent: {int(match.group(1))}" if match else ", no gas info found")
+            gas_info = (
+                f", gas spent: {int(match.group(1))}"
+                if match
+                else ", no gas info found"
+            )
             error = f"Return code -9, killed by OOM?{gas_info}"
             message = error
-
+
         else:
             error_match = re.search(r"error='([^']*)'", error)
             message = error_match.group(1) if error_match else ""
-
+
         logger.error(f"{job} error: {message}")
         logger.debug(f"Full error while processing: {job}:\n{error}")
-
+
     else:
         match = re.search(r"gas_spent=(\d+)", result.stdout)
         gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
         logger.info(f"{job} done, {gas_info}")
-
+
         if not match:
             logger.warning(f"{job}: no gas info found")
 
-# Producer function: Generates data and adds jobs to the queue
+
+# Producer function: Generates data and adds jobs to the queue
 def job_producer(job_gen):
     global current_weight
-
+
     try:
         for job, weight in job_gen:
-            # Check for shutdown
+            # Check for shutdown
             if shutdown_event.is_set():
                 logger.info("Shutdown event detected in producer. Exiting...")
                 break
-
+
             with weight_lock:
-                logger.debug(f"Adding job: {job}, current total weight: {current_weight}...")
-
+                logger.debug(
+                    f"Adding job: {job}, current total weight: {current_weight}..."
+                )
+
                 while (
-                    (current_weight + weight > MAX_WEIGHT_LIMIT) and
-                    current_weight != 0 or
-                    job_queue.full()
+                    (current_weight + weight > MAX_WEIGHT_LIMIT)
+                    and current_weight != 0
+                    or job_queue.full()
                 ):
                     if shutdown_event.is_set():
                         logger.info("Shutdown event detected while waiting. Exiting...")
                         return
-
+
                     logger.debug("Producer is waiting for weight to be released.")
                     weight_lock.wait()
-
+
                 if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
                     logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")
-
+
                 job_queue.put((job, weight))
                 current_weight += weight
-
+
                 weight_lock.notify_all()
-
+
     finally:
         logger.debug("Producer is exiting...")
-
+
         for _ in range(THREAD_POOL_SIZE):
             job_queue.put(None)
-
+
         with weight_lock:
             weight_lock.notify_all()
 
-# Consumer function: Processes blocks from the queue
+
+# Consumer function: Processes blocks from the queue
 def job_consumer(process_job):
     global current_weight
-
+
     while True:
         if shutdown_event.is_set():
             logger.info("Shutdown event detected in consumer. Exiting...")
             break
-
+
         try:
-            logger.debug(f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}")
-
+            logger.debug(
+                f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
+            )
+
             work_to_do = job_queue.get(block=True)
-
+
             if work_to_do is None:
                 job_queue.task_done()
                 break
-
+
             (job, weight) = work_to_do
-
+
             try:
                 logger.debug(f"Executing job: {job}...")
                 process_job(job)
-
+
             except Exception as e:
                 logger.error(f"Error while processing job: {job}:\n{e}")
-
+
             with weight_lock:
                 current_weight -= weight
-
+
                 weight_lock.notify_all()
-
+
             job_queue.task_done()
-
+
         except Exception as e:
             logger.error("Error in the consumer: %s", e)
             break
 
+
 def main(start, blocks, step, mode, strategy):
     logger.info(
         "Starting client, initial height: %d, blocks: %d, step: %d, mode: %s, strategy: %s",
@@ -218,45 +245,62 @@ def main(start, blocks, step, mode, strategy):
         mode,
         strategy,
     )
-
+
     logger.info(
         "Max weight limit: %d, Thread pool size: %d, Queue max size: %d",
         MAX_WEIGHT_LIMIT,
         THREAD_POOL_SIZE,
         QUEUE_MAX_SIZE,
     )
-
-    # Create the job generator
+
+    # Create the job generator
     job_gen = job_generator(start, blocks, step, mode, strategy)
-
-    # Start the job producer thread
+
+    # Start the job producer thread
     producer_thread = threading.Thread(target=job_producer, args=(job_gen,))
     producer_thread.start()
-
-    # Start the consumer threads using ThreadPoolExecutor
+
+    # Start the consumer threads using ThreadPoolExecutor
     with ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE) as executor:
-        futures = [executor.submit(job_consumer, process_batch) for _ in range(THREAD_POOL_SIZE)]
-
-        # Wait for producer to finish
+        futures = [
+            executor.submit(job_consumer, process_batch)
+            for _ in range(THREAD_POOL_SIZE)
+        ]
+
+        # Wait for producer to finish
         producer_thread.join()
-
-        # Wait for all items in the queue to be processed
+
+        # Wait for all items in the queue to be processed
         job_queue.join()
-
+
     logger.info("All jobs have been processed.")
 
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Run client script")
+
     parser.add_argument("--start", type=int, required=True, help="Start block height")
-    parser.add_argument("--blocks", type=int, default=1, help="Number of blocks to process")
-    parser.add_argument("--step", type=int, default=1, help="Step size for block processing")
-    parser.add_argument("--mode", default="light", choices=["light", "full"], help="Client mode")
-    parser.add_argument("--strategy", default="sequential", choices=["sequential", "random"], help="Processing strategy")
-
-    parser.add_argument("--maxweight", type=int, default=MAX_WEIGHT_LIMIT, help="Max weight limit")
-
+    parser.add_argument(
+        "--blocks", type=int, default=1, help="Number of blocks to process"
+    )
+    parser.add_argument(
+        "--step", type=int, default=1, help="Step size for block processing"
+    )
+    parser.add_argument(
+        "--mode", default="light", choices=["light", "full"], help="Client mode"
+    )
+    parser.add_argument(
+        "--strategy",
+        default="sequential",
+        choices=["sequential", "random"],
+        help="Processing strategy",
+    )
+
+    parser.add_argument(
+        "--maxweight", type=int, default=MAX_WEIGHT_LIMIT, help="Max weight limit"
+    )
+
     parser.add_argument("--verbose", action="store_true", help="Verbose")
-
+
     args = parser.parse_args()
-
+
     MAX_WEIGHT_LIMIT = args.maxweight
 
     file_handler = TimedRotatingFileHandler(
         filename="client.log",
         when="midnight",
@@ -269,31 +313,35 @@
         interval=1,
         backupCount=14,
         encoding="utf8",
     )
-
+
     file_handler.setLevel(logging.INFO)
-
-    file_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s"))
-
+
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s")
+    )
+
     console_handler = logging.StreamHandler()
-
+
     console_handler.setLevel(logging.DEBUG)
-
-    console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
+
+    console_handler.setFormatter(
+        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+    )
+
     root_logger = logging.getLogger()
-
+
     root_logger.addHandler(console_handler)
-
+
     root_logger.addHandler(file_handler)
 
     if args.verbose:
         root_logger.setLevel(logging.DEBUG)
-
+
     else:
         root_logger.setLevel(logging.INFO)
 
     logging.getLogger("urllib3").setLevel(logging.WARNING)
-
+
     logging.getLogger("generate_data").setLevel(logging.WARNING)
 
-    main(args.start, args.blocks, args.step, args.mode, args.strategy)
\ No newline at end of file
+    main(args.start, args.blocks, args.step, args.mode, args.strategy)
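With all four patches applied, the shutdown path can be exercised end to end: start the client, deliver SIGTERM, and check that it drains in-flight batches and exits on its own rather than dying mid-job. This smoke test is a hypothetical illustration, not part of the series; the start height, timings, and timeout are placeholders:

    import signal
    import subprocess
    import time

    proc = subprocess.Popen(
        ["python", "scripts/data/client.py", "--start", "100000", "--blocks", "4"]
    )
    time.sleep(5)                     # give the producer/consumers time to spin up
    proc.send_signal(signal.SIGTERM)  # handle_sigterm() sets shutdown_event
    print("exit code:", proc.wait(timeout=120))

One gap worth flagging for reviewers: only SIGTERM is registered, so Ctrl-C (SIGINT) still unwinds through KeyboardInterrupt rather than through the graceful path.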