Merge pull request #236 from amosproj/bugfix/datanames-ahmed
Prediction runs both locally and via S3
ultiwinter authored Feb 5, 2024
2 parents 524b72e + c718b1b commit d842143
Showing 8 changed files with 355 additions and 302 deletions.
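
The local/S3 switch this PR introduces is driven by the configured database type; a minimal sketch of the pattern as used in src/demo/demos.py below (the paths are the ones appearing in the diff):

# Illustrative sketch, not part of the commit: DATABASE_TYPE comes from the
# project's config module and selects between the S3 and local code paths.
from config import DATABASE_TYPE

S3_bool = DATABASE_TYPE == "S3"

if S3_bool:
    enriched_path = "s3://amos--data--events/leads/enriched.csv"
else:
    enriched_path = "src/data/leads_enriched.csv"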
3 changes: 2 additions & 1 deletion .gitignore
@@ -53,7 +53,8 @@ bin/
!**/data/merged_geo.geojson
**/data/reviews/*.json
**/data/gpt-results/*.json
**/data/models/*
**/data/models/*.pkl
**/data/models/*.joblib
**/data/classification_reports/*

**/docs/*
430 changes: 191 additions & 239 deletions Pipfile.lock

Large diffs are not rendered by default.

Empty file added src/data/models/.gitkeep
Empty file.
Empty file.
4 changes: 3 additions & 1 deletion src/database/leads/local_repository.py
@@ -249,7 +249,9 @@ def save_classification_report(self, report, model_name: str):
except Exception as e:
log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")

def load_preprocessed_data(self, file_name: str = "preprocessed_data.csv"):
def load_preprocessed_data(
self, file_name: str = "historical_preprocessed_data.csv"
):
try:
return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name))
except FileNotFoundError:
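
With the renamed default, callers that pass no file name now read the historical file; a minimal usage sketch, assuming the repository factory from demos.py and the file names introduced elsewhere in this PR:

# Illustrative sketch, not part of the commit.
from database import get_database

db = get_database()
historical_df = db.load_preprocessed_data()  # defaults to "historical_preprocessed_data.csv"
leads_df = db.load_preprocessed_data("leads_preprocessed_data.csv")  # explicit name still works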
4 changes: 3 additions & 1 deletion src/database/leads/s3_repository.py
@@ -374,7 +374,9 @@ def save_classification_report(self, report, model_name: str):
except Exception as e:
log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")

def load_preprocessed_data(self, file_name: str = "preprocessed_data.csv"):
def load_preprocessed_data(
self, file_name: str = "historical_preprocessed_data.csv"
):
file_path = self.DF_PREPROCESSED_INPUT + file_name
if not file_path.startswith("s3://"):
log.error(
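
Both repositories hand s3:// URLs directly to pandas, which relies on s3fs/fsspec and the usual AWS credential chain; a minimal sketch of that dependency (bucket path taken from preprocessing.py below):

# Illustrative sketch, not part of the commit: pandas resolves s3:// paths
# through s3fs, so the package must be installed and credentials available.
import pandas as pd

df = pd.read_csv(
    "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv",
    storage_options={"anon": False},  # pass credentials explicitly if needed
)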
160 changes: 115 additions & 45 deletions src/demo/demos.py
@@ -8,12 +8,13 @@


import re
import subprocess

import pandas as pd
import xgboost as xgb
from sklearn.metrics import classification_report

from bdc.pipeline import Pipeline
from config import DATABASE_TYPE
from database import get_database
from demo.console_utils import (
get_int_input,
@@ -40,6 +41,7 @@
INPUT_FILE_BDC = "../data/sumup_leads_email.csv"
OUTPUT_FILE_BDC = "../data/collected_data.json"


# evp demo
def evp_demo():
data = get_database().load_preprocessed_data()
@@ -212,26 +214,33 @@ def pipeline_demo():


def preprocessing_demo():
if get_yes_no_input("Filter out the API-irrelevant data? (y/n)"):
if get_yes_no_input("Filter out the API-irrelevant data? (y/n)\n"):
filter_bool = True
else:
filter_bool = False
if get_yes_no_input(
"Run on historical data ? (y/n)\nNote: DATABASE_TYPE should be S3!"
"Run on historical data ? (y/n)\n'n' means it will run on lead data!\n"
):
historical_bool = True
else:
historical_bool = False
if get_yes_no_input("Run on S3? (y/n)\n'n' means it will run locally!\n"):
S3_bool = True
else:
S3_bool = False

preprocessor = Preprocessing(
filter_null_data=filter_bool, historical_data=historical_bool
filter_null_data=filter_bool, historical_bool=historical_bool, S3_bool=S3_bool
)

preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)

df = preprocessor.implement_preprocessing_pipeline()
preprocessor.save_preprocessed_data()
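
The demo above gathers its three flags interactively; a non-interactive equivalent, assuming the same Preprocessing interface introduced in this PR, looks roughly like this:

# Illustrative sketch, not part of the commit: the same steps as preprocessing_demo,
# with the three yes/no answers hard-coded instead of prompted.
import pandas as pd
from preprocessing import Preprocessing

preprocessor = Preprocessing(filter_null_data=True, historical_bool=True, S3_bool=False)
preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
df = preprocessor.implement_preprocessing_pipeline()
preprocessor.save_preprocessed_data()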


def predict_MerchantSize_on_lead_data_demo():
import os
import pickle
import sys
from io import BytesIO

@@ -240,36 +249,42 @@ def predict_MerchantSize_on_lead_data_demo():
import pandas as pd

log.info(
"Note: Enriched data must be located at s3://amos--data--events/leads/enriched.csv"
"Note: In case of running locally, enriched data must be located at src/data/leads_enriched.csv\nIn case of running on S3, enriched data must be located at s3://amos--data--events/leads/enriched.csv"
)

######################### preprocessing the leads ##################################
S3_bool = DATABASE_TYPE == "S3"
current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
parent_dir = os.path.join(current_dir, "..")
sys.path.append(parent_dir)
from preprocessing import Preprocessing

preprocessor = Preprocessing(filter_null_data=False, historical_data=False)
leads_enriched_path = "s3://amos--data--events/leads/enriched.csv"
if not leads_enriched_path:
log.error(
"No such file exists in the directory s3://amos--data--events/leads/enriched.csv"
)
preprocessor.data_path = leads_enriched_path
preprocessor.prerocessed_data_output_path = (
"s3://amos--data--events/leads/preprocessed_leads_data.csv"
log.info(f"Preprocessing the leads...")
preprocessor = Preprocessing(
filter_null_data=False, historical_bool=False, S3_bool=S3_bool
)
preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
df = preprocessor.implement_preprocessing_pipeline()
preprocessor.save_preprocessed_data()

############################## adapting the preprocessing files ###########################
log.info(f"Adapting the leads' preprocessed data for the ML model...")
# load the data from the CSV files
historical_preprocessed_data = pd.read_csv(
"s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
)
toBePredicted_preprocessed_data = pd.read_csv(
"s3://amos--data--events/leads/preprocessed_leads_data.csv"
)
if S3_bool:
toBePredicted_preprocessed_data = pd.read_csv(
"s3://amos--data--events/leads/preprocessed_leads_data.csv"
)
else:
path_components = preprocessor.data_path.split(
"\\" if "\\" in preprocessor.data_path else "/"
)
path_components.pop()
path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
leads_preprocessed_data_path = "/".join(path_components)
toBePredicted_preprocessed_data = pd.read_csv(leads_preprocessed_data_path)

historical_columns_order = historical_preprocessed_data.columns
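
The split/pop/append/join sequence on preprocessor.data_path recurs several times in this function and again in preprocessing.py; a small hypothetical helper (not part of this PR) that captures the same logic:

# Hypothetical helper, not in the commit: build a path that sits next to an
# existing file path, mirroring the repeated path_components handling.
def sibling_path(base_path: str, *parts: str) -> str:
    sep = "\\" if "\\" in base_path else "/"
    components = base_path.split(sep)
    components.pop()          # drop the file name
    components.extend(parts)  # append the new relative pieces
    return "/".join(components)

# e.g. sibling_path("src/data/leads_enriched.csv",
#                   "preprocessed_data_files/leads_preprocessed_data.csv")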

@@ -289,11 +304,30 @@ def predict_MerchantSize_on_lead_data_demo():
toBePredicted_preprocessed_data = toBePredicted_preprocessed_data[
historical_columns_order
]
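
Selecting with historical_columns_order raises a KeyError if any historical column is missing from the lead data; where that is not already guaranteed by the preceding steps, pandas' reindex is a tolerant alternative (a sketch, not what the commit does):

# Illustrative sketch, not part of the commit: same column order, but columns
# missing from the lead data are created and filled with 0 instead of raising.
toBePredicted_preprocessed_data = toBePredicted_preprocessed_data.reindex(
    columns=historical_columns_order, fill_value=0
)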

toBePredicted_preprocessed_data.to_csv(
"s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv",
index=False,
)
if S3_bool:
toBePredicted_output_path_s3 = (
"s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
)
toBePredicted_preprocessed_data.to_csv(
toBePredicted_output_path_s3,
index=False,
)
log.info(
f"Saving the adapted preprocessed data at {toBePredicted_output_path_s3}"
)
else:
path_components = preprocessor.data_path.split(
"\\" if "\\" in preprocessor.data_path else "/"
)
path_components.pop()
path_components.append("toBePredicted_preprocessed_data_updated.csv")
local_preprocessed_data_path = "/".join(path_components)
toBePredicted_preprocessed_data.to_csv(
local_preprocessed_data_path, index=False
)
log.info(
f"Saving the adapted preprocessed data at {local_preprocessed_data_path}"
)

# check if columns in both dataframe are in same order and same number
assert list(toBePredicted_preprocessed_data.columns) == list(
@@ -304,9 +338,14 @@ def check_classification_task(string):

bucket_name = "amos--models"

model_name = get_string_input(
"Provide model file name in amos--models/models S3 Bucket\nInput example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
)
if S3_bool:
model_name = get_string_input(
"Provide model file name in amos--models/models S3 Bucket\nInput example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
)
else:
model_name = get_string_input(
"Provide model file name in data/models local directory\nInput example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
)
# file_key = "models/lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model_updated.pkl" # adjust according to the desired model
model_name = model_name.replace(" ", "")
xgb_bool = False
@@ -325,21 +364,38 @@ def check_classification_task(string):
False

classification_task_3 = check_classification_task(file_key)
# create an S3 client
s3 = boto3.client("s3")

# download the file from S3
response = s3.get_object(Bucket=bucket_name, Key=file_key)
model_content = response["Body"].read()
try:
if S3_bool:
# create an S3 client
s3 = boto3.client("s3")
# download the file from S3
response = s3.get_object(Bucket=bucket_name, Key=file_key)
model_content = response["Body"].read()
# load model
with BytesIO(model_content) as model_file:
model = joblib.load(model_file)
log.info(f"Loaded the model from S3 bucket sucessfully!")
else:
path_components = preprocessor.data_path.split(
"\\" if "\\" in preprocessor.data_path else "/"
)
path_components.pop()
path_components.append(file_key)
model_local_path = "/".join(path_components)
model = joblib.load(model_local_path)
log.info(f"Loaded the model from the local path sucessfully!")
except:
log.error("No model found with the given name!")
return

# load model
with BytesIO(model_content) as model_file:
model = joblib.load(model_file)
log.info(f"Loaded the model sucessfully!")
if S3_bool:
data_path = (
"s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
)
else:
data_path = local_preprocessed_data_path

data_path = (
"s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
)
df = pd.read_csv(data_path)
input = df.drop("MerchantSizeByDPV", axis=1)
if xgb_bool:
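
The body of the xgb_bool branch is truncated in this hunk; if the stored model is a native xgboost Booster, prediction typically goes through a DMatrix, roughly as sketched here (an assumption, not necessarily the commit's exact code):

# Illustrative sketch, not part of the commit: a native xgboost Booster expects
# a DMatrix, while scikit-learn-style models accept the DataFrame directly.
if xgb_bool:
    predictions = model.predict(xgb.DMatrix(input))
else:
    predictions = model.predict(input)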
@@ -352,15 +408,29 @@ def check_classification_task(string):
size_mapping = {0: "XS", 1: "S", 2: "M", 3: "L", 4: "XL"}
remapped_predictions = [size_mapping[prediction] for prediction in predictions]

enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
if S3_bool:
enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
else:
enriched_data = pd.read_csv(preprocessor.data_path)

# first 5 columns: Last Name,First Name,Company / Account,Phone,Email,
raw_data = enriched_data.iloc[:, :5]
raw_data["PredictedMerchantSize"] = remapped_predictions

raw_data.to_csv(
"s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv", index=True
)
log.info(
f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
)
if S3_bool:
raw_data.to_csv(
"s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv",
index=True,
)
log.info(
f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
)
else:
path_components = preprocessor.data_path.split(
"\\" if "\\" in preprocessor.data_path else "/"
)
path_components.pop()
path_components.append("predicted_MerchantSize_of_leads.csv")
output_path = "/".join(path_components)
raw_data.to_csv(output_path, index=True)
log.info(f"Saved the predicted Merchant Size of the leads at {output_path}")
56 changes: 41 additions & 15 deletions src/preprocessing/preprocessing.py
@@ -29,33 +29,56 @@


class Preprocessing:
def __init__(self, filter_null_data=True, historical_data=False):
def __init__(self, filter_null_data=True, historical_bool=True, S3_bool=False):
data_repo = get_database()
self.data_path = data_repo.get_output_path()
if historical_data:
self.preprocessed_df = None
self.prerocessed_data_output_path = None
if historical_bool and S3_bool:
self.data_path = (
"s3://amos--data--events/historical_data/100k_historic_enriched.csv"
)
self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"
elif historical_bool and not S3_bool:
# input path
input_path_components = self.data_path.split(
"\\" if "\\" in self.data_path else "/"
)
input_path_components.pop()
input_path_components.append("100k_historic_enriched.csv")
input_path = "/".join(input_path_components)
self.data_path = input_path

# output path
path_components = self.data_path.split(
"\\" if "\\" in self.data_path else "/"
)
path_components.pop()
path_components.append(
"preprocessed_data_files/historical_preprocessed_data.csv"
)
self.prerocessed_data_output_path = "/".join(path_components)
elif not historical_bool and S3_bool:
self.data_path = "s3://amos--data--events/leads/enriched.csv"
self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"
elif not historical_bool and not S3_bool:
# input path
input_path_components = self.data_path.split(
"\\" if "\\" in self.data_path else "/"
)
input_path_components.pop()
input_path_components.append("historical_data/100k_historic_enriched.csv")
input_path_components.append("leads_enriched.csv")
input_path = "/".join(input_path_components)
data = pd.read_csv(input_path)
log.debug(f"Data path = {input_path}")
else:
log.debug(f"Data path = {self.data_path}")
data = pd.read_csv(self.data_path)
self.preprocessed_df = data.copy()
self.data_path = input_path

if historical_data:
self.prerocessed_data_output_path = "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
else:
# created the new output path based on which repo used
# output path
path_components = self.data_path.split(
"\\" if "\\" in self.data_path else "/"
)
path_components.pop()
path_components.append("preprocessed_data.csv")
path_components.append(
"preprocessed_data_files/leads_preprocessed_data.csv"
)
self.prerocessed_data_output_path = "/".join(path_components)

self.filter_bool = filter_null_data
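
For reference, the two S3 modes above resolve to fixed paths, while the local modes derive theirs from the repository's get_output_path(); a minimal sketch (attribute values copied from the branches above):

# Illustrative sketch, not part of the commit: instantiating the class in the
# two S3 modes only sets path strings; the data itself is read later.
from preprocessing import Preprocessing

hist_s3 = Preprocessing(filter_null_data=True, historical_bool=True, S3_bool=True)
# hist_s3.data_path
#   -> "s3://amos--data--events/historical_data/100k_historic_enriched.csv"
# hist_s3.prerocessed_data_output_path
#   -> "s3://amos--data--features/preprocessed_data_files/historical_preprocessed_data.csv"

leads_s3 = Preprocessing(filter_null_data=True, historical_bool=False, S3_bool=True)
# leads_s3.data_path
#   -> "s3://amos--data--events/leads/enriched.csv"
# leads_s3.prerocessed_data_output_path
#   -> "s3://amos--data--features/preprocessed_data_files/leads_preprocessed_data.csv"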
@@ -114,7 +137,10 @@ def filter_out_null_data(self):
]

def fill_missing_values(self, column, strategy="constant"):
if column in self.preprocessed_df.columns:
if (
column in self.preprocessed_df.columns
and not self.preprocessed_df[column].empty
):
imputer = SimpleImputer(strategy=strategy)
self.preprocessed_df[column] = imputer.fit_transform(
self.preprocessed_df[[column]]
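
The added check above also requires the column to be non-empty before imputing; for context, a minimal sketch of what SimpleImputer with the constant strategy does to a column with missing values (the column name here is made up):

# Illustrative sketch, not part of the commit: strategy="constant" with the
# default fill_value fills missing numeric entries with 0.
import pandas as pd
from sklearn.impute import SimpleImputer

df = pd.DataFrame({"example_numeric_column": [120.0, None, 80.0]})
imputer = SimpleImputer(strategy="constant")
df["example_numeric_column"] = imputer.fit_transform(df[["example_numeric_column"]]).ravel()
# df["example_numeric_column"] is now [120.0, 0.0, 80.0]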
