|
1 |
| -import socket |
2 | 1 | import threading
|
3 | 2 | import time
|
4 |
| -from contextlib import closing |
5 | 3 | from datetime import datetime
|
6 | 4 | from typing import List
|
7 | 5 |
|
|
11 | 9 | import pytz
|
12 | 10 | import requests
|
13 | 11 |
|
14 |
| -from feast import FeatureService, FeatureView, ValueType |
15 | 12 | from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer
|
16 | 13 | from feast.feast_object import FeastObject
|
17 | 14 | from feast.feature_logging import LoggingConfig
|
| 15 | +from feast.feature_service import FeatureService |
18 | 16 | from feast.infra.feature_servers.base_config import FeatureLoggingConfig
|
19 | 17 | from feast.protos.feast.serving.ServingService_pb2 import (
|
20 | 18 | FieldStatus,
|
|
24 | 22 | from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
|
25 | 23 | from feast.protos.feast.types.Value_pb2 import RepeatedValue
|
26 | 24 | from feast.type_map import python_values_to_proto_values
|
| 25 | +from feast.value_type import ValueType |
27 | 26 | from feast.wait import wait_retry_backoff
|
28 | 27 | from tests.integration.feature_repos.repo_configuration import (
|
29 | 28 | construct_universal_feature_views,
|
|
33 | 32 | driver,
|
34 | 33 | location,
|
35 | 34 | )
|
36 |
| - |
37 |
| - |
38 |
| -@pytest.fixture |
39 |
| -def initialized_registry(environment, universal_data_sources): |
40 |
| - fs = environment.feature_store |
41 |
| - |
42 |
| - _, _, data_sources = universal_data_sources |
43 |
| - feature_views = construct_universal_feature_views(data_sources) |
44 |
| - |
45 |
| - feature_service = FeatureService( |
46 |
| - name="driver_features", |
47 |
| - features=[feature_views.driver], |
48 |
| - logging_config=LoggingConfig( |
49 |
| - destination=environment.data_source_creator.create_logged_features_destination(), |
50 |
| - sample_rate=1.0, |
51 |
| - ), |
52 |
| - ) |
53 |
| - feast_objects: List[FeastObject] = [feature_service] |
54 |
| - feast_objects.extend(feature_views.values()) |
55 |
| - feast_objects.extend([driver(), customer(), location()]) |
56 |
| - |
57 |
| - fs.apply(feast_objects) |
58 |
| - fs.materialize(environment.start_date, environment.end_date) |
59 |
| - |
60 |
| - |
61 |
| -def server_port(environment, server_type: str): |
62 |
| - if not environment.test_repo_config.go_feature_serving: |
63 |
| - pytest.skip("Only for Go path") |
64 |
| - |
65 |
| - fs = environment.feature_store |
66 |
| - |
67 |
| - embedded = EmbeddedOnlineFeatureServer( |
68 |
| - repo_path=str(fs.repo_path.absolute()), |
69 |
| - repo_config=fs.config, |
70 |
| - feature_store=fs, |
71 |
| - ) |
72 |
| - port = free_port() |
73 |
| - if server_type == "grpc": |
74 |
| - target = embedded.start_grpc_server |
75 |
| - elif server_type == "http": |
76 |
| - target = embedded.start_http_server |
77 |
| - else: |
78 |
| - raise ValueError("Server Type must be either 'http' or 'grpc'") |
79 |
| - |
80 |
| - t = threading.Thread( |
81 |
| - target=target, |
82 |
| - args=("127.0.0.1", port), |
83 |
| - kwargs=dict( |
84 |
| - enable_logging=True, |
85 |
| - logging_options=FeatureLoggingConfig( |
86 |
| - enabled=True, |
87 |
| - queue_capacity=100, |
88 |
| - write_to_disk_interval_secs=1, |
89 |
| - flush_interval_secs=1, |
90 |
| - emit_timeout_micro_secs=10000, |
91 |
| - ), |
92 |
| - ), |
93 |
| - ) |
94 |
| - t.start() |
95 |
| - |
96 |
| - wait_retry_backoff( |
97 |
| - lambda: (None, check_port_open("127.0.0.1", port)), timeout_secs=15 |
98 |
| - ) |
99 |
| - |
100 |
| - yield port |
101 |
| - if server_type == "grpc": |
102 |
| - embedded.stop_grpc_server() |
103 |
| - else: |
104 |
| - embedded.stop_http_server() |
105 |
| - |
106 |
| - # wait for graceful stop |
107 |
| - time.sleep(5) |
108 |
| - |
109 |
| - |
110 |
| -@pytest.fixture |
111 |
| -def grpc_server_port(environment, initialized_registry): |
112 |
| - yield from server_port(environment, "grpc") |
113 |
| - |
114 |
| - |
115 |
| -@pytest.fixture |
116 |
| -def http_server_port(environment, initialized_registry): |
117 |
| - yield from server_port(environment, "http") |
118 |
| - |
119 |
| - |
120 |
| -@pytest.fixture |
121 |
| -def grpc_client(grpc_server_port): |
122 |
| - ch = grpc.insecure_channel(f"localhost:{grpc_server_port}") |
123 |
| - yield ServingServiceStub(ch) |
| 35 | +from tests.utils.http_server import check_port_open, free_port |
| 36 | +from tests.utils.test_log_creator import generate_expected_logs, get_latest_rows |
124 | 37 |
|
125 | 38 |
|
126 | 39 | @pytest.mark.integration
|
@@ -254,43 +167,97 @@ def retrieve():
|
254 | 167 | pd.testing.assert_frame_equal(expected_logs, persisted_logs, check_dtype=False)
|
255 | 168 |
|
256 | 169 |
|
257 |
| -def free_port(): |
258 |
| - sock = socket.socket() |
259 |
| - sock.bind(("", 0)) |
260 |
| - return sock.getsockname()[1] |
| 170 | +""" |
| 171 | +Start go feature server either on http or grpc based on the repo configuration for testing. |
| 172 | +""" |
261 | 173 |
|
262 | 174 |
|
263 |
| -def check_port_open(host, port) -> bool: |
264 |
| - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: |
265 |
| - return sock.connect_ex((host, port)) == 0 |
| 175 | +def _server_port(environment, server_type: str): |
| 176 | + if not environment.test_repo_config.go_feature_serving: |
| 177 | + pytest.skip("Only for Go path") |
266 | 178 |
|
| 179 | + fs = environment.feature_store |
267 | 180 |
|
268 |
| -def get_latest_rows(df, join_key, entity_values): |
269 |
| - rows = df[df[join_key].isin(entity_values)] |
270 |
| - return rows.loc[rows.groupby(join_key)["event_timestamp"].idxmax()] |
| 181 | + embedded = EmbeddedOnlineFeatureServer( |
| 182 | + repo_path=str(fs.repo_path.absolute()), |
| 183 | + repo_config=fs.config, |
| 184 | + feature_store=fs, |
| 185 | + ) |
| 186 | + port = free_port() |
| 187 | + if server_type == "grpc": |
| 188 | + target = embedded.start_grpc_server |
| 189 | + elif server_type == "http": |
| 190 | + target = embedded.start_http_server |
| 191 | + else: |
| 192 | + raise ValueError("Server Type must be either 'http' or 'grpc'") |
| 193 | + |
| 194 | + t = threading.Thread( |
| 195 | + target=target, |
| 196 | + args=("127.0.0.1", port), |
| 197 | + kwargs=dict( |
| 198 | + enable_logging=True, |
| 199 | + logging_options=FeatureLoggingConfig( |
| 200 | + enabled=True, |
| 201 | + queue_capacity=100, |
| 202 | + write_to_disk_interval_secs=1, |
| 203 | + flush_interval_secs=1, |
| 204 | + emit_timeout_micro_secs=10000, |
| 205 | + ), |
| 206 | + ), |
| 207 | + ) |
| 208 | + t.start() |
271 | 209 |
|
| 210 | + wait_retry_backoff( |
| 211 | + lambda: (None, check_port_open("127.0.0.1", port)), timeout_secs=15 |
| 212 | + ) |
272 | 213 |
|
273 |
| -def generate_expected_logs( |
274 |
| - df: pd.DataFrame, |
275 |
| - feature_view: FeatureView, |
276 |
| - features: List[str], |
277 |
| - join_keys: List[str], |
278 |
| - timestamp_column: str, |
279 |
| -): |
280 |
| - logs = pd.DataFrame() |
281 |
| - for join_key in join_keys: |
282 |
| - logs[join_key] = df[join_key] |
283 |
| - |
284 |
| - for feature in features: |
285 |
| - col = f"{feature_view.name}__{feature}" |
286 |
| - logs[col] = df[feature] |
287 |
| - logs[f"{col}__timestamp"] = df[timestamp_column] |
288 |
| - logs[f"{col}__status"] = FieldStatus.PRESENT |
289 |
| - if feature_view.ttl: |
290 |
| - logs[f"{col}__status"] = logs[f"{col}__status"].mask( |
291 |
| - df[timestamp_column] |
292 |
| - < datetime.utcnow().replace(tzinfo=pytz.UTC) - feature_view.ttl, |
293 |
| - FieldStatus.OUTSIDE_MAX_AGE, |
294 |
| - ) |
| 214 | + yield port |
| 215 | + if server_type == "grpc": |
| 216 | + embedded.stop_grpc_server() |
| 217 | + else: |
| 218 | + embedded.stop_http_server() |
295 | 219 |
|
296 |
| - return logs.sort_values(by=join_keys).reset_index(drop=True) |
| 220 | + # wait for graceful stop |
| 221 | + time.sleep(5) |
| 222 | + |
| 223 | + |
| 224 | +# Go test fixtures |
| 225 | + |
| 226 | + |
| 227 | +@pytest.fixture |
| 228 | +def initialized_registry(environment, universal_data_sources): |
| 229 | + fs = environment.feature_store |
| 230 | + |
| 231 | + _, _, data_sources = universal_data_sources |
| 232 | + feature_views = construct_universal_feature_views(data_sources) |
| 233 | + |
| 234 | + feature_service = FeatureService( |
| 235 | + name="driver_features", |
| 236 | + features=[feature_views.driver], |
| 237 | + logging_config=LoggingConfig( |
| 238 | + destination=environment.data_source_creator.create_logged_features_destination(), |
| 239 | + sample_rate=1.0, |
| 240 | + ), |
| 241 | + ) |
| 242 | + feast_objects: List[FeastObject] = [feature_service] |
| 243 | + feast_objects.extend(feature_views.values()) |
| 244 | + feast_objects.extend([driver(), customer(), location()]) |
| 245 | + |
| 246 | + fs.apply(feast_objects) |
| 247 | + fs.materialize(environment.start_date, environment.end_date) |
| 248 | + |
| 249 | + |
| 250 | +@pytest.fixture |
| 251 | +def grpc_server_port(environment, initialized_registry): |
| 252 | + yield from _server_port(environment, "grpc") |
| 253 | + |
| 254 | + |
| 255 | +@pytest.fixture |
| 256 | +def http_server_port(environment, initialized_registry): |
| 257 | + yield from _server_port(environment, "http") |
| 258 | + |
| 259 | + |
| 260 | +@pytest.fixture |
| 261 | +def grpc_client(grpc_server_port): |
| 262 | + ch = grpc.insecure_channel(f"localhost:{grpc_server_port}") |
| 263 | + yield ServingServiceStub(ch) |
0 commit comments