Skip to content

Commit cff0133

Browse files
authored
chore: Basic e2e test for go grpc server (#2628)
* basic e2e tests for go grpc server Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * buffered channel Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * up go.mod Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * update gopy Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 2b2a76b commit cff0133

File tree

6 files changed

+163
-14
lines changed

6 files changed

+163
-14
lines changed

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/golang/protobuf v1.5.2
1111
github.com/google/uuid v1.3.0
1212
github.com/mattn/go-sqlite3 v1.14.12
13+
github.com/pkg/errors v0.9.1
1314
github.com/spaolacci/murmur3 v1.1.0
1415
github.com/stretchr/testify v1.7.0
1516
google.golang.org/grpc v1.45.0
@@ -34,7 +35,6 @@ require (
3435
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
3536
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
3637
github.com/pierrec/lz4/v4 v4.1.14 // indirect
37-
github.com/pkg/errors v0.9.1 // indirect
3838
github.com/pmezard/go-difflib v1.0.0 // indirect
3939
github.com/zeebo/xxh3 v1.0.2 // indirect
4040
golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 // indirect
@@ -50,4 +50,4 @@ require (
5050
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
5151
)
5252

53-
replace github.com/go-python/gopy v0.4.0 => github.com/feast-dev/gopy v0.4.1-0.20220329011409-d705e6cd1d9b
53+
replace github.com/go-python/gopy v0.4.0 => github.com/feast-dev/gopy v0.4.1-0.20220429180328-4257ac71a4d0

go.sum

+2-4
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
8585
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
8686
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
8787
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
88-
github.com/feast-dev/gopy v0.4.1-0.20220329011409-d705e6cd1d9b h1:C/oK6gi12Q7fiiVCI3e62tqWCSXqsTz9OpyK249XI84=
89-
github.com/feast-dev/gopy v0.4.1-0.20220329011409-d705e6cd1d9b/go.mod h1:ZO6vpitQ61NVoQP/2yOubPS6ET5pP3CAWCiMYn5eqCc=
88+
github.com/feast-dev/gopy v0.4.1-0.20220429180328-4257ac71a4d0 h1:Go714ObVP1O+a6qK7haXVL28QNm6WMD8bwnN9EA8PlM=
89+
github.com/feast-dev/gopy v0.4.1-0.20220429180328-4257ac71a4d0/go.mod h1:ZO6vpitQ61NVoQP/2yOubPS6ET5pP3CAWCiMYn5eqCc=
9090
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
9191
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
9292
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
@@ -168,8 +168,6 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
168168
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
169169
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
170170
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
171-
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
172-
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
173171
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
174172
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
175173
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=

go/embedded/online_features.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ import (
3030
)
3131

3232
type OnlineFeatureService struct {
33-
fs *feast.FeatureStore
33+
fs *feast.FeatureStore
34+
grpcStopCh chan os.Signal
3435
}
3536

3637
type OnlineFeatureServiceConfig struct {
@@ -54,7 +55,11 @@ func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCal
5455
log.Fatalln(err)
5556
}
5657

57-
return &OnlineFeatureService{fs: fs}
58+
// Notify this channel when receiving interrupt or termination signals from OS
59+
c := make(chan os.Signal, 1)
60+
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
61+
62+
return &OnlineFeatureService{fs: fs, grpcStopCh: c}
5863
}
5964

6065
func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[string]int32, error) {
@@ -219,15 +224,14 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
219224
if err != nil {
220225
return err
221226
}
227+
log.Printf("Listening a gRPC server on host %s port %d\n", host, port)
228+
222229
grpcServer := grpc.NewServer()
223230
serving.RegisterServingServiceServer(grpcServer, ser)
224231

225-
// Notify this channel when receiving interrupt or termination signals from OS
226-
c := make(chan os.Signal, 1)
227-
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
228232
go func() {
229233
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
230-
<-c
234+
<-s.grpcStopCh
231235
fmt.Println("Stopping the gRPC server...")
232236
grpcServer.GracefulStop()
233237
}()
@@ -239,6 +243,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
239243
return nil
240244
}
241245

