Skip to content

Commit ea8ad17

Browse files
feat: Support s3gov schema by snowflake offline store during materialization (#3891)
1 parent 8d6bec8 commit ea8ad17

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

sdk/python/feast/infra/offline_stores/snowflake.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -615,12 +615,17 @@ def to_remote_storage(self) -> List[str]:
615615
HEADER = TRUE
616616
"""
617617
cursor = execute_snowflake_statement(self.snowflake_conn, query)
618+
# s3gov schema is used by Snowflake in AWS govcloud regions
619+
# remove gov portion from schema and pass it to online store upload
620+
native_export_path = self.export_path.replace("s3gov://", "s3://")
621+
return self._get_file_names_from_copy_into(cursor, native_export_path)
618622

623+
def _get_file_names_from_copy_into(self, cursor, native_export_path) -> List[str]:
619624
file_name_column_index = [
620625
idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME"
621626
][0]
622627
return [
623-
f"{self.export_path}/{row[file_name_column_index]}"
628+
f"{native_export_path}/{row[file_name_column_index]}"
624629
for row in cursor.fetchall()
625630
]
626631

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import re
2+
from unittest.mock import ANY, MagicMock, patch
3+
4+
import pytest
5+
6+
from feast.infra.offline_stores.snowflake import (
7+
SnowflakeOfflineStoreConfig,
8+
SnowflakeRetrievalJob,
9+
)
10+
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
11+
from feast.repo_config import RepoConfig
12+
13+
14+
@pytest.fixture(params=["s3", "s3gov"])
15+
def retrieval_job(request):
16+
offline_store_config = SnowflakeOfflineStoreConfig(
17+
type="snowflake.offline",
18+
account="snow",
19+
user="snow",
20+
password="snow",
21+
role="snow",
22+
warehouse="snow",
23+
database="FEAST",
24+
schema="OFFLINE",
25+
storage_integration_name="FEAST_S3",
26+
blob_export_location=f"{request.param}://feast-snowflake-offload/export",
27+
)
28+
retrieval_job = SnowflakeRetrievalJob(
29+
query="SELECT * FROM snowflake",
30+
snowflake_conn=MagicMock(),
31+
config=RepoConfig(
32+
registry="s3://ml-test/repo/registry.db",
33+
project="test",
34+
provider="snowflake.offline",
35+
online_store=SqliteOnlineStoreConfig(type="sqlite"),
36+
offline_store=offline_store_config,
37+
),
38+
full_feature_names=True,
39+
on_demand_feature_views=[],
40+
)
41+
return retrieval_job
42+
43+
44+
def test_to_remote_storage(retrieval_job):
45+
stored_files = ["just a path", "maybe another"]
46+
with patch.object(
47+
retrieval_job, "to_snowflake", return_value=None
48+
) as mock_to_snowflake, patch.object(
49+
retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files
50+
) as mock_get_file_names_from_copy:
51+
assert (
52+
retrieval_job.to_remote_storage() == stored_files
53+
), "should return the list of files"
54+
mock_to_snowflake.assert_called_once()
55+
mock_get_file_names_from_copy.assert_called_once_with(ANY, ANY)
56+
native_path = mock_get_file_names_from_copy.call_args[0][1]
57+
assert re.match("^s3://.*", native_path), "path should be s3://*"

0 commit comments

Comments
 (0)