Merge pull request #10 from enriquea/main
added fs pipeline & code clean-up
ypriverol authored Jun 26, 2024
2 parents 3d3e882 + 33634c4 commit 4592cd9
Showing 14 changed files with 1,336 additions and 409 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -132,3 +132,4 @@ dmypy.json
local/
testscripts/
.idea/*
/benchmarking/
73 changes: 1 addition & 72 deletions docs/README.methods.md
@@ -53,75 +53,4 @@ A typical workflow written using `fsspark` can be divided roughly in four major

### 5. Feature selection pipeline example

Check out a full FS pipeline example [here](fsspark/pipeline/fs_corr_rf.py).

```python
"""
Example of a feature selection pipeline implemented in fsspark.
After data import and pre-processing, the pipeline applies univariate correlation filter,
multivariate correlation filter and Random Forest classification.
"""

from fsspark.config.context import init_spark, stop_spark_session
from fsspark.fs.core import FSDataFrame
from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions
from fsspark.fs.multivariate import multivariate_filter
from fsspark.fs.univariate import univariate_filter
from fsspark.fs.utils import (remove_features_by_missingness_rate,
                              impute_missing)
from fsspark.utils.datasets import get_tnbc_data_path
from fsspark.utils.io import import_table_as_psdf

# Init spark
init_spark()

# Import data
fsdf = import_table_as_psdf(get_tnbc_data_path(),
                            n_partitions=5)

fsdf = FSDataFrame(fsdf, sample_col='Sample', label_col='label')

# Step 1. Data pre-processing.

# a) Filter missingness rate
fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)

# b) Impute data frame
fsdf = impute_missing(fsdf)

# c) Scale features
fsdf = fsdf.scale_features(scaler_method='standard')

# Split dataset in training/testing
training_df, testing_df = fsdf.split_df(label_type_cat=True,
                                        split_training_factor=0.8)

# Step 2. Apply univariate correlation filter
training_df = univariate_filter(training_df,
                                univariate_method='u_corr',
                                corr_threshold=0.3)

# Step 3. Apply multivariate correlation filter
training_df = multivariate_filter(training_df,
                                  multivariate_method='m_corr',
                                  corr_threshold=0.7)

# Step 4. ML-algorithm with cross-validation
cv_model = cv_rf_classification(training_df,
                                binary_classification=False)

# Print out some stats

# Get accuracy from training
acc = get_accuracy(model=cv_model)
print(f"Training accuracy: {acc}")

# Get predictions from training
pred = get_predictions(model=cv_model)
pred.show()

stop_spark_session()
```
[FS pipeline example](../fsspark/pipeline/fs_pipeline_example.py)
8 changes: 5 additions & 3 deletions fsspark/config/context.py
@@ -6,12 +6,14 @@
PYARROW_SETTINGS,
PANDAS_ON_SPARK_API_SETTINGS)


os.environ['PYARROW_IGNORE_TIMEZONE'] = "1"


# os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home"
# os.environ['SPARK_HOME'] = "/usr/local/spark-3.3.0-bin-hadoop3"

def init_spark(apply_pyarrow_settings: bool = True,
def init_spark(master: str = "local[8]",
               apply_pyarrow_settings: bool = True,
               apply_extra_spark_settings: bool = True,
               apply_pandas_settings: bool = True) -> SparkSession:
"""
@@ -24,7 +26,7 @@ def init_spark(apply_pyarrow_settings: bool = True,

# init or get spark session.
spark = (SparkSession.builder
         .master("local[8]")
         .master(master)
         .appName("fs-spark")
         )

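With the new `master` argument, callers can point `init_spark` at any Spark master URL instead of the previously hard-coded `local[8]`. A minimal usage sketch under that assumption (the master URL and the surrounding workflow are illustrative, not part of this diff):

```python
from fsspark.config.context import init_spark, stop_spark_session

# Start a session on four local cores instead of the default local[8];
# any valid Spark master URL (e.g. a spark://host:7077 cluster) could be passed here.
spark = init_spark(master="local[4]",
                   apply_pyarrow_settings=True,
                   apply_extra_spark_settings=True,
                   apply_pandas_settings=True)

# ... import data and run a feature selection pipeline ...

stop_spark_session()
```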
53 changes: 53 additions & 0 deletions fsspark/fs/constants.py
@@ -0,0 +1,53 @@
# Define constants for the project


# Define univariate feature selection methods constants
ANOVA = 'anova'
UNIVARIATE_CORRELATION = 'u_corr'
F_REGRESSION = 'f_regression'

# Define dict with univariate feature selection methods and brief description
UNIVARIATE_METHODS = {
    ANOVA: 'ANOVA univariate feature selection (F-classification)',
    UNIVARIATE_CORRELATION: 'Univariate Correlation',
    F_REGRESSION: 'Univariate F-regression'
}

# Define multivariate feature selection methods constants
MULTIVARIATE_CORRELATION = 'm_corr'
MULTIVARIATE_VARIANCE = 'variance'

# Define dict with multivariate feature selection methods and brief description
MULTIVARIATE_METHODS = {
    MULTIVARIATE_CORRELATION: 'Multivariate Correlation',
    MULTIVARIATE_VARIANCE: 'Multivariate Variance'
}

# Define machine learning wrapper methods constants

# binary classification
RF_BINARY = 'rf_binary'
LSVC_BINARY = 'lsvc_binary'
FM_BINARY = 'fm_binary' # TODO: implement this method

# multilabel classification
RF_MULTILABEL = 'rf_multilabel'
LR_MULTILABEL = 'lg_multilabel' # TODO: implement this method

# regression
RF_REGRESSION = 'rf_regression'
FM_REGRESSION = 'fm_regression' # TODO: implement this method


# Define dict with machine learning wrapper methods and brief description
ML_METHODS = {
    RF_BINARY: 'Random Forest Binary Classifier',
    LSVC_BINARY: 'Linear SVC Binary Classifier',
    FM_BINARY: 'Factorization Machine Binary Classifier',

    RF_MULTILABEL: 'Random Forest Multi-label Classifier',
    LR_MULTILABEL: 'Logistic Regression Multi-label Classifier',

    RF_REGRESSION: 'Random Forest Regression',
    FM_REGRESSION: 'Factorization Machine Regression'
}
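These dictionaries give a single place to validate a requested method name and retrieve its description. A small sketch of how they might be used (the `describe_method` helper is hypothetical and not part of this file):

```python
from fsspark.fs.constants import (UNIVARIATE_METHODS,
                                  MULTIVARIATE_METHODS,
                                  ML_METHODS)


def describe_method(method: str) -> str:
    """Hypothetical helper: look up the description of a feature selection or ML method."""
    for catalogue in (UNIVARIATE_METHODS, MULTIVARIATE_METHODS, ML_METHODS):
        if method in catalogue:
            return catalogue[method]
    known = list(UNIVARIATE_METHODS) + list(MULTIVARIATE_METHODS) + list(ML_METHODS)
    raise ValueError(f"Unknown method '{method}'. Known methods: {known}")


print(describe_method('u_corr'))     # Univariate Correlation
print(describe_method('rf_binary'))  # Random Forest Binary Classifier
```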
20 changes: 12 additions & 8 deletions fsspark/fs/core.py
@@ -179,14 +179,6 @@ def get_sample_label(self) -> list:
"""
return self.__indexed_instances.tolist()

# def get_samples(self) -> pyspark.pandas.Series:
# """
# Get samples identifiers from DataFrame. Coerce data type to string.
#
# :return: Pandas Series
# """
# return self.__df[self.__sample_col].astype("str")

def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.DataFrame:
"""
Return a Spark dataframe with feature columns assembled into a column vector (a.k.a. Dense Vector column).
@@ -204,6 +196,18 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.

return sdf_vector

def get_sdf_and_label(self,
                      output_column_vector: str = 'features') -> Tuple[pyspark.sql.dataframe.DataFrame, str, str]:
    """
    Extract the Spark DataFrame, the label column name, and the assembled feature vector column name from the FSDataFrame.

    :param output_column_vector: Name of the output column vector.
    :return: A tuple of (Spark DataFrame, label column name, output column vector name).
    """
    sdf = self.get_sdf_vector(output_column_vector=output_column_vector)
    label_col = self.get_label_col_name()
    return sdf, label_col, output_column_vector

def _collect_features_as_array(self) -> np.array:
"""
Collect features from FSDataFrame as an array.
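The new `get_sdf_and_label` helper returns the assembled Spark DataFrame together with the label and feature-vector column names, which makes it straightforward to hand an `FSDataFrame` to a Spark ML estimator. A hedged sketch (the `fsdf` variable and classifier settings are illustrative; a numeric label column is assumed, otherwise a `StringIndexer` step would be needed first):

```python
from pyspark.ml.classification import RandomForestClassifier

# `fsdf` is assumed to be an existing FSDataFrame, built as in the pipeline example above.
sdf, label_col, features_col = fsdf.get_sdf_and_label(output_column_vector='features')

# Fit a Spark ML random forest directly on the assembled vector column.
rf = RandomForestClassifier(featuresCol=features_col,
                            labelCol=label_col,
                            numTrees=100)
model = rf.fit(sdf)

predictions = model.transform(sdf)
predictions.select(label_col, 'prediction').show(5)
```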
