
Commit

fixed bug and now the pipeline can run locally
Signed-off-by: Ahmed Sheta <ahmed.sheta@fau.de>
ultiwinter7 committed Feb 3, 2024
1 parent 4d99756 commit 93ba51c
Showing 2 changed files with 113 additions and 112 deletions.
151 changes: 66 additions & 85 deletions src/demo/demos.py
@@ -230,52 +230,8 @@ def preprocessing_demo():
         S3_bool = False

     preprocessor = Preprocessing(
-        filter_null_data=filter_bool, historical_data=historical_bool
+        filter_null_data=filter_bool, historical_bool=historical_bool, S3_bool=S3_bool
     )
-    if historical_bool and S3_bool:
-        preprocessor.data_path = (
-            "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
-        )
-        preprocessor.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
-    elif historical_bool and not S3_bool:
-        # input path
-        input_path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        input_path_components.pop()
-        input_path_components.append("100k_historic_enriched.csv")
-        input_path = "/".join(input_path_components)
-        preprocessor.data_path = input_path
-
-        # output path
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append(
-            "preprocessed_data_files/historical_preprocessed_data.csv"
-        )
-        preprocessor.prerocessed_data_output_path = "/".join(path_components)
-    elif not historical_bool and S3_bool:
-        preprocessor.data_path = "s3://amos--data--events/leads/enriched.csv"
-        preprocessor.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"
-    elif not historical_bool and not S3_bool:
-        # input path
-        input_path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        input_path_components.pop()
-        input_path_components.append("leads_enriched.csv")
-        input_path = "/".join(input_path_components)
-        preprocessor.data_path = input_path
-
-        # output path
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
-        preprocessor.prerocessed_data_output_path = "/".join(path_components)

     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)

@@ -294,37 +250,23 @@ def predict_MerchantSize_on_lead_data_demo():
     import pandas as pd

     log.info(
-        "Note: Enriched data must be located at s3://amos--data--events/leads/enriched.csv"
+        "Note: When running locally, enriched data must be located at src/data/leads_enriched.csv\nWhen running on S3, enriched data must be located at s3://amos--data--events/leads/enriched.csv"
     )

     ######################### preprocessing the leads ##################################
+    if get_yes_no_input("Run on S3? (y/n)\n'n' means it will run locally!\n"):
+        S3_bool = True
+    else:
+        S3_bool = False
     current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
     parent_dir = os.path.join(current_dir, "..")
     sys.path.append(parent_dir)
     from preprocessing import Preprocessing

-    preprocessor = Preprocessing(filter_null_data=False, historical_data=False)
-
-    leads_enriched_path = "s3://amos--data--events/leads/enriched.csv"  # S3 path
-
-    # # input path
-    # input_path_components = preprocessor.data_path.split(
-    #     "\\" if "\\" in preprocessor.data_path else "/"
-    # )
-    # input_path_components.pop()
-    # input_path_components.append("leads_enriched.csv")
-    # input_path = "/".join(input_path_components)  # local path
-    # preprocessor.data_path = input_path
-
-    if not leads_enriched_path:
-        log.error(
-            "No such file exists in the directory s3://amos--data--events/leads/enriched.csv"
-        )
-    preprocessor.data_path = leads_enriched_path
-    preprocessor.prerocessed_data_output_path = (
-        "s3://amos--data--events/leads/preprocessed_leads_data.csv"
+    preprocessor = Preprocessing(
+        filter_null_data=False, historical_bool=False, S3_bool=S3_bool
     )
-    preprocessor.preprocessed_df = pd.read_csv(leads_enriched_path)
+    preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
     df = preprocessor.implement_preprocessing_pipeline()
     preprocessor.save_preprocessed_data()

@@ -333,9 +275,18 @@ def predict_MerchantSize_on_lead_data_demo():
     historical_preprocessed_data = pd.read_csv(
         "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
     )
-    toBePredicted_preprocessed_data = pd.read_csv(
-        "s3://amos--data--events/leads/preprocessed_leads_data.csv"
-    )
+    if S3_bool:
+        toBePredicted_preprocessed_data = pd.read_csv(
+            "s3://amos--data--events/leads/preprocessed_leads_data.csv"
+        )
+    else:
+        path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        path_components.pop()
+        path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
+        leads_preprocessed_data_path = "/".join(path_components)
+        toBePredicted_preprocessed_data = pd.read_csv(leads_preprocessed_data_path)

     historical_columns_order = historical_preprocessed_data.columns

@@ -355,11 +306,21 @@ def predict_MerchantSize_on_lead_data_demo():
     toBePredicted_preprocessed_data = toBePredicted_preprocessed_data[
         historical_columns_order
     ]

-    toBePredicted_preprocessed_data.to_csv(
-        "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv",
-        index=False,
-    )
+    if S3_bool:
+        toBePredicted_preprocessed_data.to_csv(
+            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv",
+            index=False,
+        )
+    else:
+        path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        path_components.pop()
+        path_components.append("toBePredicted_preprocessed_data_updated.csv")
+        local_preprocessed_data_path = "/".join(path_components)
+        toBePredicted_preprocessed_data.to_csv(
+            local_preprocessed_data_path, index=False
+        )

     # check if columns in both dataframe are in same order and same number
     assert list(toBePredicted_preprocessed_data.columns) == list(
@@ -403,9 +364,13 @@ def check_classification_task(string):
     model = joblib.load(model_file)
     log.info(f"Loaded the model successfully!")

-    data_path = (
-        "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
-    )
+    if S3_bool:
+        data_path = (
+            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
+        )
+    else:
+        data_path = local_preprocessed_data_path
+
     df = pd.read_csv(data_path)
     input = df.drop("MerchantSizeByDPV", axis=1)
     if xgb_bool:
@@ -418,15 +383,31 @@ def check_classification_task(string):
     size_mapping = {0: "XS", 1: "S", 2: "M", 3: "L", 4: "XL"}
     remapped_predictions = [size_mapping[prediction] for prediction in predictions]

-    enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
+    if S3_bool:
+        enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
+    else:
+        enriched_data = pd.read_csv(preprocessor.data_path)

     # first 5 columns: Last Name,First Name,Company / Account,Phone,Email,
     raw_data = enriched_data.iloc[:, :5]
     print(f"raw_data = {raw_data.shape}")
     print(f"remapped_predictions = {len(remapped_predictions)}")
     raw_data["PredictedMerchantSize"] = remapped_predictions

-    raw_data.to_csv(
-        "s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv", index=True
-    )
-    log.info(
-        f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
-    )
+    if S3_bool:
+        raw_data.to_csv(
+            "s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv",
+            index=True,
+        )
+        log.info(
+            f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
+        )
+    else:
+        path_components = preprocessor.data_path.split(
+            "\\" if "\\" in preprocessor.data_path else "/"
+        )
+        path_components.pop()
+        path_components.append("predicted_MerchantSize_of_leads.csv")
+        output_path = "/".join(path_components)
+        raw_data.to_csv(output_path, index=True)
+        log.info(f"Saved the predicted Merchant Size of the leads at {output_path}")
74 changes: 47 additions & 27 deletions src/preprocessing/preprocessing.py
@@ -29,37 +29,57 @@


 class Preprocessing:
-    def __init__(self, filter_null_data=True, historical_data=False):
+    def __init__(self, filter_null_data=True, historical_bool=True, S3_bool=False):
         data_repo = get_database()
         self.data_path = data_repo.get_output_path()
         self.preprocessed_df = None
         self.prerocessed_data_output_path = None
-        # if historical_data:
-        #     input_path_components = self.data_path.split(
-        #         "\\" if "\\" in self.data_path else "/"
-        #     )
-        #     input_path_components.pop()
-        #     input_path_components.pop()
-        #     input_path_components.append("historical_data/100k_historic_enriched.csv")
-        #     input_path = "/".join(input_path_components)
-        #     data = pd.read_csv(input_path)
-        #     log.debug(f"Data path = {input_path}")
-        #     self.preprocessed_df = data.copy()
-        # else:
-        #     log.debug(f"Data path = {self.data_path}")
-        #     data = pd.read_csv(self.data_path)
-        #     self.preprocessed_df = data.copy()
-
-        # if historical_data:
-        #     self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
-        # else:
-        #     # created the new output path based on which repo used
-        #     path_components = self.data_path.split(
-        #         "\\" if "\\" in self.data_path else "/"
-        #     )
-        #     path_components.pop()
-        #     path_components.append("preprocessed_data_files/preprocessed_data.csv")
-        #     self.prerocessed_data_output_path = "/".join(path_components)
+        if historical_bool and S3_bool:
+            self.data_path = (
+                "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
+            )
+            self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
+        elif historical_bool and not S3_bool:
+            # input path
+            input_path_components = self.data_path.split(
+                "\\" if "\\" in self.data_path else "/"
+            )
+            input_path_components.pop()
+            input_path_components.append("100k_historic_enriched.csv")
+            input_path = "/".join(input_path_components)
+            self.data_path = input_path
+
+            # output path
+            path_components = self.data_path.split(
+                "\\" if "\\" in self.data_path else "/"
+            )
+            path_components.pop()
+            path_components.append(
+                "preprocessed_data_files/historical_preprocessed_data.csv"
+            )
+            self.prerocessed_data_output_path = "/".join(path_components)
+        elif not historical_bool and S3_bool:
+            self.data_path = "s3://amos--data--events/leads/enriched.csv"
+            self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"
+        elif not historical_bool and not S3_bool:
+            # input path
+            input_path_components = self.data_path.split(
+                "\\" if "\\" in self.data_path else "/"
+            )
+            input_path_components.pop()
+            input_path_components.append("leads_enriched.csv")
+            input_path = "/".join(input_path_components)
+            self.data_path = input_path
+
+            # output path
+            path_components = self.data_path.split(
+                "\\" if "\\" in self.data_path else "/"
+            )
+            path_components.pop()
+            path_components.append(
+                "preprocessed_data_files/leads_preprocessed_data.csv"
+            )
+            self.prerocessed_data_output_path = "/".join(path_components)

         self.filter_bool = filter_null_data
         # columns that would be added later after one-hot encoding each class
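Note: after this change, the historical_bool and S3_bool flags passed to the constructor fully determine the input and output locations. A hedged usage sketch of the two S3 combinations, with the expected paths taken from the branches above; it assumes get_database() is configured, and the local (S3_bool=False) paths depend on get_database().get_output_path(), so they are omitted:

from preprocessing import Preprocessing

# Historical snapshot on S3 (historical_bool=True, S3_bool=True).
p = Preprocessing(filter_null_data=True, historical_bool=True, S3_bool=True)
assert p.data_path == (
    "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
)

# Current leads on S3 (historical_bool=False, S3_bool=True).
p = Preprocessing(filter_null_data=False, historical_bool=False, S3_bool=True)
assert p.data_path == "s3://amos--data--events/leads/enriched.csv"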
