From a9495c06af73096526015b2f16af7ac142ee3f6e Mon Sep 17 00:00:00 2001 From: Tian Luan Date: Sat, 17 Feb 2024 12:35:04 -0500 Subject: [PATCH] Implement CSVFileLoader to write data to csv files --- src/loader_interface/base_loader.py | 1 - src/loader_interface/csv_file_loader.py | 29 +++++++++++++ .../loader_interface/test_csv_file_loader.py | 41 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 src/loader_interface/csv_file_loader.py create mode 100644 tests/loader_interface/test_csv_file_loader.py diff --git a/src/loader_interface/base_loader.py b/src/loader_interface/base_loader.py index 0510ca1..0321a9b 100644 --- a/src/loader_interface/base_loader.py +++ b/src/loader_interface/base_loader.py @@ -30,6 +30,5 @@ async def run(self, data_generator: AsyncGenerator[Any, None]) -> None: """ self.logger.info("Starting data loading process.") async for data in data_generator: - print(self.write_data) await self.write_data(data) self.logger.info("Data loading process completed.") diff --git a/src/loader_interface/csv_file_loader.py b/src/loader_interface/csv_file_loader.py new file mode 100644 index 0000000..0749e7d --- /dev/null +++ b/src/loader_interface/csv_file_loader.py @@ -0,0 +1,29 @@ +from loader_interface.base_loader import BaseLoader +from pandas import DataFrame +from pathlib import Path + + +class CSVLoader(BaseLoader): + def __init__(self, destination_file: str) -> None: + """ + Initialize the CSVLoader object. + + Args: + destination_file (str): The path to the destination CSV file. + """ + super().__init__() + self.destination_file = destination_file + self.initial_write = not Path(self.destination_file).exists() + + async def write_data(self, data: DataFrame) -> None: + """ + Writes the given data to a CSV file. + + Args: + data (DataFrame): The data to be written. Expected to be a pandas DataFrame. + """ + # Write DataFrame to CSV file + data.to_csv(self.destination_file, mode='a', header=self.initial_write, index=False) + + # After the first write, set initial_write to False so headers are not repeated + self.initial_write = False diff --git a/tests/loader_interface/test_csv_file_loader.py b/tests/loader_interface/test_csv_file_loader.py new file mode 100644 index 0000000..b34fdca --- /dev/null +++ b/tests/loader_interface/test_csv_file_loader.py @@ -0,0 +1,41 @@ +import pytest +import pandas as pd +from pandas.testing import assert_frame_equal +from loader_interface.csv_file_loader import CSVLoader +import asyncio + +@pytest.mark.asyncio +async def test_csv_loader(tmp_path): + # Create a temp path + destination_file = tmp_path / "output.csv" + + # Create DataFrame used for tests + df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]}) + df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]}) + + loader = CSVLoader(str(destination_file)) + + # Write the first batch of data + await loader.write_data(df1) + + # Verify if the first write successful + loaded_df1 = pd.read_csv(destination_file) + assert_frame_equal(loaded_df1, df1, check_dtype=False) + + # Write the second batch of data + await loader.write_data(df2) + + # Verify if the second write successful + loaded_df2 = pd.read_csv(destination_file) + combined_df = pd.concat([df1, df2], ignore_index=True) + assert_frame_equal(loaded_df2, combined_df, check_dtype=False) + +# Clean files +@pytest.fixture(autouse=True) +def run_around_tests(): + # Do nothing before test + yield + # Execute cleanup + loop = asyncio.get_event_loop() + loop.run_until_complete(asyncio.sleep(0.1)) + # Delete anything here if need