246+
func (s *OnlineFeatureService) Stop() {
247+
s.grpcStopCh <- syscall.SIGINT
248+
}
249+
242250
/*
243251
Read Record Batch from memory managed by Python caller.
244252
Python part uses C ABI interface to export this record into C Data Interface,

go/internal/feast/server/grpc_server.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
7979
})
8080
}
8181

82-
if featuresOrService.FeatureService != nil && s.loggingService != nil {
83-
logger, err := s.loggingService.GetOrCreateLogger(featuresOrService.FeatureService)
82+
featureService := featuresOrService.FeatureService
83+
if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
84+
logger, err := s.loggingService.GetOrCreateLogger(featureService)
8485
if err != nil {
8586
fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err)
8687
}

sdk/python/feast/embedded_go/online_features_service.py

+3
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ def get_online_features(
135135
def start_grpc_server(self, host: str, port: int):
136136
self._service.StartGprcServer(host, port)
137137

138+
def stop_grpc_server(self):
139+
self._service.Stop()
140+
138141

139142
def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array:
140143
if isinstance(value, Value_pb2.RepeatedValue):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import socket
2+
import threading
3+
from contextlib import closing
4+
from typing import List
5+
6+
import grpc
7+
import pytest
8+
9+
from feast import FeatureService, ValueType
10+
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer
11+
from feast.feast_object import FeastObject
12+
from feast.protos.feast.serving.ServingService_pb2 import (
13+
FieldStatus,
14+
GetOnlineFeaturesRequest,
15+
GetOnlineFeaturesResponse,
16+
)
17+
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
18+
from feast.protos.feast.types.Value_pb2 import RepeatedValue
19+
from feast.type_map import python_values_to_proto_values
20+
from feast.wait import wait_retry_backoff
21+
from tests.integration.feature_repos.integration_test_repo_config import (
22+
IntegrationTestRepoConfig,
23+
)
24+
from tests.integration.feature_repos.repo_configuration import (
25+
AVAILABLE_OFFLINE_STORES,
26+
AVAILABLE_ONLINE_STORES,
27+
REDIS_CONFIG,
28+
construct_test_environment,
29+
construct_universal_feature_views,
30+
construct_universal_test_data,
31+
)
32+
from tests.integration.feature_repos.universal.entities import (
33+
customer,
34+
driver,
35+
location,
36+
)
37+
38+
LOCAL_REPO_CONFIGS = [
39+
IntegrationTestRepoConfig(online_store=REDIS_CONFIG, go_feature_retrieval=True),
40+
]
41+
42+
43+
@pytest.fixture(
44+
params=[
45+
c
46+
for c in LOCAL_REPO_CONFIGS
47+
if c.offline_store_creator in AVAILABLE_OFFLINE_STORES
48+
and c.online_store in AVAILABLE_ONLINE_STORES
49+
]
50+
)
51+
def local_environment(request):
52+
e = construct_test_environment(request.param)
53+
54+
def cleanup():
55+
e.feature_store.teardown()
56+
57+
request.addfinalizer(cleanup)
58+
return e
59+
60+
61+
@pytest.fixture
62+
def initialized_registry(local_environment):
63+
fs = local_environment.feature_store
64+
65+
entities, datasets, data_sources = construct_universal_test_data(local_environment)
66+
feature_views = construct_universal_feature_views(data_sources)
67+
68+
feature_service = FeatureService(
69+
name="driver_features", features=[feature_views.driver]
70+
)
71+
feast_objects: List[FeastObject] = [feature_service]
72+
feast_objects.extend(feature_views.values())
73+
feast_objects.extend([driver(), customer(), location()])
74+
75+
fs.apply(feast_objects)
76+
fs.materialize(local_environment.start_date, local_environment.end_date)
77+
78+
79+
@pytest.fixture
80+
def grpc_server_port(local_environment, initialized_registry):
81+
fs = local_environment.feature_store
82+
83+
embedded = EmbeddedOnlineFeatureServer(
84+
repo_path=str(fs.repo_path.absolute()), repo_config=fs.config, feature_store=fs,
85+
)
86+
port = free_port()
87+
88+
t = threading.Thread(target=embedded.start_grpc_server, args=("127.0.0.1", port))
89+
t.start()
90+
91+
wait_retry_backoff(
92+
lambda: (None, check_port_open("127.0.0.1", port)), timeout_secs=15
93+
)
94+
95+
yield port
96+
embedded.stop_grpc_server()
97+
98+
99+
@pytest.fixture
100+
def grpc_client(grpc_server_port):
101+
ch = grpc.insecure_channel(f"localhost:{grpc_server_port}")
102+
yield ServingServiceStub(ch)
103+
104+
105+
@pytest.mark.integration
106+
@pytest.mark.universal
107+
def test_go_grpc_server(grpc_client):
108+
resp: GetOnlineFeaturesResponse = grpc_client.GetOnlineFeatures(
109+
GetOnlineFeaturesRequest(
110+
feature_service="driver_features",
111+
entities={
112+
"driver_id": RepeatedValue(
113+
val=python_values_to_proto_values(
114+
[5001, 5002], feature_type=ValueType.INT64
115+
)
116+
)
117+
},
118+
full_feature_names=True,
119+
)
120+
)
121+
assert list(resp.metadata.feature_names.val) == [
122+
"driver_id",
123+
"driver_stats__conv_rate",
124+
"driver_stats__acc_rate",
125+
"driver_stats__avg_daily_trips",
126+
]
127+
for vector in resp.results:
128+
assert all([s == FieldStatus.PRESENT for s in vector.statuses])
129+
130+
131+
def free_port():
132+
sock = socket.socket()
133+
sock.bind(("", 0))
134+
return sock.getsockname()[1]
135+
136+
137+
def check_port_open(host, port) -> bool:
138+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
139+
return sock.connect_ex((host, port)) == 0

0 commit comments

Comments
 (0)