Skip to content

Commit 44526f5

Browse files
fix: Batch Snowflake materialization queries to obey Snowpark 100 limit
Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>
1 parent 2c04ec1 commit 44526f5

File tree

1 file changed

+61
-35
lines changed

1 file changed

+61
-35
lines changed

sdk/python/feast/infra/materialization/snowflake_engine.py

+61-35
Original file line numberDiff line numberDiff line change
@@ -276,32 +276,65 @@ def _materialize_one(
276276

277277
fv_latest_values_sql = offline_job.to_sql()
278278

279+
if feature_view.entity_columns:
280+
join_keys = [entity.name for entity in feature_view.entity_columns]
281+
unique_entities = '"' + '", "'.join(join_keys) + '"'
282+
283+
query = f"""
284+
SELECT
285+
COUNT(DISTINCT {unique_entities})
286+
FROM
287+
{feature_view.batch_source.get_table_query_string()}
288+
"""
289+
290+
with GetSnowflakeConnection(self.repo_config.offline_store) as conn:
291+
entities_to_write = conn.cursor().execute(query).fetchall()[0][0]
292+
else:
293+
entities_to_write = (
294+
1 # entityless feature view has a placeholder entity
295+
)
296+
279297
if feature_view.batch_source.field_mapping is not None:
280298
fv_latest_mapped_values_sql = _run_snowflake_field_mapping(
281299
fv_latest_values_sql, feature_view.batch_source.field_mapping
282300
)
283301

284-
fv_to_proto_sql = self.generate_snowflake_materialization_query(
285-
self.repo_config,
286-
fv_latest_mapped_values_sql,
287-
feature_view,
288-
project,
289-
)
302+
features_full_list = feature_view.features
303+
feature_batches = [
304+
features_full_list[i : i + 100]
305+
for i in range(0, len(features_full_list), 100)
306+
]
290307

291308
if self.repo_config.online_store.type == "snowflake.online":
292-
self.materialize_to_snowflake_online_store(
293-
self.repo_config,
294-
fv_to_proto_sql,
295-
feature_view,
296-
project,
297-
)
309+
rows_to_write = entities_to_write * len(features_full_list)
298310
else:
299-
self.materialize_to_external_online_store(
300-
self.repo_config,
301-
fv_to_proto_sql,
302-
feature_view,
303-
tqdm_builder,
304-
)
311+
rows_to_write = entities_to_write * len(feature_batches)
312+
313+
with tqdm_builder(rows_to_write) as pbar:
314+
for i, feature_batch in enumerate(feature_batches):
315+
fv_to_proto_sql = self.generate_snowflake_materialization_query(
316+
self.repo_config,
317+
fv_latest_mapped_values_sql,
318+
feature_view,
319+
feature_batch,
320+
project,
321+
)
322+
323+
if self.repo_config.online_store.type == "snowflake.online":
324+
self.materialize_to_snowflake_online_store(
325+
self.repo_config,
326+
fv_to_proto_sql,
327+
feature_view,
328+
project,
329+
)
330+
pbar.update(entities_to_write * len(feature_batch))
331+
else:
332+
self.materialize_to_external_online_store(
333+
self.repo_config,
334+
fv_to_proto_sql,
335+
feature_view,
336+
pbar,
337+
)
305338

306339
return SnowflakeMaterializationJob(
307340
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
@@ -316,6 +349,7 @@ def generate_snowflake_materialization_query(
316349
repo_config: RepoConfig,
317350
fv_latest_mapped_values_sql: str,
318351
feature_view: Union[BatchFeatureView, FeatureView],
352+
feature_batch: list,
319353
project: str,
320354
) -> str:
321355

@@ -338,7 +372,7 @@ def generate_snowflake_materialization_query(
338372
UDF serialization function.
339373
"""
340374
feature_sql_list = []
341-
for feature in feature_view.features:
375+
for feature in feature_batch:
342376
feature_value_type_name = feature.dtype.to_value_type().name
343377

344378
feature_sql = _convert_value_name_to_snowflake_udf(
@@ -434,19 +468,16 @@ def materialize_to_snowflake_online_store(
434468
"""
435469

436470
with GetSnowflakeConnection(repo_config.batch_engine) as conn:
437-
query_id = execute_snowflake_statement(conn, query).sfqid
471+
execute_snowflake_statement(conn, query).sfqid
438472

439-
click.echo(
440-
f"Snowflake Query ID: {Style.BRIGHT + Fore.GREEN}{query_id}{Style.RESET_ALL}"
441-
)
442473
return None
443474

444475
def materialize_to_external_online_store(
445476
self,
446477
repo_config: RepoConfig,
447478
materialization_sql: str,
448479
feature_view: Union[StreamFeatureView, FeatureView],
449-
tqdm_builder: Callable[[int], tqdm],
480+
pbar: tqdm,
450481
) -> None:
451482

452483
feature_names = [feature.name for feature in feature_view.features]
@@ -455,10 +486,6 @@ def materialize_to_external_online_store(
455486
query = materialization_sql
456487
cursor = execute_snowflake_statement(conn, query)
457488
for i, df in enumerate(cursor.fetch_pandas_batches()):
458-
click.echo(
459-
f"Snowflake: Processing Materialization ResultSet Batch #{i+1}"
460-
)
461-
462489
entity_keys = (
463490
df["entity_key"].apply(EntityKeyProto.FromString).to_numpy()
464491
)
@@ -494,11 +521,10 @@ def materialize_to_external_online_store(
494521
)
495522
)
496523

497-
with tqdm_builder(len(rows_to_write)) as pbar:
498-
self.online_store.online_write_batch(
499-
repo_config,
500-
feature_view,
501-
rows_to_write,
502-
lambda x: pbar.update(x),
503-
)
524+
self.online_store.online_write_batch(
525+
repo_config,
526+
feature_view,
527+
rows_to_write,
528+
lambda x: pbar.update(x),
529+
)
504530
return None

0 commit comments

Comments
 (0)