|
| 1 | +import logging |
1 | 2 | import os.path
|
2 | 3 | import shutil
|
| 4 | +import subprocess |
3 | 5 | import tempfile
|
4 | 6 | import uuid
|
| 7 | +from pathlib import Path |
5 | 8 | from typing import Any, Dict, List, Optional
|
6 | 9 |
|
7 | 10 | import pandas as pd
|
8 | 11 | import pyarrow as pa
|
9 | 12 | import pyarrow.parquet as pq
|
| 13 | +import yaml |
10 | 14 | from minio import Minio
|
11 |
| -from multiprocess import Process |
12 | 15 | from testcontainers.core.generic import DockerContainer
|
13 | 16 | from testcontainers.core.waiting_utils import wait_for_logs
|
14 | 17 | from testcontainers.minio import MinioContainer
|
15 | 18 |
|
16 |
| -from feast import FeatureStore, FileSource, RepoConfig |
| 19 | +from feast import FileSource, RepoConfig |
17 | 20 | from feast.data_format import DeltaFormat, ParquetFormat
|
18 | 21 | from feast.data_source import DataSource
|
19 | 22 | from feast.feature_logging import LoggingDestination
|
|
24 | 27 | SavedDatasetFileStorage,
|
25 | 28 | )
|
26 | 29 | from feast.infra.offline_stores.remote import RemoteOfflineStoreConfig
|
27 |
| -from feast.offline_server import start_server |
28 | 30 | from feast.repo_config import FeastConfigBaseModel, RegistryConfig
|
29 | 31 | from feast.wait import wait_retry_backoff # noqa: E402
|
30 | 32 | from tests.integration.feature_repos.universal.data_source_creator import (
|
31 | 33 | DataSourceCreator,
|
32 | 34 | )
|
33 | 35 | from tests.utils.http_server import check_port_open, free_port # noqa: E402
|
34 | 36 |
|
| 37 | +logger = logging.getLogger(__name__) |
| 38 | + |
35 | 39 |
|
36 | 40 | class FileDataSourceCreator(DataSourceCreator):
|
37 | 41 | files: List[Any]
|
@@ -363,31 +367,44 @@ class RemoteOfflineStoreDataSourceCreator(FileDataSourceCreator):
|
363 | 367 | def __init__(self, project_name: str, *args, **kwargs):
|
364 | 368 | super().__init__(project_name)
|
365 | 369 | self.server_port: int = 0
|
366 |
| - self.proc: Process = None |
| 370 | + self.proc = None |
367 | 371 |
|
368 | 372 | def setup(self, registry: RegistryConfig):
|
369 | 373 | parent_offline_config = super().create_offline_store_config()
|
370 |
| - |
371 |
| - fs = FeatureStore( |
372 |
| - config=RepoConfig( |
373 |
| - project=self.project_name, |
374 |
| - provider="local", |
375 |
| - offline_store=parent_offline_config, |
376 |
| - registry=registry.path, |
377 |
| - entity_key_serialization_version=2, |
378 |
| - ) |
| 374 | + config = RepoConfig( |
| 375 | + project=self.project_name, |
| 376 | + provider="local", |
| 377 | + offline_store=parent_offline_config, |
| 378 | + registry=registry.path, |
| 379 | + entity_key_serialization_version=2, |
379 | 380 | )
|
| 381 | + |
| 382 | + repo_path = Path(tempfile.mkdtemp()) |
| 383 | + with open(repo_path / "feature_store.yaml", "w") as outfile: |
| 384 | + yaml.dump(config.dict(by_alias=True), outfile) |
| 385 | + repo_path = str(repo_path.resolve()) |
| 386 | + |
380 | 387 | self.server_port = free_port()
|
381 | 388 | host = "0.0.0.0"
|
382 |
| - self.proc = Process( |
383 |
| - target=start_server, |
384 |
| - args=(fs, host, self.server_port), |
| 389 | + cmd = [ |
| 390 | + "feast", |
| 391 | + "-c" + repo_path, |
| 392 | + "serve_offline", |
| 393 | + "--host", |
| 394 | + host, |
| 395 | + "--port", |
| 396 | + str(self.server_port), |
| 397 | + ] |
| 398 | + self.proc = subprocess.Popen( |
| 399 | + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL |
385 | 400 | )
|
386 |
| - self.proc.start() |
| 401 | + |
| 402 | + _time_out_sec: int = 60 |
387 | 403 | # Wait for server to start
|
388 | 404 | wait_retry_backoff(
|
389 | 405 | lambda: (None, check_port_open(host, self.server_port)),
|
390 |
| - timeout_secs=10, |
| 406 | + timeout_secs=_time_out_sec, |
| 407 | + timeout_msg=f"Unable to start the feast remote offline server in {_time_out_sec} seconds at port={self.server_port}", |
391 | 408 | )
|
392 | 409 | return "grpc+tcp://{}:{}".format(host, self.server_port)
|
393 | 410 |
|
|
0 commit comments