Skip to content

Commit 7cc955f

Browse files
fix: Batch Snowflake materialization queries to obey Snowpark 100 feature limitation
Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>
1 parent f2cbf43 commit 7cc955f

File tree

1 file changed

+61
-32
lines changed

1 file changed

+61
-32
lines changed

sdk/python/feast/infra/materialization/snowflake_engine.py

+61-32
Original file line numberDiff line numberDiff line change
@@ -271,32 +271,65 @@ def _materialize_one(
271271

272272
fv_latest_values_sql = offline_job.to_sql()
273273

274+
# Determine payload size if writing to an external online store
275+
if self.repo_config.online_store.type != "snowflake.online":
276+
if feature_view.entity_columns:
277+
join_keys = [entity.name for entity in feature_view.entity_columns]
278+
unique_entities = '"' + '", "'.join(join_keys) + '"'
279+
280+
query = f"""
281+
SELECT
282+
COUNT(DISTINCT {unique_entities})
283+
FROM
284+
{feature_view.batch_source.get_table_query_string()}
285+
"""
286+
287+
with get_snowflake_conn(self.repo_config.offline_store) as conn:
288+
rows_to_write = conn.cursor().execute(query).fetchall()[0][0]
289+
else:
290+
rows_to_write = 1 # entityless feature view has a placeholder entity
291+
274292
if feature_view.batch_source.field_mapping is not None:
275293
fv_latest_mapped_values_sql = _run_snowflake_field_mapping(
276294
fv_latest_values_sql, feature_view.batch_source.field_mapping
277295
)
278296

279-
fv_to_proto_sql = self.generate_snowflake_materialization_query(
280-
self.repo_config,
281-
fv_latest_mapped_values_sql,
282-
feature_view,
283-
project,
297+
features_full_list = feature_view.features
298+
feature_batches = [
299+
features_full_list[i : i + 100]
300+
for i in range(0, len(features_full_list), 100)
301+
]
302+
303+
click.echo(
304+
f"""
305+
Materializing {Style.BRIGHT + Fore.GREEN}{len(features_full_list)}{Style.RESET_ALL} feature(s) in {Style.BRIGHT + Fore.GREEN}{len(feature_batches)}{Style.RESET_ALL} compute batch(es). There are {Style.BRIGHT + Fore.GREEN}{rows_to_write}{Style.RESET_ALL} unique entity(ies) per feature.
306+
{Style.BRIGHT + Fore.GREEN}{len(feature_batches)}{Style.RESET_ALL} batch(es) * {Style.BRIGHT + Fore.GREEN}{rows_to_write}{Style.RESET_ALL} unique entity(ies) = {Style.BRIGHT + Fore.GREEN}{rows_to_write*len(feature_batches)}{Style.RESET_ALL} records written. ({Style.BRIGHT + Fore.GREEN}{rows_to_write*len(features_full_list)}{Style.RESET_ALL} total rows in table)
307+
"""
284308
)
309+
with tqdm_builder(rows_to_write * len(feature_batches)) as pbar:
310+
for i, feature_batch in enumerate(feature_batches):
311+
fv_to_proto_sql = self.generate_snowflake_materialization_query(
312+
self.repo_config,
313+
fv_latest_mapped_values_sql,
314+
feature_view,
315+
feature_batch,
316+
project,
317+
)
285318

286-
if self.repo_config.online_store.type == "snowflake.online":
287-
self.materialize_to_snowflake_online_store(
288-
self.repo_config,
289-
fv_to_proto_sql,
290-
feature_view,
291-
project,
292-
)
293-
else:
294-
self.materialize_to_external_online_store(
295-
self.repo_config,
296-
fv_to_proto_sql,
297-
feature_view,
298-
tqdm_builder,
299-
)
319+
if self.repo_config.online_store.type == "snowflake.online":
320+
self.materialize_to_snowflake_online_store(
321+
self.repo_config,
322+
fv_to_proto_sql,
323+
feature_view,
324+
project,
325+
)
326+
else:
327+
self.materialize_to_external_online_store(
328+
self.repo_config,
329+
fv_to_proto_sql,
330+
feature_view,
331+
pbar,
332+
)
300333

301334
return SnowflakeMaterializationJob(
302335
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
@@ -311,6 +344,7 @@ def generate_snowflake_materialization_query(
311344
repo_config: RepoConfig,
312345
fv_latest_mapped_values_sql: str,
313346
feature_view: Union[BatchFeatureView, FeatureView],
347+
feature_batch: list,
314348
project: str,
315349
) -> str:
316350

@@ -333,7 +367,7 @@ def generate_snowflake_materialization_query(
333367
UDF serialization function.
334368
"""
335369
feature_sql_list = []
336-
for feature in feature_view.features:
370+
for feature in feature_batch:
337371
feature_value_type_name = feature.dtype.to_value_type().name
338372

339373
feature_sql = _convert_value_name_to_snowflake_udf(
@@ -441,7 +475,7 @@ def materialize_to_external_online_store(
441475
repo_config: RepoConfig,
442476
materialization_sql: str,
443477
feature_view: Union[StreamFeatureView, FeatureView],
444-
tqdm_builder: Callable[[int], tqdm],
478+
pbar: tqdm,
445479
) -> None:
446480

447481
feature_names = [feature.name for feature in feature_view.features]
@@ -450,10 +484,6 @@ def materialize_to_external_online_store(
450484
query = materialization_sql
451485
cursor = execute_snowflake_statement(conn, query)
452486
for i, df in enumerate(cursor.fetch_pandas_batches()):
453-
click.echo(
454-
f"Snowflake: Processing Materialization ResultSet Batch #{i+1}"
455-
)
456-
457487
entity_keys = (
458488
df["entity_key"].apply(EntityKeyProto.FromString).to_numpy()
459489
)
@@ -489,11 +519,10 @@ def materialize_to_external_online_store(
489519
)
490520
)
491521

492-
with tqdm_builder(len(rows_to_write)) as pbar:
493-
self.online_store.online_write_batch(
494-
repo_config,
495-
feature_view,
496-
rows_to_write,
497-
lambda x: pbar.update(x),
498-
)
522+
self.online_store.online_write_batch(
523+
repo_config,
524+
feature_view,
525+
rows_to_write,
526+
lambda x: pbar.update(x),
527+
)
499528
return None

0 commit comments

Comments
 (0)