Skip to content

Commit bf740d2

Browse files
nj7felixwang9817
andauthored
fix: Added generic Feature store Creation for CLI (feast-dev#3618)
Added generic Feature store Creation Signed-off-by: Felix Wang <wangfelix98@gmail.com> Co-authored-by: Felix Wang <wangfelix98@gmail.com>
1 parent 059509a commit bf740d2

File tree

2 files changed

+44
-89
lines changed

2 files changed

+44
-89
lines changed

sdk/python/feast/cli.py

+20-89
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import base64
1514
import json
1615
import logging
17-
import os
18-
import tempfile
1916
from datetime import datetime
2017
from pathlib import Path
2118
from typing import List, Optional
@@ -28,18 +25,15 @@
2825
from pygments import formatters, highlight, lexers
2926

3027
from feast import utils
31-
from feast.constants import (
32-
DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
33-
FEATURE_STORE_YAML_ENV_NAME,
34-
)
28+
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
3529
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
36-
from feast.feature_store import FeatureStore
3730
from feast.feature_view import FeatureView
3831
from feast.on_demand_feature_view import OnDemandFeatureView
3932
from feast.repo_config import load_repo_config
4033
from feast.repo_operations import (
4134
apply_total,
4235
cli_check_repo,
36+
create_feature_store,
4337
generate_project_name,
4438
init_repo,
4539
plan,
@@ -172,10 +166,7 @@ def ui(
172166
"""
173167
Shows the Feast UI over the current directory
174168
"""
175-
repo = ctx.obj["CHDIR"]
176-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
177-
cli_check_repo(repo, fs_yaml_file)
178-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
169+
store = create_feature_store(ctx)
179170
# Pass in the registry_dump method to get around a circular dependency
180171
store.serve_ui(
181172
host=host,
@@ -192,10 +183,7 @@ def endpoint(ctx: click.Context):
192183
"""
193184
Display feature server endpoints
194185
"""
195-
repo = ctx.obj["CHDIR"]
196-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
197-
cli_check_repo(repo, fs_yaml_file)
198-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
186+
store = create_feature_store(ctx)
199187
endpoint = store.get_feature_server_endpoint()
200188
if endpoint is not None:
201189
_logger.info(
@@ -220,10 +208,7 @@ def data_source_describe(ctx: click.Context, name: str):
220208
"""
221209
Describe a data source
222210
"""
223-
repo = ctx.obj["CHDIR"]
224-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
225-
cli_check_repo(repo, fs_yaml_file)
226-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
211+
store = create_feature_store(ctx)
227212

228213
try:
229214
data_source = store.get_data_source(name)
@@ -244,10 +229,7 @@ def data_source_list(ctx: click.Context):
244229
"""
245230
List all data sources
246231
"""
247-
repo = ctx.obj["CHDIR"]
248-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
249-
cli_check_repo(repo, fs_yaml_file)
250-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
232+
store = create_feature_store(ctx)
251233
table = []
252234
for datasource in store.list_data_sources():
253235
table.append([datasource.name, datasource.__class__])
@@ -272,10 +254,7 @@ def entity_describe(ctx: click.Context, name: str):
272254
"""
273255
Describe an entity
274256
"""
275-
repo = ctx.obj["CHDIR"]
276-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
277-
cli_check_repo(repo, fs_yaml_file)
278-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
257+
store = create_feature_store(ctx)
279258

280259
try:
281260
entity = store.get_entity(name)
@@ -296,10 +275,7 @@ def entity_list(ctx: click.Context):
296275
"""
297276
List all entities
298277
"""
299-
repo = ctx.obj["CHDIR"]
300-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
301-
cli_check_repo(repo, fs_yaml_file)
302-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
278+
store = create_feature_store(ctx)
303279
table = []
304280
for entity in store.list_entities():
305281
table.append([entity.name, entity.description, entity.value_type])
@@ -324,10 +300,7 @@ def feature_service_describe(ctx: click.Context, name: str):
324300
"""
325301
Describe a feature service
326302
"""
327-
repo = ctx.obj["CHDIR"]
328-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
329-
cli_check_repo(repo, fs_yaml_file)
330-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
303+
store = create_feature_store(ctx)
331304

332305
try:
333306
feature_service = store.get_feature_service(name)
@@ -350,10 +323,7 @@ def feature_service_list(ctx: click.Context):
350323
"""
351324
List all feature services
352325
"""
353-
repo = ctx.obj["CHDIR"]
354-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
355-
cli_check_repo(repo, fs_yaml_file)
356-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
326+
store = create_feature_store(ctx)
357327
feature_services = []
358328
for feature_service in store.list_feature_services():
359329
feature_names = []
@@ -383,10 +353,7 @@ def feature_view_describe(ctx: click.Context, name: str):
383353
"""
384354
Describe a feature view
385355
"""
386-
repo = ctx.obj["CHDIR"]
387-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
388-
cli_check_repo(repo, fs_yaml_file)
389-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
356+
store = create_feature_store(ctx)
390357

391358
try:
392359
feature_view = store.get_feature_view(name)
@@ -407,11 +374,7 @@ def feature_view_list(ctx: click.Context):
407374
"""
408375
List all feature views
409376
"""
410-
repo = ctx.obj["CHDIR"]
411-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
412-
413-
cli_check_repo(repo, fs_yaml_file)
414-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
377+
store = create_feature_store(ctx)
415378
table = []
416379
for feature_view in [
417380
*store.list_feature_views(),
@@ -452,10 +415,7 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
452415
"""
453416
[Experimental] Describe an on demand feature view
454417
"""
455-
repo = ctx.obj["CHDIR"]
456-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
457-
cli_check_repo(repo, fs_yaml_file)
458-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
418+
store = create_feature_store(ctx)
459419

460420
try:
461421
on_demand_feature_view = store.get_on_demand_feature_view(name)
@@ -478,10 +438,7 @@ def on_demand_feature_view_list(ctx: click.Context):
478438
"""
479439
[Experimental] List all on demand feature views
480440
"""
481-
repo = ctx.obj["CHDIR"]
482-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
483-
cli_check_repo(repo, fs_yaml_file)
484-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
441+
store = create_feature_store(ctx)
485442
table = []
486443
for on_demand_feature_view in store.list_on_demand_feature_views():
487444
table.append([on_demand_feature_view.name])
@@ -583,10 +540,8 @@ def materialize_command(
583540
584541
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
585542
"""
586-
repo = ctx.obj["CHDIR"]
587-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
588-
cli_check_repo(repo, fs_yaml_file)
589-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
543+
store = create_feature_store(ctx)
544+
590545
store.materialize(
591546
feature_views=None if not views else views,
592547
start_date=utils.make_tzaware(parser.parse(start_ts)),
@@ -612,10 +567,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
612567
613568
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
614569
"""
615-
repo = ctx.obj["CHDIR"]
616-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
617-
cli_check_repo(repo, fs_yaml_file)
618-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
570+
store = create_feature_store(ctx)
619571
store.materialize_incremental(
620572
feature_views=None if not views else views,
621573
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
@@ -707,22 +659,7 @@ def serve_command(
707659
no_feature_log: bool,
708660
):
709661
"""Start a feature server locally on a given port."""
710-
repo = ctx.obj["CHDIR"]
711-
712-
# If we received a base64 encoded version of feature_store.yaml, use that
713-
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
714-
if config_base64:
715-
print("Received base64 encoded feature_store.yaml")
716-
config_bytes = base64.b64decode(config_base64)
717-
# Create a new unique directory for writing feature_store.yaml
718-
repo_path = Path(tempfile.mkdtemp())
719-
with open(repo_path / "feature_store.yaml", "wb") as f:
720-
f.write(config_bytes)
721-
store = FeatureStore(repo_path=str(repo_path.resolve()))
722-
else:
723-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
724-
cli_check_repo(repo, fs_yaml_file)
725-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
662+
store = create_feature_store(ctx)
726663

727664
store.serve(host, port, type_, no_access_log, no_feature_log)
728665

@@ -738,10 +675,7 @@ def serve_command(
738675
@click.pass_context
739676
def serve_transformations_command(ctx: click.Context, port: int):
740677
"""[Experimental] Start a feature consumption server locally on a given port."""
741-
repo = ctx.obj["CHDIR"]
742-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
743-
cli_check_repo(repo, fs_yaml_file)
744-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
678+
store = create_feature_store(ctx)
745679

746680
store.serve_transformations(port)
747681

@@ -778,10 +712,7 @@ def validate(
778712
779713
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
780714
"""
781-
repo = ctx.obj["CHDIR"]
782-
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
783-
cli_check_repo(repo, fs_yaml_file)
784-
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
715+
store = create_feature_store(ctx)
785716

786717
feature_service = store.get_feature_service(name=feature_service)
787718
reference = store.get_validation_reference(reference)

sdk/python/feast/repo_operations.py

+24
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import base64
12
import importlib
23
import json
34
import os
45
import random
56
import re
67
import sys
8+
import tempfile
79
from importlib.abc import Loader
810
from importlib.machinery import ModuleSpec
911
from pathlib import Path
@@ -14,6 +16,7 @@
1416

1517
from feast import PushSource
1618
from feast.batch_feature_view import BatchFeatureView
19+
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
1720
from feast.data_source import DataSource, KafkaSource, KinesisSource
1821
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
1922
from feast.entity import Entity
@@ -328,6 +331,27 @@ def log_infra_changes(
328331
)
329332

330333

334+
@log_exceptions_and_usage
335+
def create_feature_store(
336+
ctx: click.Context,
337+
) -> FeatureStore:
338+
repo = ctx.obj["CHDIR"]
339+
# If we received a base64 encoded version of feature_store.yaml, use that
340+
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
341+
if config_base64:
342+
print("Received base64 encoded feature_store.yaml")
343+
config_bytes = base64.b64decode(config_base64)
344+
# Create a new unique directory for writing feature_store.yaml
345+
repo_path = Path(tempfile.mkdtemp())
346+
with open(repo_path / "feature_store.yaml", "wb") as f:
347+
f.write(config_bytes)
348+
return FeatureStore(repo_path=str(repo_path.resolve()))
349+
else:
350+
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
351+
cli_check_repo(repo, fs_yaml_file)
352+
return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
353+
354+
331355
@log_exceptions_and_usage
332356
def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
333357
os.chdir(repo_path)

0 commit comments

Comments
 (0)