-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updating Schema reading procedures and refactoring #78
Merged
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
faef729
Updated data reading procedure
Hsankesara 5ae2bae
refactoring
Hsankesara fba2d75
refactor to make all modules more spark dependent
Hsankesara 6f21791
Begin refactoring pipeline
Hsankesara a8b0582
refactor Spark Reader mechanism
Hsankesara 26fc86c
minor refactor in reader.py
Hsankesara 25dd237
Added and updated tests + upgraded spark version to 3.5.0
Hsankesara 979ae94
resolved linting errors
Hsankesara c0ebb67
minor refactoring of the data preprocessing function
Hsankesara 85d4a1c
minor error correction
Hsankesara e443cfc
updated setup.py
Hsankesara 38e8612
updated paramiko dependency
Hsankesara d10bbfb
updated modules version due to security vulnerabilities.
Hsankesara d762039
updated pandas and numpy versions
Hsankesara 9cedc21
updated test expected output files
Hsankesara f2d8faa
updated tests
Hsankesara a50a70e
solved test issue caused due of timezone setting in spark
Hsankesara 111de11
resolved error caused when data for an user is empty
Hsankesara 2d884cd
Added custom data reading module that can be used independently
Hsankesara 68eabe3
added more possible time columns to preprocess_time_data
Hsankesara 857ea89
Added user sampling mechanism in radarpipeline
Hsankesara ffd69ad
Added data sampling as well
Hsankesara bf47487
minor code changes
Hsankesara b4bc9d2
Added data_sampline methods by: time, count & fraction
Hsankesara 9659798
minor import changes
Hsankesara aa82e88
Merge pull request #80 from RADAR-base/simplifying_data_reader
Hsankesara c52281c
Added feature to access spark-cluster
Hsankesara 77bd070
Updated tests
Hsankesara a542b55
added tests to test different sampling configs
Hsankesara 6fc67c4
added pipeline tests for samplings
Hsankesara 8636032
changed data_type to source_type in config.yaml
Hsankesara 4ce5b82
Added multiple time ranges compatibility with data sampling
Hsankesara 0351724
Added tests for multiple time ranges
Hsankesara d420b75
Merge pull request #81 from RADAR-base/sampling_architecture
Hsankesara File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
from radarpipeline.io.abc import DataReader, SchemaReader | ||
from radarpipeline.io.reader import AvroSchemaReader, SparkCSVDataReader, Reader | ||
from radarpipeline.io.reader import AvroSchemaReader, Reader, SparkCSVDataReader | ||
from radarpipeline.io.downloader import SftpDataReader | ||
from radarpipeline.io.writer import * | ||
from radarpipeline.io.ingestion import CustomDataReader | ||
from radarpipeline.io.sampler import UserSampler, DataSampler |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import logging | ||
|
||
from radarpipeline.io.reader import Reader | ||
from radarpipeline.project.sparkengine import SparkEngine | ||
|
||
from typing import Dict | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CustomDataReader(): | ||
def __init__(self, input_config, variables, source_type="local", data_format="csv", | ||
df_type="pandas") -> None: | ||
self.variables = variables | ||
self.data_format = data_format | ||
self.source_type = source_type | ||
self.config = self.modify_config(input_config, data_format) | ||
self.sparkengine = SparkEngine() | ||
self.spark = self.sparkengine.initialize_spark_session() | ||
self.data_reader = Reader(self.spark, self.config, variables, df_type) | ||
|
||
def modify_config(self, input_config, data_format) -> Dict: | ||
""" | ||
Modify the input configuration to include the variables of interest | ||
""" | ||
config = {'input': {}, "configurations": {}} | ||
config['input'] = input_config | ||
config['input']['data_format'] = data_format | ||
config['input']['source_type'] = self.source_type | ||
config['configurations']['df_type'] = "pandas" | ||
config['configurations']['user_sampling'] = None | ||
config['configurations']['data_sampling'] = None | ||
return config | ||
|
||
def read_data(self): | ||
return self.data_reader.read_data() | ||
|
||
def close_session(self): | ||
self.sparkengine.close_spark_session() |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the distinction here between ingestion and reader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing, I think ingestion is the filename. I tried to keep the function names which are exposed to the user simple and straight. That's why I named it that way.