|
4 | 4 | import traceback
|
5 | 5 | from datetime import datetime
|
6 | 6 | from typing import Any, Dict, List, cast
|
7 |
| -from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto |
8 | 7 |
|
9 | 8 | import pyarrow as pa
|
10 | 9 | import pyarrow.flight as fl
|
11 |
| -from google.protobuf.json_format import MessageToDict, Parse |
| 10 | +from google.protobuf.json_format import Parse |
12 | 11 |
|
13 | 12 | from feast import FeatureStore, FeatureView, utils
|
14 | 13 | from feast.arrow_error_handler import arrow_server_error_handling_decorator
|
15 | 14 | from feast.data_source import DataSource
|
16 |
| -from feast.errors import FeastObjectNotFoundException |
17 | 15 | from feast.feature_logging import FeatureServiceLoggingSource
|
18 | 16 | from feast.feature_view import DUMMY_ENTITY_NAME
|
19 | 17 | from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
|
|
30 | 28 | init_security_manager,
|
31 | 29 | str_to_auth_manager_type,
|
32 | 30 | )
|
| 31 | +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto |
33 | 32 | from feast.saved_dataset import SavedDatasetStorage
|
34 | 33 |
|
35 | 34 | logger = logging.getLogger(__name__)
|
@@ -469,27 +468,30 @@ def persist(self, command: dict, key: str):
|
469 | 468 | traceback.print_exc()
|
470 | 469 | raise e
|
471 | 470 |
|
472 |
| - def validate_data_source(self, command: dict): |
473 |
| - data_source_name = command["data_source_name"] |
474 |
| - logger.debug(f"Validating data source {data_source_name}") |
475 |
| - try: |
476 |
| - data_source = self.store.registry.get_data_source( |
477 |
| - name=data_source_name, project=self.store.config.project |
478 |
| - ) |
479 |
| - self.offline_store.validate_data_source( |
480 |
| - config=self.store.config, |
481 |
| - data_source=data_source, |
482 |
| - ) |
483 |
| - except FeastObjectNotFoundException: |
484 |
| - logger.debug(f"DataSource {data_source_name} not found, validation skipped") |
485 |
| - |
486 |
| - def get_table_column_names_and_types_from_data_source(self, command: dict): |
| 471 | + @staticmethod |
| 472 | + def _extract_data_source_from_command(command) -> DataSource: |
487 | 473 | data_source_proto_str = command["data_source_proto"]
|
488 |
| - logger.debug(f"Fetching table columns metadata from {data_source_proto_str}") |
| 474 | + logger.debug(f"Extracted data_source_proto {data_source_proto_str}") |
489 | 475 | data_source_proto = DataSourceProto()
|
490 | 476 | Parse(data_source_proto_str, data_source_proto)
|
491 | 477 | data_source = DataSource.from_proto(data_source_proto)
|
492 | 478 | logger.debug(f"Converted to DataSource {data_source}")
|
| 479 | + return data_source |
| 480 | + |
| 481 | + def validate_data_source(self, command: dict): |
| 482 | + data_source = OfflineServer._extract_data_source_from_command(command) |
| 483 | + logger.debug(f"Validating data source {data_source.name}") |
| 484 | + assert_permissions(data_source, actions=[AuthzedAction.READ_OFFLINE]) |
| 485 | + |
| 486 | + self.offline_store.validate_data_source( |
| 487 | + config=self.store.config, |
| 488 | + data_source=data_source, |
| 489 | + ) |
| 490 | + |
| 491 | + def get_table_column_names_and_types_from_data_source(self, command: dict): |
| 492 | + data_source = OfflineServer._extract_data_source_from_command(command) |
| 493 | + logger.debug(f"Fetching table columns metadata from {data_source.name}") |
| 494 | + assert_permissions(data_source, actions=[AuthzedAction.READ_OFFLINE]) |
493 | 495 |
|
494 | 496 | column_names_and_types = data_source.get_table_column_names_and_types(
|
495 | 497 | self.store.config
|
|
0 commit comments