diff --git a/Makefile b/Makefile index 6736e64078..1598664f83 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests lint-python: - cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast + cd ${ROOT_DIR}/sdk/python; python -m mypy feast cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests diff --git a/sdk/python/feast/infra/contrib/stream_processor.py b/sdk/python/feast/infra/contrib/stream_processor.py index c4620f4ca1..3f1fe08510 100644 --- a/sdk/python/feast/infra/contrib/stream_processor.py +++ b/sdk/python/feast/infra/contrib/stream_processor.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from types import MethodType -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Optional from pyspark.sql import DataFrame from typing_extensions import TypeAlias @@ -51,7 +51,9 @@ def __init__( self.data_source = data_source @abstractmethod - def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: + def ingest_stream_feature_view( + self, to: PushMode = PushMode.ONLINE + ) -> Optional[Any]: """ Ingests data from the stream source attached to the stream feature view; transforms the data and then persists it to the online store and/or offline store, depending on the 'to' parameter. @@ -75,7 +77,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable: raise NotImplementedError @abstractmethod - def _write_stream_data(self, table: StreamTable, to: PushMode) -> None: + def _write_stream_data(self, table: StreamTable, to: PushMode) -> Optional[Any]: """ Launches a job to persist stream data to the online store and/or offline store, depending on the 'to' parameter, and returns a handle for the job. diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py index 46d5c20e97..f50cdc4c41 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Optional +from typing import Dict, Literal, Optional import pandas as pd import pytest @@ -12,6 +12,7 @@ PostgreSQLSource, ) from feast.infra.utils.postgres.connection_utils import df_to_postgres_table +from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) @@ -26,6 +27,10 @@ POSTGRES_DB = "test" +class PostgreSQLOnlineStoreConfig(PostgreSQLConfig): + type: Literal["postgres"] = "postgres" + + @pytest.fixture(scope="session") def postgres_container(): container = ( @@ -106,17 +111,17 @@ def create_offline_store_config(self) -> PostgreSQLOfflineStoreConfig: def get_prefixed_table_name(self, suffix: str) -> str: return f"{self.project_name}_{suffix}" - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> PostgreSQLOnlineStoreConfig: assert self.container - return { - "type": "postgres", - "host": "localhost", - "port": self.container.get_exposed_port(5432), - "database": POSTGRES_DB, - "db_schema": "feature_store", - "user": POSTGRES_USER, - "password": POSTGRES_PASSWORD, - } + return PostgreSQLOnlineStoreConfig( + type="postgres", + host="localhost", + port=self.container.get_exposed_port(5432), + database=POSTGRES_DB, + db_schema="feature_store", + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + ) def create_saved_dataset_destination(self): # FIXME: ... diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 66e7e78651..32cda2d6b6 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -470,7 +470,7 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: pa_table = execute_snowflake_statement( self.snowflake_conn, self.to_sql() - ).fetch_arrow_all() + ).fetch_arrow_all(force_return_table=False) if pa_table: return pa_table