From 565332c88f561e1952a33159b71c4db4dfc9a93e Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 18 Aug 2021 14:12:17 -0700 Subject: [PATCH 1/8] Add S3 support with custom endpoints Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/offline_stores/file.py | 11 +++- .../feast/infra/offline_stores/file_source.py | 59 ++++++++++++++++++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 4461ef842b9..a04357bf941 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -112,7 +112,11 @@ def evaluate_historical_retrieval(): ) # Read offline parquet data in pyarrow format. - table = pyarrow.parquet.read_table(feature_view.batch_source.path) + filesystem, path = FileSource.prepare_path( + feature_view.batch_source.path, + feature_view.batch_source.file_options.s3_endpoint_override, + ) + table = pyarrow.parquet.read_table(path, filesystem=filesystem) # Rename columns by the field mapping dictionary if it exists if feature_view.batch_source.field_mapping is not None: @@ -238,7 +242,10 @@ def pull_latest_from_table_or_query( # Create lazy function that is only called from the RetrievalJob object def evaluate_offline_job(): - source_df = pd.read_parquet(data_source.path) + filesystem, path = FileSource.prepare_path( + data_source.path, data_source.file_options.s3_endpoint_override + ) + source_df = pd.read_parquet(path, filesystem=filesystem) # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 624c812b235..91aac446f1b 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -1,5 +1,6 @@ from typing import Callable, Dict, Iterable, Optional, Tuple +from pyarrow import fs from pyarrow.parquet import ParquetFile from feast import type_map @@ -20,6 +21,7 @@ def __init__( created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", + s3_endpoint_override: Optional[str] = None, ): """Create a FileSource from a file containing feature data. Only Parquet format supported. @@ -33,6 +35,7 @@ def __init__( file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format. field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table or view. Only used for feature columns, not entities or timestamp columns. 
+        s3_endpoint_override (optional): Overrides the AWS S3 endpoint with a custom S3-compatible storage endpoint

    Examples:
        >>> from feast import FileSource
@@ -51,7 +54,11 @@ def __init__(
        else:
            file_url = path

-        self._file_options = FileOptions(file_format=file_format, file_url=file_url)
+        self._file_options = FileOptions(
+            file_format=file_format,
+            file_url=file_url,
+            s3_endpoint_override=s3_endpoint_override,
+        )

        super().__init__(
            event_timestamp_column,
@@ -70,6 +77,8 @@ def __eq__(self, other):
            and self.event_timestamp_column == other.event_timestamp_column
            and self.created_timestamp_column == other.created_timestamp_column
            and self.field_mapping == other.field_mapping
+            and self.file_options.s3_endpoint_override
+            == other.file_options.s3_endpoint_override
        )

    @property
@@ -102,6 +111,7 @@ def from_proto(data_source: DataSourceProto):
            event_timestamp_column=data_source.event_timestamp_column,
            created_timestamp_column=data_source.created_timestamp_column,
            date_partition_column=data_source.date_partition_column,
+            s3_endpoint_override=data_source.file_options.s3_endpoint_override,
        )

    def to_proto(self) -> DataSourceProto:
@@ -128,9 +138,24 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
-        schema = ParquetFile(self.path).schema_arrow
+        filesystem, path = FileSource.prepare_path(
+            self.path, self._file_options.s3_endpoint_override
+        )
+        schema = ParquetFile(
+            path if filesystem is None else filesystem.open_input_file(path)
+        ).schema_arrow
        return zip(schema.names, map(str, schema.types))

+    @staticmethod
+    def prepare_path(path: str, s3_endpoint_override: str):
+        if path.startswith("s3://"):
+            s3 = fs.S3FileSystem(
+                endpoint_override=s3_endpoint_override if s3_endpoint_override else None
+            )
+            return s3, path.replace("s3://", "")
+        else:
+            return None, path
+

class FileOptions:
    """
@@ -138,10 +163,22 @@ class FileOptions:
    """

    def __init__(
-        self, file_format: Optional[FileFormat], file_url: Optional[str],
+        self,
+        file_format: Optional[FileFormat],
+        file_url: Optional[str],
+        s3_endpoint_override: Optional[str],
    ):
+        """
+        FileOptions initialization method
+
+        Args:
+            file_format (FileFormat, optional): file source format, e.g. parquet
+            file_url (str, optional): file source url, e.g. 
s3:// or local file + s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 file_url) + """ self._file_format = file_format self._file_url = file_url + self._s3_endpoint_override = s3_endpoint_override @property def file_format(self): @@ -171,6 +208,20 @@ def file_url(self, file_url): """ self._file_url = file_url + @property + def s3_endpoint_override(self): + """ + Returns the s3 endpoint override + """ + return None if self._s3_endpoint_override == "" else self._s3_endpoint_override + + @s3_endpoint_override.setter + def s3_endpoint_override(self, s3_endpoint_override): + """ + Sets the s3 endpoint override + """ + self._s3_endpoint_override = s3_endpoint_override + @classmethod def from_proto(cls, file_options_proto: DataSourceProto.FileOptions): """ @@ -185,6 +236,7 @@ def from_proto(cls, file_options_proto: DataSourceProto.FileOptions): file_options = cls( file_format=FileFormat.from_proto(file_options_proto.file_format), file_url=file_options_proto.file_url, + s3_endpoint_override=file_options_proto.s3_endpoint_override, ) return file_options @@ -201,6 +253,7 @@ def to_proto(self) -> DataSourceProto.FileOptions: None if self.file_format is None else self.file_format.to_proto() ), file_url=self.file_url, + s3_endpoint_override=self.s3_endpoint_override, ) return file_options_proto From 0fc36ef9304284f328cd68aa3c74d9182cd50bb8 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 18 Aug 2021 14:12:32 -0700 Subject: [PATCH 2/8] Add tests for S3 support Signed-off-by: Willem Pienaar --- protos/feast/core/DataSource.proto | 3 + sdk/python/setup.py | 2 + .../feature_repos/test_repo_configuration.py | 8 +- .../universal/data_source_creator.py | 2 +- .../universal/data_sources/bigquery.py | 2 +- .../universal/data_sources/file.py | 81 ++++++++++++++++++- .../universal/data_sources/redshift.py | 2 +- .../feature_repos/universal/feature_views.py | 5 -- .../offline_store/test_s3_custom_endpoint.py | 29 +++++++ 9 files changed, 121 insertions(+), 13 deletions(-) create mode 100644 sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 099ba32d929..2e43ac9a233 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -69,6 +69,9 @@ message DataSource { // gs://path/to/file for GCP GCS storage // file:///path/to/file for local storage string file_url = 2; + + // override AWS S3 storage endpoint with custom S3 endpoint + string s3_endpoint_override = 3; } // Defines options for DataSource that sources features from a BigQuery Query diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 1ba788125f4..0ed6438dc5e 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -82,6 +82,7 @@ "isort>=5", "grpcio-tools==1.34.0", "grpcio-testing==1.34.0", + "minio==7.1.0", "mock==2.0.0", "moto", "mypy==0.790", @@ -99,6 +100,7 @@ "pytest-mock==1.10.4", "Sphinx!=4.0.0", "sphinx-rtd-theme", + "testcontainers==3.4.2", "adlfs==0.5.9", "firebase-admin==4.5.2", "pre-commit", diff --git a/sdk/python/tests/integration/feature_repos/test_repo_configuration.py b/sdk/python/tests/integration/feature_repos/test_repo_configuration.py index f912f4dbdbd..463cfc5c5dd 100644 --- a/sdk/python/tests/integration/feature_repos/test_repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/test_repo_configuration.py @@ -115,7 +115,7 @@ def customer_feature_view(self) -> FeatureView: customer_table_id = 
self.data_source_creator.get_prefixed_table_name( self.name, "customer_profile" ) - ds = self.data_source_creator.create_data_sources( + ds = self.data_source_creator.create_data_source( customer_table_id, self.customer_df, event_timestamp_column="event_timestamp", @@ -129,7 +129,7 @@ def driver_stats_feature_view(self) -> FeatureView: driver_table_id = self.data_source_creator.get_prefixed_table_name( self.name, "driver_hourly" ) - ds = self.data_source_creator.create_data_sources( + ds = self.data_source_creator.create_data_source( driver_table_id, self.driver_df, event_timestamp_column="event_timestamp", @@ -145,7 +145,7 @@ def orders_table(self) -> Optional[str]: orders_table_id = self.data_source_creator.get_prefixed_table_name( self.name, "orders" ) - ds = self.data_source_creator.create_data_sources( + ds = self.data_source_creator.create_data_source( orders_table_id, self.orders_df, event_timestamp_column="event_timestamp", @@ -221,7 +221,7 @@ def construct_test_environment( offline_creator: DataSourceCreator = importer.get_class_from_type( module_name, config_class_name, "DataSourceCreator" )(project) - ds = offline_creator.create_data_sources( + ds = offline_creator.create_data_source( project, df, field_mapping={"ts_1": "ts", "id": "driver_id"} ) offline_store = offline_creator.create_offline_store_config() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index fa5293c06d8..42667a983df 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -9,7 +9,7 @@ class DataSourceCreator(ABC): @abstractmethod - def create_data_sources( + def create_data_source( self, destination: str, df: pd.DataFrame, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 74804e75121..07e65f96605 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -40,7 +40,7 @@ def teardown(self): def create_offline_store_config(self): return BigQueryOfflineStoreConfig() - def create_data_sources( + def create_data_source( self, destination: str, df: pd.DataFrame, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index b386c399dc7..ace3b7ccbd2 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -2,6 +2,9 @@ from typing import Any, Dict import pandas as pd +from minio import Minio +from testcontainers.core.generic import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs from feast import FileSource from feast.data_format import ParquetFormat @@ -19,7 +22,7 @@ class FileDataSourceCreator(DataSourceCreator): def __init__(self, _: str): pass - def create_data_sources( + def create_data_source( self, destination: str, df: pd.DataFrame, @@ -46,3 +49,79 @@ def create_offline_store_config(self) -> FeastConfigBaseModel: def teardown(self): self.f.close() + + +class S3FileDataSourceCreator(DataSourceCreator): + f: Any + minio: DockerContainer + bucket = "feast-test" + access_key = "AKIAIOSFODNN7EXAMPLE" 
+ secret = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + + def __init__(self, _: str): + self._setup_minio() + + def _setup_minio(self): + minio = DockerContainer("minio/minio:RELEASE.2021-08-17T20-53-08Z") + self.minio = minio + minio.with_exposed_ports(9000).with_exposed_ports(9001).with_env( + "MINIO_ROOT_USER", self.access_key + ).with_env("MINIO_ROOT_PASSWORD", self.secret).with_command( + 'server /data --console-address ":9001"' + ) + minio.start() + log_string_to_wait_for = ( + "API" # The minio container will print "API: ..." when ready. + ) + wait_for_logs(container=minio, predicate=log_string_to_wait_for, timeout=5) + + def _upload_parquet_file(self, df, file_name, minio_endpoint): + self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) + df.to_parquet(self.f.name) + + client = Minio( + minio_endpoint, + access_key=self.access_key, + secret_key=self.secret, + secure=False, + ) + if not client.bucket_exists(self.bucket): + client.make_bucket(self.bucket) + client.fput_object( + self.bucket, file_name, self.f.name, + ) + + def create_data_source( + self, + destination: str, + df: pd.DataFrame, + event_timestamp_column="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, + ) -> DataSource: + filename = f"{destination}.parquet" + port = self.minio.get_exposed_port("9000") + host = self.minio.get_container_host_ip() + minio_endpoint = f"{host}:{port}" + + self._upload_parquet_file(df, filename, minio_endpoint) + + return FileSource( + file_format=ParquetFormat(), + path=f"s3://{self.bucket}/{filename}", + event_timestamp_column=event_timestamp_column, + created_timestamp_column=created_timestamp_column, + date_partition_column="", + field_mapping=field_mapping or {"ts_1": "ts"}, + s3_endpoint_override=f"http://{host}:{port}", + ) + + def get_prefixed_table_name(self, name: str, suffix: str) -> str: + return f"{suffix}" + + def create_offline_store_config(self) -> FeastConfigBaseModel: + return FileOfflineStoreConfig() + + def teardown(self): + self.minio.stop() + self.f.close() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index f731b60bb39..997bedaf26e 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -31,7 +31,7 @@ def __init__(self, project_name: str): iam_role="arn:aws:iam::402087665549:role/redshift_s3_access_role", ) - def create_data_sources( + def create_data_source( self, destination: str, df: pd.DataFrame, diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 0306044ecdc..b6e5331eb96 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -20,11 +20,6 @@ def create_driver_hourly_stats_feature_view(source): driver_stats_feature_view = FeatureView( name="driver_stats", entities=["driver"], - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT32), - ], batch_source=source, ttl=timedelta(hours=2), ) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py new file 
mode 100644 index 00000000000..e7285308abb --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -0,0 +1,29 @@ +from tests.integration.feature_repos.test_repo_configuration import ( + TestRepoConfig, + construct_test_environment, +) + + +def test_registration_and_retrieval_from_custom_s3_endpoint(): + config = TestRepoConfig( + offline_store_creator="tests.integration.feature_repos.universal.data_sources.file.S3FileDataSourceCreator" + ) + import os + + if 'AWS_ACCESS_KEY_ID' in os.environ: + raise Exception("AWS_ACCESS_KEY_ID has already been set in the environment. Setting it again may cause a conflict. It may be better to deduplicate AWS configuration or use sub-processes for isolation") + + os.environ['AWS_ACCESS_KEY_ID'] = 'AKIAIOSFODNN7EXAMPLE' + os.environ['AWS_SECRET_ACCESS_KEY'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' + + with construct_test_environment( + config, create_and_apply=True, materialize=True + ) as environment: + fs = environment.feature_store + out = fs.get_online_features( + features=["driver_stats:conv_rate"], entity_rows=[{"driver": 5001}] + ).to_dict() + assert out["conv_rate"][0] is not None + + del os.environ['AWS_ACCESS_KEY_ID'] + del os.environ['AWS_SECRET_ACCESS_KEY'] \ No newline at end of file From 645ebaaf596bca740c0e4929213e0448451575af Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 18 Aug 2021 14:12:54 -0700 Subject: [PATCH 3/8] Reformat Signed-off-by: Willem Pienaar --- .../offline_store/test_s3_custom_endpoint.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index e7285308abb..491e54bde76 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -10,11 +10,13 @@ def test_registration_and_retrieval_from_custom_s3_endpoint(): ) import os - if 'AWS_ACCESS_KEY_ID' in os.environ: - raise Exception("AWS_ACCESS_KEY_ID has already been set in the environment. Setting it again may cause a conflict. It may be better to deduplicate AWS configuration or use sub-processes for isolation") + if "AWS_ACCESS_KEY_ID" in os.environ: + raise Exception( + "AWS_ACCESS_KEY_ID has already been set in the environment. Setting it again may cause a conflict. 
It may be better to deduplicate AWS configuration or use sub-processes for isolation" + ) - os.environ['AWS_ACCESS_KEY_ID'] = 'AKIAIOSFODNN7EXAMPLE' - os.environ['AWS_SECRET_ACCESS_KEY'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' + os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" + os.environ["AWS_SECRET_ACCESS_KEY"] = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" with construct_test_environment( config, create_and_apply=True, materialize=True @@ -25,5 +27,5 @@ def test_registration_and_retrieval_from_custom_s3_endpoint(): ).to_dict() assert out["conv_rate"][0] is not None - del os.environ['AWS_ACCESS_KEY_ID'] - del os.environ['AWS_SECRET_ACCESS_KEY'] \ No newline at end of file + del os.environ["AWS_ACCESS_KEY_ID"] + del os.environ["AWS_SECRET_ACCESS_KEY"] From d872a19e804255dbdad960973317c4d063116407 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 18 Aug 2021 14:20:59 -0700 Subject: [PATCH 4/8] Small refactoring Signed-off-by: Willem Pienaar --- .../feature_repos/universal/data_sources/file.py | 10 +++++----- .../offline_store/test_s3_custom_endpoint.py | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index ace3b7ccbd2..93048fa3a19 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -57,23 +57,23 @@ class S3FileDataSourceCreator(DataSourceCreator): bucket = "feast-test" access_key = "AKIAIOSFODNN7EXAMPLE" secret = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + minio_image = "minio/minio:RELEASE.2021-08-17T20-53-08Z" def __init__(self, _: str): self._setup_minio() def _setup_minio(self): - minio = DockerContainer("minio/minio:RELEASE.2021-08-17T20-53-08Z") - self.minio = minio - minio.with_exposed_ports(9000).with_exposed_ports(9001).with_env( + self.minio = DockerContainer(self.minio_image) + self.minio.with_exposed_ports(9000).with_exposed_ports(9001).with_env( "MINIO_ROOT_USER", self.access_key ).with_env("MINIO_ROOT_PASSWORD", self.secret).with_command( 'server /data --console-address ":9001"' ) - minio.start() + self.minio.start() log_string_to_wait_for = ( "API" # The minio container will print "API: ..." when ready. ) - wait_for_logs(container=minio, predicate=log_string_to_wait_for, timeout=5) + wait_for_logs(container=self.minio, predicate=log_string_to_wait_for, timeout=5) def _upload_parquet_file(self, df, file_name, minio_endpoint): self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 491e54bde76..daa618e302c 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -12,7 +12,8 @@ def test_registration_and_retrieval_from_custom_s3_endpoint(): if "AWS_ACCESS_KEY_ID" in os.environ: raise Exception( - "AWS_ACCESS_KEY_ID has already been set in the environment. Setting it again may cause a conflict. It may be better to deduplicate AWS configuration or use sub-processes for isolation" + "AWS_ACCESS_KEY_ID has already been set in the environment. Setting it again may cause a conflict. 
" + "It may be better to deduplicate AWS configuration or use sub-processes for isolation" ) os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" From a97802b5787b30dc61efc66b65f2fb6a22c0cbff Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 18 Aug 2021 14:22:15 -0700 Subject: [PATCH 5/8] Mark as integration Signed-off-by: Willem Pienaar --- .../tests/integration/offline_store/test_s3_custom_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index daa618e302c..4a11ccff109 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -3,7 +3,7 @@ construct_test_environment, ) - +@pytest.mark.integration def test_registration_and_retrieval_from_custom_s3_endpoint(): config = TestRepoConfig( offline_store_creator="tests.integration.feature_repos.universal.data_sources.file.S3FileDataSourceCreator" From 50468db362cb45bfa76cfc03a4f527fbf0d70b2a Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 18 Aug 2021 14:22:56 -0700 Subject: [PATCH 6/8] Add pytest import Signed-off-by: Willem Pienaar --- .../tests/integration/offline_store/test_s3_custom_endpoint.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 4a11ccff109..a3d097412bd 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -1,8 +1,11 @@ +import pytest + from tests.integration.feature_repos.test_repo_configuration import ( TestRepoConfig, construct_test_environment, ) + @pytest.mark.integration def test_registration_and_retrieval_from_custom_s3_endpoint(): config = TestRepoConfig( From eda2764c520e14e70c699ab8bba463adc72ed236 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 22 Aug 2021 15:07:41 -0700 Subject: [PATCH 7/8] Add comment on requiring docker Signed-off-by: Willem Pienaar --- CONTRIBUTING.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 95757ccad74..cf311f9bb11 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,8 +21,9 @@ pre-commit install --hook-type pre-push ## Feast Python SDK / CLI ### Environment Setup Setting up your development environment for Feast Python SDK / CLI: -1. Ensure that you have `make`, Python (3.7 and above) with `pip`, installed. -2. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed +1. Ensure that you have Docker installed in your environment. Docker is used to provision service dependencies during testing. +2. Ensure that you have `make`, Python (3.7 and above) with `pip`, installed. +3. 
_Recommended:_ Create a virtual environment to isolate development dependencies to be installed
```sh
# create & activate a virtual environment
python -m venv venv/

From 5986998997a74ce1f13b7b57b73f28d803086356 Mon Sep 17 00:00:00 2001
From: Willem Pienaar
Date: Sun, 22 Aug 2021 15:32:03 -0700
Subject: [PATCH 8/8] Add type annotations

Signed-off-by: Willem Pienaar
---
 sdk/python/feast/infra/offline_stores/file.py |  4 ++--
 .../feast/infra/offline_stores/file_source.py | 13 ++++++++-----
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index a04357bf941..5c6f96df576 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -112,7 +112,7 @@ def evaluate_historical_retrieval():
            )

            # Read offline parquet data in pyarrow format.
-            filesystem, path = FileSource.prepare_path(
+            filesystem, path = FileSource.create_filesystem_and_path(
                feature_view.batch_source.path,
                feature_view.batch_source.file_options.s3_endpoint_override,
            )
@@ -242,7 +242,7 @@ def pull_latest_from_table_or_query(

        # Create lazy function that is only called from the RetrievalJob object
        def evaluate_offline_job():
-            filesystem, path = FileSource.prepare_path(
+            filesystem, path = FileSource.create_filesystem_and_path(
                data_source.path, data_source.file_options.s3_endpoint_override
            )
            source_df = pd.read_parquet(path, filesystem=filesystem)
diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py
index 91aac446f1b..31eb5f037f0 100644
--- a/sdk/python/feast/infra/offline_stores/file_source.py
+++ b/sdk/python/feast/infra/offline_stores/file_source.py
@@ -1,6 +1,7 @@
 from typing import Callable, Dict, Iterable, Optional, Tuple

-from pyarrow import fs
+from pyarrow._fs import FileSystem
+from pyarrow._s3fs import S3FileSystem
 from pyarrow.parquet import ParquetFile

 from feast import type_map
@@ -138,7 +139,7 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
-        filesystem, path = FileSource.prepare_path(
+        filesystem, path = FileSource.create_filesystem_and_path(
            self.path, self._file_options.s3_endpoint_override
        )
        schema = ParquetFile(
@@ -147,12 +148,14 @@ def get_table_column_names_and_types(
        return zip(schema.names, map(str, schema.types))

    @staticmethod
-    def prepare_path(path: str, s3_endpoint_override: str):
+    def create_filesystem_and_path(
+        path: str, s3_endpoint_override: str
+    ) -> Tuple[Optional[FileSystem], str]:
        if path.startswith("s3://"):
-            s3 = fs.S3FileSystem(
+            s3fs = S3FileSystem(
                endpoint_override=s3_endpoint_override if s3_endpoint_override else None
            )
-            return s3, path.replace("s3://", "")
+            return s3fs, path.replace("s3://", "")
        else:
            return None, path
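
Reviewer note: taken together, the patches above let a `FileSource` read Parquet data from any S3-compatible store. Below is a minimal usage sketch, not part of the patch series; it assumes a MinIO server on `http://localhost:9000` with the example credentials and `feast-test` bucket used in the integration test, and the object name is hypothetical.

```python
import os

from feast import FileSource
from feast.data_format import ParquetFormat

# PyArrow's S3FileSystem picks up credentials from the standard AWS
# environment variables, as the integration test above does.
os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE"
os.environ["AWS_SECRET_ACCESS_KEY"] = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"

driver_stats_source = FileSource(
    file_format=ParquetFormat(),
    # Hypothetical object; any Parquet file uploaded to the bucket works.
    path="s3://feast-test/driver_hourly.parquet",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created_ts",
    # Route S3 traffic to the custom endpoint instead of AWS.
    s3_endpoint_override="http://localhost:9000",
)
```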
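The `create_filesystem_and_path` helper (named `prepare_path` until patch 8) is the single switch point between local and S3 reads. A small illustrative sketch of its contract, assuming the final patch is applied and pyarrow is built with S3 support:

```python
from feast.infra.offline_stores.file_source import FileSource

# Local paths pass through unchanged, with no filesystem object.
filesystem, path = FileSource.create_filesystem_and_path("/data/driver.parquet", None)
assert filesystem is None and path == "/data/driver.parquet"

# s3:// URLs yield a pyarrow S3FileSystem bound to the custom endpoint,
# plus the bucket-relative path that pyarrow expects.
s3fs, key = FileSource.create_filesystem_and_path(
    "s3://feast-test/driver_hourly.parquet", "http://localhost:9000"
)
assert key == "feast-test/driver_hourly.parquet"
```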