From 581cc5d0823dc814c6da292c396c20ff688c894a Mon Sep 17 00:00:00 2001
From: Key
Date: Mon, 14 Oct 2024 16:38:03 -0500
Subject: [PATCH 1/2] upload files in chunks

---
 target_sftp/__init__.py | 100 ++++++++++++++++++++++++++++++++++------
 1 file changed, 85 insertions(+), 15 deletions(-)

diff --git a/target_sftp/__init__.py b/target_sftp/__init__.py
index 8bdd5f7..2aace2f 100644
--- a/target_sftp/__init__.py
+++ b/target_sftp/__init__.py
@@ -3,8 +3,12 @@
 import json
 import argparse
 import logging
-
+import threading
+import paramiko
+from time import sleep
+from concurrent.futures import ThreadPoolExecutor
 from target_sftp import client
+import backoff
 
 logger = logging.getLogger("target-sftp")
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -37,15 +41,75 @@ def parse_args():
     return args
 
 
-
-
-def upload(args):
+created = False
+lock = threading.Lock()
+
+def upload_part(sftp, num, offset, part_size, local_path, remote_path):
+    global created
+    logger.info(f"Running thread {num}")
+    try:
+        with open(local_path, "rb") as fl:
+            fl.seek(offset)
+
+            with lock:
+                # serialize opens: the first thread creates the remote file ("w"), later threads update it in place ("r+")
+                m = "r+" if created else "w"
+                created = True
+                try:
+                    fr = sftp.open(remote_path, m)
+                except (paramiko.sftp.SFTPError, OSError) as e:
+                    logger.warning(f"Thread {num}: Error opening remote file, retrying. Error: {e}")
+                    sleep(5)
+                    _, _, sftp = start_sftp()
+                    fr = sftp.open(remote_path, m)
+            with fr:
+                fr.seek(offset)
+                fr.set_pipelined(True)
+                size = 0
+                while size < part_size:
+                    s = 32768
+                    if size + s > part_size:
+                        s = part_size - size
+                    data = fl.read(s)
+                    if not data:
+                        break
+                    fr.write(data)
+                    size += len(data)
+    except paramiko.ssh_exception.SSHException as x:
+        logger.info(f"Thread {num} failed: {x}")
+    logger.info(f"Thread {num} done")
+
+
+def upload_file_in_chunks(sftp_client, local_path, remote_path, chunk_size=1048576):
+    logger.info("Starting to upload file...")
+    offset = 0
+    threads_count = 5
+    size = os.path.getsize(local_path)
+    part_size = int(size / threads_count)
+
+    logger.info("Uploading file in chunks...")
+    with ThreadPoolExecutor(max_workers=threads_count) as executor:
+        futures = []
+        for num in range(threads_count):
+            part_size_adjusted = part_size if num < threads_count - 1 else size - offset
+            args = (sftp_client, num, offset, part_size_adjusted, local_path, remote_path)
+            logger.info(f"Starting thread {num} offset {offset} size {part_size_adjusted}")
+            logger.debug(f"1. Active threads: {threading.enumerate()}")
+            futures.append(executor.submit(upload_part, *args))
+            offset += part_size
+
+        for future in futures:
+            logger.debug(f"2. Active threads: {threading.enumerate()}")
+            future.result()
+            logger.debug(f"3. Active threads: {threading.enumerate()}")
+
+    logger.info("Uploaded file in chunks, all threads done")
+
+
+def upload():
+    logger.info(f"Initializing sftp server...")
+    config, sftp_conection, sftp_client = start_sftp()
     logger.info(f"Exporting data...")
-    config = args.config
-    # Upload all data in input_path to sftp
-    ## I don't think preserving directory structure matters, a nice to have, but error-prone
-    sftp_conection = client.connection(config)
-    sftp_client = sftp_conection.sftp
     output_path = config["path_prefix"]
     export_path = output_path.lstrip("/").rstrip("/")
     if not export_path:
@@ -105,8 +169,7 @@ def upload(args):
                 sftp_client.remove(file)
                 logger.info(f"Removed existing file: {file}")
 
-            confirm = config.get("confirm", True)
-            sftp_client.put(file_path, file, confirm=confirm)
+            upload_file_in_chunks(sftp_client, file_path, file)
 
     if prev_cwd is not None:
         sftp_client.chdir(prev_cwd)
@@ -114,13 +177,20 @@ def upload(args):
     logger.info(f"Closing SFTP connection...")
     sftp_conection.close()
 
-
-def main():
-    # Parse command line arguments
+@backoff.on_exception(backoff.expo, paramiko.ssh_exception.SSHException, max_tries=5)
+def start_sftp():
     args = parse_args()
+    config = args.config
+    # Upload all data in input_path to sftp
+    ## I don't think preserving directory structure matters, a nice to have, but error-prone
+    sftp_conection = client.connection(config)
+    sftp_client = sftp_conection.sftp
+    return config, sftp_conection, sftp_client
 
+
+def main():
     # Upload the data
-    upload(args)
+    upload()
 
 
 if __name__ == "__main__":

From ad67b1ea5ed919b8adddd902aad7921bcbf9ce45 Mon Sep 17 00:00:00 2001
From: Key
Date: Mon, 14 Oct 2024 16:39:13 -0500
Subject: [PATCH 2/2] remove chunk_size as parameter

---
 target_sftp/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_sftp/__init__.py b/target_sftp/__init__.py
index 2aace2f..133d3f1 100644
--- a/target_sftp/__init__.py
+++ b/target_sftp/__init__.py
@@ -80,7 +80,7 @@ def upload_part(sftp, num, offset, part_size, local_path, remote_path):
     logger.info(f"Thread {num} done")
 
 
-def upload_file_in_chunks(sftp_client, local_path, remote_path, chunk_size=1048576):
+def upload_file_in_chunks(sftp_client, local_path, remote_path):
     logger.info("Starting to upload file...")
     offset = 0
     threads_count = 5