Updating Schema reading procedures and refactoring #78

Merged
34 commits merged on Jun 6, 2024

Commits
faef729
Updated data reading procedure
Hsankesara Jan 10, 2024
5ae2bae
refactoring
Hsankesara Jan 11, 2024
fba2d75
refactor to make all modules more spark dependent
Hsankesara Jan 15, 2024
6f21791
Begin refactoring pipeline
Hsankesara Jan 16, 2024
a8b0582
refactor Spark Reader mechanism
Hsankesara Jan 16, 2024
26fc86c
minor refactor in reader.py
Hsankesara Jan 16, 2024
25dd237
Added and updated tests + upgraded spark version to 3.5.0
Hsankesara Jan 17, 2024
979ae94
resolved linting errors
Hsankesara Jan 17, 2024
c0ebb67
minor refactoring of the data preprocessing function
Hsankesara Jan 17, 2024
85d4a1c
minor error correction
Hsankesara Jan 17, 2024
e443cfc
updated setup.py
Hsankesara Jan 17, 2024
38e8612
updated paramiko dependency
Hsankesara Jan 17, 2024
d10bbfb
updated modules version due to security vulnerabilities.
Hsankesara Jan 17, 2024
d762039
updated pandas and numpy versions
Hsankesara Jan 17, 2024
9cedc21
updated test expected output files
Hsankesara Jan 22, 2024
f2d8faa
updated tests
Hsankesara Jan 22, 2024
a50a70e
solved test issue caused by timezone setting in spark
Hsankesara Jan 24, 2024
111de11
resolved error caused when data for a user is empty
Hsankesara Jan 25, 2024
2d884cd
Added custom data reading module that can be used independently
Hsankesara Feb 8, 2024
68eabe3
added more possible time columns to preprocess_time_data
Hsankesara Feb 8, 2024
857ea89
Added user sampling mechanism in radarpipeline
Hsankesara Feb 9, 2024
ffd69ad
Added data sampling as well
Hsankesara Feb 9, 2024
bf47487
minor code changes
Hsankesara Feb 12, 2024
b4bc9d2
Added data_sampling methods by: time, count & fraction
Hsankesara Feb 14, 2024
9659798
minor import changes
Hsankesara Feb 14, 2024
aa82e88
Merge pull request #80 from RADAR-base/simplifying_data_reader
Hsankesara Feb 23, 2024
c52281c
Added feature to access spark-cluster
Hsankesara Apr 29, 2024
77bd070
Updated tests
Hsankesara Apr 30, 2024
a542b55
added tests to test different sampling configs
Hsankesara Apr 30, 2024
6fc67c4
added pipeline tests for samplings
Hsankesara May 1, 2024
8636032
changed data_type to source_type in config.yaml
Hsankesara Jun 3, 2024
4ce5b82
Added multiple time ranges compatibility with data sampling
Hsankesara Jun 4, 2024
0351724
Added tests for multiple time ranges
Hsankesara Jun 4, 2024
d420b75
Merge pull request #81 from RADAR-base/sampling_architecture
Hsankesara Jun 6, 2024
Files changed
38 changes: 32 additions & 6 deletions config.yaml
@@ -4,7 +4,7 @@ project:
version: mock_version

input:
data_type: local # couldbe mock, local, sftp, s3
source_type: mock # couldbe mock, local, sftp, s3
config:
# In case of sftp, use the following format
# sftp_host:
@@ -25,6 +25,36 @@ input:

configurations:
df_type: 'pandas'
#user_sampling:
## Possible methods: fraction, count, userid
# method: fraction
# config:
# fraction: 0.8
#method: count
#config:
# count: 2
#method: userid
#config:
# userids:
# - 2a02e53a-951e-4fd0-b47f-195a87096bd0
## TODO: For future
#data_sampling:
## Possible methods: time, count, fraction
## starttime and endtime format is dd-mm-yyyy hh:mm:ss in UTC timezone
## It is possible to have multiple time ranges. See below Example
#method: time
#config:
#- starttime: 2018-11-22 00:00:00
# endtime: 2018-11-26 00:00:00
# time_column: value.time
#- starttime: 2018-12-27 00:00:00
# time_column: value.time
#method: count
#config:
# count: 100
#method: fraction
#config:
# fraction: 0.3

features:
- location: 'https://github.com/RADAR-base-Analytics/mockfeatures'
@@ -39,8 +69,4 @@ output:
config:
target_path: output/mockdata
data_format: csv
compress: false

spark_config:
spark.executor.instances: 2
spark.driver.memory: 13G
compress: false
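
Note: the commented blocks above illustrate the new user_sampling and data_sampling options. As a quick cross-check against the schema that radarpipeline/common/utils.py gains later in this diff, here is a minimal, hypothetical validation of an enabled sampling block with strictyaml; the concrete values are illustrative, not defaults shipped by this PR.

# Hypothetical sketch: validate an enabled sampling block against the
# "configurations" fragment added to get_yaml_schema() in this PR.
from strictyaml import load, Map, MapPattern, Optional, Seq, Str

configurations_schema = Map({
    "df_type": Str(),
    Optional("user_sampling"): Map({
        "method": Str(),
        "config": MapPattern(Str(), Seq(Str()) | Str()),
    }),
    Optional("data_sampling"): Map({
        "method": Str(),
        "config": MapPattern(Str(), Str()) | Seq(MapPattern(Str(), Str())),
    }),
})

sample_yaml = """\
df_type: pandas
user_sampling:
  method: fraction
  config:
    fraction: 0.8
data_sampling:
  method: time
  config:
    - starttime: 2018-11-22 00:00:00
      endtime: 2018-11-26 00:00:00
      time_column: value.time
"""

parsed = load(sample_yaml, configurations_schema)
print(parsed.data["user_sampling"]["config"]["fraction"])      # "0.8" (strings; casting is left to the sampler)
print(parsed.data["data_sampling"]["config"][0]["starttime"])  # "2018-11-22 00:00:00"
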
34 changes: 32 additions & 2 deletions config.yaml.template
@@ -4,7 +4,7 @@ project:
version: mock_version

input:
data_type: mock # couldbe mock, local, sftp, s3
source_type: mock # couldbe mock, local, sftp, s3
config:
## In case of sftp, use the following format
# sftp_host:
@@ -24,7 +24,37 @@
data_format: csv

configurations:
df_type: 'pandas' # can be pandas or spark
df_type: 'pandas'
#user_sampling:
## Possible methods: fraction, count, userid
# method: fraction
# config:
# fraction: 0.8
#method: count
#config:
# count: 2
#method: userid
#config:
# userids:
# - 2a02e53a-951e-4fd0-b47f-195a87096bd0
## TODO: For future
data_sampling:
## Possible methods: time, count, fraction
## starttime and endtime format is dd-mm-yyyy hh:mm:ss in UTC timezone
## It is possible to have multiple time ranges. See below Example
#method: time
#config:
#- starttime: 2018-11-22 00:00:00
# endtime: 2018-11-26 00:00:00
# time_column: value.time
#- starttime: 2018-12-27 00:00:00
# time_column: value.time
#method: count
#config:
# count: 100
#method: fraction
#config:
# fraction: 0.3

features:
- location: 'https://github.com/RADAR-base-Analytics/mockfeatures'
2 changes: 1 addition & 1 deletion mockdata
Submodule mockdata updated 2 files
+0 −0 __init__.py
+20 −0 setup.py
69 changes: 67 additions & 2 deletions radarpipeline/common/utils.py
@@ -10,11 +10,16 @@
import yaml
from strictyaml import load, Map, Int, Str, Seq, Bool, Optional
from strictyaml import YAMLError, CommaSeparated, MapPattern
from dateutil import parser

import ntpath
import posixpath

from radarpipeline.common import constants
import unittest
from radarpipeline.project.sparkengine import SparkEngine
import pyspark.sql.functions as f
from pyspark.sql.types import TimestampType


def read_yaml(yaml_file_path: str) -> Dict[str, Any]:
@@ -145,12 +150,20 @@ def get_yaml_schema() -> Map:
Optional("version"): Str()
}),
"input": Map({
"data_type": Str(),
"source_type": Str(),
"config": MapPattern(Str(), Str()),
"data_format": Str()
}),
"configurations": Map({
"df_type": Str()
"df_type": Str(),
Optional("user_sampling"): Map({
"method": Str(),
"config": MapPattern(Str(), Seq(Str()) | Str()),
}),
Optional("data_sampling"): Map({
"method": Str(),
"config": MapPattern(Str(), Str()) | Seq(MapPattern(Str(), Str())),
}),
}),
"features": Seq(Map({
"location": Str(),
@@ -165,6 +178,7 @@ def get_yaml_schema() -> Map:
"compress": Bool()
}),
Optional("spark_config"): Map({
Optional("spark_master", default="local"): Str(),
Optional("spark.executor.instances", default=4): Int(),
Optional("spark.executor.cores", default=4): Int(),
Optional("spark.executor.memory", default='10g'): Str(),
@@ -250,3 +264,54 @@ def get_write_file_attr(feature_name, output_dir, data_format, compression):
raise ValueError(f"Invalid data format {data_format} specified \
for spark writer")
return file_path


def get_hash(array : List) -> int:
"""
Returns the hash of the array

Parameters
----------
array : list
List of values

Returns
-------
str
Hash of the array
"""
return hash(tuple(array))


def preprocess_time_data(data):
time_cols = ["value.time", "value.timeReceived", "value.dateTime",
"value.timeCompleted", "value.timeNotification"]
for i, col in enumerate(time_cols):
if col in data.columns:
data = data.withColumn(col, data[f"`{col}`"].cast(TimestampType()))
data.withColumn(col, f.from_unixtime(
f.unix_timestamp(f"`{col}`")))
return data


def convert_str_to_time(time):
try:
return parser.parse(time)
except ValueError:
raise ValueError(
"Invalid value for the key: time. It should be a valid time format"
)


class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark_engine = SparkEngine()
cls.spark = cls.spark_engine.initialize_spark_session()

@classmethod
def tearDownClass(cls):
cls.spark_engine.close_spark_session()

def preprocess_data(self, data):
return preprocess_time_data(data)
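
Note: a small usage sketch of the new helpers follows; the local SparkSession and the sample values are illustrative only (in the pipeline the session comes from SparkEngine).

# Illustrative usage of the new utils helpers; data values are made up.
from pyspark.sql import SparkSession
from radarpipeline.common.utils import convert_str_to_time, preprocess_time_data

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [("2018-11-22 10:15:00", "72.0")],
    ["value.time", "value.heartRate"],
)
df = preprocess_time_data(df)   # "value.time" is cast to TimestampType
df.printSchema()

start = convert_str_to_time("22-11-2018 00:00:00")  # dateutil parses dd-mm-yyyy
print(start)                                        # 2018-11-22 00:00:00
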
7 changes: 3 additions & 4 deletions radarpipeline/datalib/radar_data.py
@@ -94,10 +94,9 @@ def get_combined_data_by_variable(
# Combine the all data for each variable
for var in variable_dict:
if len(variable_dict[var]) > 0:
if self.df_type == "spark":
combined_df = utils.combine_pyspark_dfs(variable_dict[var])
else:
combined_df = pd.concat(variable_dict[var], ignore_index=True)
combined_df = utils.combine_pyspark_dfs(variable_dict[var])
if self.df_type == "pandas":
combined_df = combined_df.toPandas()
variable_data_list.append(combined_df)

if is_only_one_var:
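
Note: with this change, variable DataFrames are always combined in Spark and only converted to pandas at the end. A compressed illustration of that path; the two part-DataFrames are placeholders.

# Sketch of the unified combine path: combine as Spark DataFrames first,
# convert to pandas only when the pipeline's df_type is "pandas".
from pyspark.sql import SparkSession
from radarpipeline.common import utils

spark = SparkSession.builder.master("local[1]").getOrCreate()
part_a = spark.createDataFrame([("u1", 1.0)], ["key.userId", "value.x"])
part_b = spark.createDataFrame([("u2", 2.0)], ["key.userId", "value.x"])

combined = utils.combine_pyspark_dfs([part_a, part_b])  # still a Spark DataFrame
pandas_df = combined.toPandas()                         # only for pandas output
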
46 changes: 11 additions & 35 deletions radarpipeline/datalib/radar_variable_data.py
@@ -2,8 +2,10 @@
import logging
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import TimestampType

from radarpipeline.datalib.abc import Data
from radarpipeline.common.utils import preprocess_time_data
from radarpipeline.datatypes import DataType

logger = logging.getLogger(__name__)
@@ -16,10 +18,13 @@ class RadarVariableData(Data):

_data: DataType

def __init__(self, data: DataType, df_type: str = "pandas") -> None:
def __init__(self, data: DataType, df_type: str = "pandas",
data_sampler=None) -> None:
self._data = data
self.df_type = df_type
self._preprocess_data()
if data_sampler is not None:
self._data = data_sampler.sample_data(self._data)

def get_data(self) -> DataType:
return self._data
@@ -31,42 +36,13 @@ def get_data_keys(self) -> List[str]:
return list(self._data.columns)

def get_data_size(self) -> int:
if self.df_type == "pandas":
return len(self._data.index)
else:
return int(self._data.count())
return int(self._data.count())

def _preprocess_data(self) -> None:
"""
Converts all time value columns to datetime format
"""

if self.df_type == "spark":
if "value.time" in self.get_data_keys():
self._data = self._data.withColumn(
"value.time", f.to_date(self._data["`value.time`"])
)
if "value.timeReceived" in self.get_data_keys():
self._data = self._data.withColumn(
"value.timeReceived", f.to_date(self._data["`value.timeReceived`"])
)
if "value.dateTime" in self.get_data_keys():
self._data = self._data.withColumn(
"value.dateTime", f.to_date(self._data["`value.dateTime`"])
)
else:
try:
if "value.time" in self.get_data_keys():
self._data["value.time"] = pd.to_datetime(
self._data["value.time"].astype(str), unit="s"
)
if "value.timeReceived" in self.get_data_keys():
self._data["value.timeReceived"] = pd.to_datetime(
self._data["value.timeReceived"].astype(str), unit="s"
)
if "value.dateTime" in self.get_data_keys():
self._data["value.dateTime"] = pd.to_datetime(
self._data["value.dateTime"].astype(str), unit="s"
)
except ValueError:
logger.warning("Unable to convert time columns to datetime format")
try:
self._data = preprocess_time_data(self._data)
except ValueError:
logger.warning("Unable to convert time columns to datetime format")
2 changes: 1 addition & 1 deletion radarpipeline/features/feature_group.py
@@ -65,7 +65,7 @@ def get_all_features(self, data: RadarData) -> Tuple[List[str], List[DataType]]:
feature_values = []
preprocessed_data = self.preprocess(data)
for feature in self.features:
print(feature.name)
logger.info(f"Computing feature {feature.name}")
feature_names.append(feature.name)
preprocessed_feature = feature.preprocess(preprocessed_data)
feature_values.append(feature.calculate(preprocessed_feature))
4 changes: 3 additions & 1 deletion radarpipeline/io/__init__.py
@@ -1,4 +1,6 @@
from radarpipeline.io.abc import DataReader, SchemaReader
from radarpipeline.io.reader import AvroSchemaReader, SparkCSVDataReader, Reader
from radarpipeline.io.reader import AvroSchemaReader, Reader, SparkCSVDataReader
from radarpipeline.io.downloader import SftpDataReader
from radarpipeline.io.writer import *
from radarpipeline.io.ingestion import CustomDataReader
from radarpipeline.io.sampler import UserSampler, DataSampler
9 changes: 9 additions & 0 deletions radarpipeline/io/abc.py
@@ -55,3 +55,12 @@ def __init__(self, features: Dict[str, DataType], output_dir: str) -> None:
@abstractmethod
def write_data(self) -> None:
pass


class Sampler(ABC):
"""
Abstract class for sampling the RADAR data
"""

def __init__(self, config) -> None:
self.config = config
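
Note: the new Sampler base class only pins down the constructor; sample_data is assumed as the concrete hook because that is what RadarVariableData calls. A hedged sketch of a config-driven subclass, not part of this PR:

# Hypothetical Sampler subclass keeping every n-th row of a Spark DataFrame.
import pyspark.sql.functions as f

from radarpipeline.io.abc import Sampler


class EveryNthSampler(Sampler):
    def __init__(self, config) -> None:
        super().__init__(config)
        self.n = int(config.get("n", 10))  # config shape is an assumption

    def sample_data(self, df):
        df = df.withColumn("_row_id", f.monotonically_increasing_id())
        return df.filter(f.col("_row_id") % self.n == 0).drop("_row_id")
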
7 changes: 4 additions & 3 deletions radarpipeline/io/downloader.py
@@ -101,13 +101,14 @@ def _fetch_data(self, root_path, sftp_source_path, included_var_cat, uid):
src_file),
preserve_mtime=True)
except FileNotFoundError:
print("Folder not found: " + dir_path + "/" + src_file)
logger.warning("Folder not found: " + dir_path
+ "/" + src_file)
continue
except EOFError:
print("EOFError: " + dir_path + "/" + src_file)
logger.warning("EOFError: " + dir_path + "/" + src_file)
continue
except FileNotFoundError:
print("Folder not found: " + uid)
logger.warning("Folder not found: " + uid)
return
sftp.close()

39 changes: 39 additions & 0 deletions radarpipeline/io/ingestion.py
@@ -0,0 +1,39 @@
import logging

from radarpipeline.io.reader import Reader
from radarpipeline.project.sparkengine import SparkEngine

from typing import Dict

logger = logging.getLogger(__name__)


class CustomDataReader():
Review comment (Member): What is the distinction here between ingestion and reader?

Reply (Member Author): Nothing, I think ingestion is just the filename. I tried to keep the function names that are exposed to the user simple and straightforward. That's why I named it that way.

def __init__(self, input_config, variables, source_type="local", data_format="csv",
df_type="pandas") -> None:
self.variables = variables
self.data_format = data_format
self.source_type = source_type
self.config = self.modify_config(input_config, data_format)
self.sparkengine = SparkEngine()
self.spark = self.sparkengine.initialize_spark_session()
self.data_reader = Reader(self.spark, self.config, variables, df_type)

def modify_config(self, input_config, data_format) -> Dict:
"""
Modify the input configuration to include the variables of interest
"""
config = {'input': {}, "configurations": {}}
config['input'] = input_config
config['input']['data_format'] = data_format
config['input']['source_type'] = self.source_type
config['configurations']['df_type'] = "pandas"
config['configurations']['user_sampling'] = None
config['configurations']['data_sampling'] = None
return config

def read_data(self):
return self.data_reader.read_data()

def close_session(self):
self.sparkengine.close_spark_session()
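
Note: following up on the thread above, a brief, hypothetical usage sketch of CustomDataReader as a standalone entry point; the input path key and variable name are placeholders, not values defined in this PR.

# Hypothetical standalone use of CustomDataReader (paths/keys are assumptions).
from radarpipeline.io import CustomDataReader

input_config = {"local_directory": "mockdata/mockdata"}  # key name is an assumption
reader = CustomDataReader(
    input_config,
    variables=["android_phone_battery_level"],  # illustrative variable name
    source_type="local",
    data_format="csv",
)
data = reader.read_data()
reader.close_session()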