diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py index 66fede5bb9..ed8f5e8be8 100644 --- a/evadb/storage/native_storage_engine.py +++ b/evadb/storage/native_storage_engine.py @@ -28,7 +28,7 @@ from evadb.models.storage.batch import Batch from evadb.storage.abstract_storage_engine import AbstractStorageEngine from evadb.third_party.databases.interface import get_database_handler -from evadb.utils.generic_utils import PickleSerializer +from evadb.utils.generic_utils import PickleSerializer, rebatch from evadb.utils.logging_manager import logger @@ -190,8 +190,8 @@ def read( _deserialize_sql_row(row, ordered_columns) for row in result ) - for data_batch in result: - yield Batch(pd.DataFrame([data_batch])) + for df in rebatch(result, batch_mem_size): + yield Batch(pd.DataFrame(df)) except Exception as e: err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}" diff --git a/evadb/storage/sqlite_storage_engine.py b/evadb/storage/sqlite_storage_engine.py index 91b72bb443..2c3335f562 100644 --- a/evadb/storage/sqlite_storage_engine.py +++ b/evadb/storage/sqlite_storage_engine.py @@ -29,7 +29,7 @@ from evadb.models.storage.batch import Batch from evadb.parser.table_ref import TableInfo from evadb.storage.abstract_storage_engine import AbstractStorageEngine -from evadb.utils.generic_utils import PickleSerializer +from evadb.utils.generic_utils import PickleSerializer, rebatch from evadb.utils.logging_manager import logger # Leveraging Dynamic schema in SQLAlchemy @@ -189,12 +189,12 @@ def read( try: table_to_read = self._try_loading_table_via_reflection(table.name) result = self._sql_session.execute(table_to_read.select()).fetchall() - for row in result: - yield Batch( - pd.DataFrame( - [self._deserialize_sql_row(row._asdict(), table.columns)] - ) - ) + result_iter = ( + self._deserialize_sql_row(row._asdict(), table.columns) + for row in result + ) + for df in rebatch(result_iter, batch_mem_size): + yield Batch(pd.DataFrame(df)) except Exception as e: err_msg = f"Failed to read the table {table.name} with exception {str(e)}" logger.exception(err_msg)