From 4d997568b9b4b207c11eb7c25f485b7d46fac542 Mon Sep 17 00:00:00 2001
From: Ahmed Sheta
Date: Sat, 3 Feb 2024 02:19:09 +0100
Subject: [PATCH] initial commit to fix the file names bug

Signed-off-by: Ahmed Sheta
---
 src/database/leads/local_repository.py |  4 +-
 src/database/leads/s3_repository.py    |  4 +-
 src/demo/demos.py                      | 72 ++++++++++++++++++++++++--
 src/preprocessing/preprocessing.py     | 58 +++++++++++----------
 4 files changed, 107 insertions(+), 31 deletions(-)

diff --git a/src/database/leads/local_repository.py b/src/database/leads/local_repository.py
index ebeb90b..c5e53e4 100644
--- a/src/database/leads/local_repository.py
+++ b/src/database/leads/local_repository.py
@@ -249,7 +249,9 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")
 
-    def load_preprocessed_data(self, file_name: str = "preprocessed_data.csv"):
+    def load_preprocessed_data(
+        self, file_name: str = "historical_preprocessed_data.csv"
+    ):
         try:
             return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name))
         except FileNotFoundError:
diff --git a/src/database/leads/s3_repository.py b/src/database/leads/s3_repository.py
index 4264ef4..2e11ed5 100644
--- a/src/database/leads/s3_repository.py
+++ b/src/database/leads/s3_repository.py
@@ -374,7 +374,9 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")
 
-    def load_preprocessed_data(self, file_name: str = "preprocessed_data.csv"):
+    def load_preprocessed_data(
+        self, file_name: str = "historical_preprocessed_data.csv"
+    ):
         file_path = self.DF_PREPROCESSED_INPUT + file_name
         if not file_path.startswith("s3://"):
             log.error(
diff --git a/src/demo/demos.py b/src/demo/demos.py
index c4de78d..41d3054 100644
--- a/src/demo/demos.py
+++ b/src/demo/demos.py
@@ -10,6 +10,7 @@
 import re
 import subprocess
 
+import pandas as pd
 import xgboost as xgb
 from sklearn.metrics import classification_report
 
@@ -40,6 +41,7 @@
 INPUT_FILE_BDC = "../data/sumup_leads_email.csv"
 OUTPUT_FILE_BDC = "../data/collected_data.json"
 
+
 # evp demo
 def evp_demo():
     data = get_database().load_preprocessed_data()
@@ -212,19 +214,71 @@ def pipeline_demo():
 
 
 def preprocessing_demo():
-    if get_yes_no_input("Filter out the API-irrelevant data? (y/n)"):
+    if get_yes_no_input("Filter out the API-irrelevant data? (y/n)\n"):
         filter_bool = True
     else:
         filter_bool = False
     if get_yes_no_input(
-        "Run on historical data ? (y/n)\nNote: DATABASE_TYPE should be S3!"
+        "Run on historical data? (y/n)\n'n' means it will run on lead data!\n"
     ):
         historical_bool = True
     else:
         historical_bool = False
+    if get_yes_no_input("Run on S3? (y/n)\n'n' means it will run locally!\n"):
+        S3_bool = True
+    else:
+        S3_bool = False
+
     preprocessor = Preprocessing(
         filter_null_data=filter_bool, historical_data=historical_bool
     )
+    if historical_bool and S3_bool:
+        preprocessor.data_path = (
+            "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
+        )
+        preprocessor.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
+    elif historical_bool and not S3_bool:
+        # input path
+        input_path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        input_path_components.pop()
+        input_path_components.append("100k_historic_enriched.csv")
+        input_path = "/".join(input_path_components)
+        preprocessor.data_path = input_path
+
+        # output path
+        path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        path_components.pop()
+        path_components.append(
+            "preprocessed_data_files/historical_preprocessed_data.csv"
+        )
+        preprocessor.prerocessed_data_output_path = "/".join(path_components)
+    elif not historical_bool and S3_bool:
+        preprocessor.data_path = "s3://amos--data--events/leads/enriched.csv"
+        preprocessor.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"
+    elif not historical_bool and not S3_bool:
+        # input path
+        input_path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        input_path_components.pop()
+        input_path_components.append("leads_enriched.csv")
+        input_path = "/".join(input_path_components)
+        preprocessor.data_path = input_path
+
+        # output path
+        path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        path_components.pop()
+        path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
+        preprocessor.prerocessed_data_output_path = "/".join(path_components)
+
+    preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
+
     df = preprocessor.implement_preprocessing_pipeline()
     preprocessor.save_preprocessed_data()
 
@@ -250,7 +304,18 @@ def predict_MerchantSize_on_lead_data_demo():
     from preprocessing import Preprocessing
 
     preprocessor = Preprocessing(filter_null_data=False, historical_data=False)
-    leads_enriched_path = "s3://amos--data--events/leads/enriched.csv"
+
+    leads_enriched_path = "s3://amos--data--events/leads/enriched.csv"  # S3 path
+
+    # # input path
+    # input_path_components = preprocessor.data_path.split(
+    #     "\\" if "\\" in preprocessor.data_path else "/"
+    # )
+    # input_path_components.pop()
+    # input_path_components.append("leads_enriched.csv")
+    # input_path = "/".join(input_path_components)  # local path
+    # preprocessor.data_path = input_path
+
     if not leads_enriched_path:
         log.error(
             "No such file exists in the directory s3://amos--data--events/leads/enriched.csv"
@@ -257,6 +322,7 @@ def predict_MerchantSize_on_lead_data_demo():
         )
     preprocessor.prerocessed_data_output_path = (
         "s3://amos--data--events/leads/preprocessed_leads_data.csv"
     )
+    preprocessor.preprocessed_df = pd.read_csv(leads_enriched_path)
     df = preprocessor.implement_preprocessing_pipeline()
     preprocessor.save_preprocessed_data()
diff --git a/src/preprocessing/preprocessing.py b/src/preprocessing/preprocessing.py
index 78f7c06..f47510b 100644
--- a/src/preprocessing/preprocessing.py
+++ b/src/preprocessing/preprocessing.py
@@ -32,31 +32,34 @@ class Preprocessing:
     def __init__(self, filter_null_data=True, historical_data=False):
         data_repo = get_database()
         self.data_path = data_repo.get_output_path()
-        if historical_data:
-            input_path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            input_path_components.pop()
-            input_path_components.pop()
-            input_path_components.append("historical_data/100k_historic_enriched.csv")
-            input_path = "/".join(input_path_components)
-            data = pd.read_csv(input_path)
-            log.debug(f"Data path = {input_path}")
-        else:
-            log.debug(f"Data path = {self.data_path}")
-            data = pd.read_csv(self.data_path)
-        self.preprocessed_df = data.copy()
-
-        if historical_data:
-            self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
-        else:
-            # created the new output path based on which repo used
-            path_components = self.data_path.split(
-                "\\" if "\\" in self.data_path else "/"
-            )
-            path_components.pop()
-            path_components.append("preprocessed_data.csv")
-            self.prerocessed_data_output_path = "/".join(path_components)
+        self.preprocessed_df = None
+        self.prerocessed_data_output_path = None
+        # if historical_data:
+        #     input_path_components = self.data_path.split(
+        #         "\\" if "\\" in self.data_path else "/"
+        #     )
+        #     input_path_components.pop()
+        #     input_path_components.pop()
+        #     input_path_components.append("historical_data/100k_historic_enriched.csv")
+        #     input_path = "/".join(input_path_components)
+        #     data = pd.read_csv(input_path)
+        #     log.debug(f"Data path = {input_path}")
+        #     self.preprocessed_df = data.copy()
+        # else:
+        #     log.debug(f"Data path = {self.data_path}")
+        #     data = pd.read_csv(self.data_path)
+        #     self.preprocessed_df = data.copy()
+
+        # if historical_data:
+        #     self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
+        # else:
+        #     # created the new output path based on which repo used
+        #     path_components = self.data_path.split(
+        #         "\\" if "\\" in self.data_path else "/"
+        #     )
+        #     path_components.pop()
+        #     path_components.append("preprocessed_data_files/preprocessed_data.csv")
+        #     self.prerocessed_data_output_path = "/".join(path_components)
         self.filter_bool = filter_null_data
 
         # columns that would be added later after one-hot encoding each class
@@ -114,7 +117,10 @@ def filter_out_null_data(self):
         ]
 
     def fill_missing_values(self, column, strategy="constant"):
-        if column in self.preprocessed_df.columns:
+        if (
+            column in self.preprocessed_df.columns
+            and not self.preprocessed_df[column].empty
+        ):
             imputer = SimpleImputer(strategy=strategy)
             self.preprocessed_df[column] = imputer.fit_transform(
                 self.preprocessed_df[[column]]
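
---
Notes (reviewer commentary, not part of the diff):

All four branches of the new preprocessing_demo derive paths with the same
split/pop/append/join pattern. A minimal standalone sketch of that pattern;
derive_sibling_path is a hypothetical helper name, not part of the codebase:

def derive_sibling_path(data_path: str, new_tail: str) -> str:
    # Split on backslash only when the path contains one (Windows),
    # otherwise on forward slash (POSIX paths and s3:// URLs).
    separator = "\\" if "\\" in data_path else "/"
    components = data_path.split(separator)
    components.pop()             # drop the current file name
    components.append(new_tail)  # attach the new relative tail
    return "/".join(components)  # note: always re-joined with "/"

# Mirrors the local lead-data branch of the demo:
# derive_sibling_path("data/leads/enriched.csv",
#                     "preprocessed_data_files/leads_preprocessed_data.csv")
# -> "data/leads/preprocessed_data_files/leads_preprocessed_data.csv"

Re-joining with "/" also normalizes Windows-style input paths; Python's file
APIs accept forward slashes on Windows, so this should be harmless.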
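Because __init__ no longer reads any data, every caller of Preprocessing now
has to wire up the input path, output path, and dataframe itself, as
preprocessing_demo does above. A sketch of the new calling convention, with
paths copied from the diff; it assumes it runs where
"from preprocessing import Preprocessing" resolves (as in demos.py) and that
s3fs is installed so pandas can read s3:// URLs:

import pandas as pd
from preprocessing import Preprocessing

preprocessor = Preprocessing(filter_null_data=True, historical_data=True)
preprocessor.data_path = (
    "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
)
# note: the attribute is spelled "prerocessed..." throughout the codebase
preprocessor.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
# the caller, not the constructor, now loads the dataframe
preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)

df = preprocessor.implement_preprocessing_pipeline()
preprocessor.save_preprocessed_data()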
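The widened guard in fill_missing_values presumably protects against fitting
an imputer on a missing or empty column, since SimpleImputer raises on an
empty selection. A self-contained sketch of the guarded imputation on a toy
frame; the column name and data are illustrative only, not repository code:

import pandas as pd
from sklearn.impute import SimpleImputer

df = pd.DataFrame({"number_of_employees": [10.0, None, 25.0]})
column = "number_of_employees"

# Same guard as the patched fill_missing_values: the column must exist
# and contain at least one row before the imputer is fitted.
if column in df.columns and not df[column].empty:
    imputer = SimpleImputer(strategy="constant")  # fills numeric NaNs with 0
    # fit_transform expects a 2-D selection, hence df[[column]];
    # ravel() flattens the (n, 1) result back into a column.
    df[column] = imputer.fit_transform(df[[column]]).ravel()

print(df)  # the missing value is replaced by the constant fill value (0.0)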