-
Notifications
You must be signed in to change notification settings - Fork 515
/
server.go
135 lines (114 loc) · 3.65 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Package test contains test utilities
package test
import (
"context"
"fmt"
"log"
"net"
"net/http"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/v3"
gcplogger "github.com/envoyproxy/go-control-plane/pkg/log"
)
const (
grpcKeepaliveTime = 30 * time.Second
grpcKeepaliveTimeout = 5 * time.Second
grpcKeepaliveMinTime = 30 * time.Second
grpcMaxConcurrentStreams = 1000000
)
// HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
// specialized to Envoy xDS API.
type HTTPGateway struct {
// Log is an optional log for errors in response write
Log gcplogger.Logger
Gateway server.HTTPGateway
}
// RunAccessLogServer starts an accesslog server.
func RunAccessLogServer(ctx context.Context, als *test.AccessLogService, alsPort uint) {
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", alsPort))
if err != nil {
log.Fatal(err)
}
test.RegisterAccessLogServer(grpcServer, als)
log.Printf("access log server listening on %d\n", alsPort)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
// RunManagementServer starts an xDS server at the given port.
func RunManagementServer(ctx context.Context, srv server.Server, port uint) {
// gRPC golang library sets a very small upper bound for the number gRPC/h2
// streams over a single TCP connection. If a proxy multiplexes requests over
// a single connection to the management server, then it might lead to
// availability problems. Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions,
grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: grpcKeepaliveTime,
Timeout: grpcKeepaliveTimeout,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: grpcKeepaliveMinTime,
PermitWithoutStream: true,
}),
)
grpcServer := grpc.NewServer(grpcOptions...)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
test.RegisterServer(grpcServer, srv)
log.Printf("management server listening on %d\n", port)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
// RunManagementGateway starts an HTTP gateway to an xDS server.
func RunManagementGateway(ctx context.Context, srv server.Server, port uint) {
log.Printf("gateway listening HTTP/1.1 on %d\n", port)
// Ignore: G114: Use of net/http serve function that has no support for setting timeouts
// nolint:gosec
server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: &HTTPGateway{
Gateway: server.HTTPGateway{Server: srv},
},
}
go func() {
if err := server.ListenAndServe(); err != nil {
log.Printf("failed to start listening: %s", err)
}
}()
<-ctx.Done()
// Cleanup our gateway if we receive a shutdown
if err := server.Shutdown(ctx); err != nil {
log.Printf("failed to shut down: %s", err)
}
}
func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
bytes, code, err := h.Gateway.ServeHTTP(req)
if err != nil {
http.Error(resp, err.Error(), code)
return
}
if bytes == nil {
resp.WriteHeader(http.StatusNotModified)
return
}
if _, err = resp.Write(bytes); err != nil && h.Log != nil {
h.Log.Errorf("gateway error: %v", err)
}
}