Skip to content

Commit a33edaa

Browse files
authored
forward request to the upstream (#6)
* feat: parse upstream rules from config * feat: added forwarding request to the upstream by certain conditions * feat: added tests for forwarding on proxy server * fix: close upstream when the client sent EOF * fix: removed redundant comment * fix: send headers from the upstream to client * perf: cleaned up code a bit
1 parent 2053f61 commit a33edaa

17 files changed

+627
-277
lines changed

_example/mock.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,11 @@ rules:
8080
metadata:
8181
header: { X-Request-Id: "123" }
8282
trailer: { Powered-By: "groxy" }
83+
84+
# The next rule will forward the request to the specified upstream.
85+
- match: { uri: "com.github.Semior001.groxy.example.mock.Upstream/Get" }
86+
forward:
87+
upstream: example-1
88+
header:
89+
X-Request-Id: "123"
90+

pkg/discovery/discovery.go

+27-8
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,20 @@ type Provider interface {
2424
// It returns the name of the provider to update the routing rules.
2525
Events(ctx context.Context) <-chan string
2626

27-
// Rules returns the routing rules.
28-
Rules(ctx context.Context) ([]*Rule, error)
27+
// State returns the current state of the provider.
28+
State(ctx context.Context) (*State, error)
29+
}
30+
31+
// State contains the state of the provider.
32+
type State struct {
33+
// Name is the name of the provider.
34+
Name string
35+
36+
// Rules contains the routing rules.
37+
Rules []*Rule
2938

30-
// Upstreams returns the upstreams.
31-
Upstreams(ctx context.Context) ([]Upstream, error)
39+
// Upstreams contains the upstreams.
40+
Upstreams []Upstream
3241
}
3342

3443
// Mock contains the details of how the handler should reply to the downstream.
@@ -50,6 +59,16 @@ type Rule struct {
5059

5160
// Mock defines the details of how the handler should reply to the downstream.
5261
Mock *Mock
62+
63+
// Forward specifies the upstream to forward the request.
64+
Forward *Forward
65+
}
66+
67+
// Forward specifies the upstream to forward the request and the parameters
68+
// to invoke the upstream.
69+
type Forward struct {
70+
Upstream Upstream
71+
Header metadata.MD
5372
}
5473

5574
// String returns the name of the rule.
@@ -107,15 +126,15 @@ type Upstream interface {
107126
grpc.ClientConnInterface
108127
}
109128

110-
// NamedClosableClientConn is a named closable client connection.
111-
type NamedClosableClientConn struct {
129+
// ClientConn is a named closable client connection.
130+
type ClientConn struct {
112131
ConnName string
113132
ServeReflection bool
114133
*grpc.ClientConn
115134
}
116135

117136
// Name returns the name of the connection.
118-
func (n NamedClosableClientConn) Name() string { return n.ConnName }
137+
func (n ClientConn) Name() string { return n.ConnName }
119138

120139
// Reflection returns true if the connection serves reflection.
121-
func (n NamedClosableClientConn) Reflection() bool { return n.ServeReflection }
140+
func (n ClientConn) Reflection() bool { return n.ServeReflection }

pkg/discovery/fileprovider/config.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@ type Rule struct {
2222
Header map[string]string `yaml:"header"`
2323
Body *string `yaml:"body"`
2424
} `yaml:"match"`
25-
Respond Respond `yaml:"respond"`
25+
Respond *Respond `yaml:"respond,omitempty"`
26+
Forward *Forward `yaml:"forward,omitempty"`
27+
}
28+
29+
// Forward specifies how the service should forward the request.
30+
type Forward struct {
31+
Upstream string `yaml:"upstream"`
32+
Header map[string]string `yaml:"header"`
2633
}
2734

2835
// Respond specifies how the service should respond to the request.

pkg/discovery/fileprovider/file.go

+102-76
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"google.golang.org/grpc/credentials"
2222
"crypto/tls"
2323
"sort"
24+
"github.com/Semior001/groxy/pkg/grpcx"
2425
)
2526

2627
// File discovers the changes in routing rules from a file.
@@ -90,8 +91,8 @@ func (d *File) Events(ctx context.Context) <-chan string {
9091
return res
9192
}
9293

93-
// Rules parses the file and returns the routing rules from it.
94-
func (d *File) Rules(context.Context) ([]*discovery.Rule, error) {
94+
// State parses the file and returns the current state of the provider.
95+
func (d *File) State(ctx context.Context) (*discovery.State, error) {
9596
f, err := os.Open(d.FileName)
9697
if err != nil {
9798
return nil, fmt.Errorf("open file: %w", err)
@@ -100,70 +101,36 @@ func (d *File) Rules(context.Context) ([]*discovery.Rule, error) {
100101
defer f.Close()
101102

102103
var cfg Config
103-
if err = yaml.NewDecoder(f).Decode(&cfg); err != nil {
104+
if err := yaml.NewDecoder(f).Decode(&cfg); err != nil {
104105
return nil, fmt.Errorf("decode file: %w", err)
105106
}
106107

107108
if cfg.Version != "1" {
108109
return nil, fmt.Errorf("unsupported version: %s", cfg.Version)
109110
}
110111

111-
parseRespond := func(r Respond) (result *discovery.Mock, err error) {
112-
result = &discovery.Mock{}
113-
114-
if r.Metadata != nil {
115-
result.Header = metadata.New(r.Metadata.Header)
116-
result.Trailer = metadata.New(r.Metadata.Trailer)
117-
}
118-
119-
switch {
120-
case r.Status != nil && r.Body != nil:
121-
return nil, fmt.Errorf("can't set both status and body in rule")
122-
case r.Status != nil:
123-
var code codes.Code
124-
if err = code.UnmarshalJSON([]byte(fmt.Sprintf("%q", r.Status.Code))); err != nil {
125-
return nil, fmt.Errorf("unmarshal status code: %w", err)
126-
}
127-
result.Status = status.New(code, r.Status.Message)
128-
case r.Body != nil:
129-
if result.Body, err = protodef.BuildMessage(*r.Body); err != nil {
130-
return nil, fmt.Errorf("build respond message: %w", err)
131-
}
132-
default:
133-
return nil, fmt.Errorf("empty response in rule")
134-
}
135-
136-
return result, nil
112+
upstreams, err := d.upstreams(ctx, cfg)
113+
if err != nil {
114+
return nil, fmt.Errorf("get upstreams: %w", err)
137115
}
138116

139-
parseRule := func(r Rule) (result discovery.Rule, err error) {
140-
if r.Match.URI == "" {
141-
return discovery.Rule{}, fmt.Errorf("empty URI in rule")
142-
}
143-
144-
result.Name = r.Match.URI
145-
if result.Match.URI, err = regexp.Compile(r.Match.URI); err != nil {
146-
return discovery.Rule{}, fmt.Errorf("compile URI regexp: %w", err)
147-
}
148-
149-
result.Match.IncomingMetadata = metadata.New(r.Match.Header)
150-
151-
if r.Match.Body != nil {
152-
if result.Match.Message, err = protodef.BuildMessage(*r.Match.Body); err != nil {
153-
return discovery.Rule{}, fmt.Errorf("build request matcher message: %w", err)
154-
}
155-
}
156-
157-
if result.Mock, err = parseRespond(r.Respond); err != nil {
158-
return discovery.Rule{}, fmt.Errorf("parse respond: %w", err)
159-
}
160-
161-
return result, nil
117+
rules, err := d.rules(cfg, upstreams)
118+
if err != nil {
119+
return nil, fmt.Errorf("get rules: %w", err)
162120
}
163121

122+
return &discovery.State{
123+
Name: d.Name(),
124+
Rules: rules,
125+
Upstreams: upstreams,
126+
}, nil
127+
}
128+
129+
// Rules parses the file and returns the routing rules from it.
130+
func (d *File) rules(cfg Config, upstreams []discovery.Upstream) ([]*discovery.Rule, error) {
164131
rules := make([]*discovery.Rule, 0, len(cfg.Rules)+1)
165132
for idx, r := range cfg.Rules {
166-
rule, err := parseRule(r)
133+
rule, err := parseRule(r, upstreams)
167134
if err != nil {
168135
return nil, fmt.Errorf("parse rule #%d: %w", idx, err)
169136
}
@@ -172,37 +139,22 @@ func (d *File) Rules(context.Context) ([]*discovery.Rule, error) {
172139
}
173140

174141
if cfg.NotMatched != nil {
142+
mock, err := parseRespond(cfg.NotMatched)
143+
if err != nil {
144+
return nil, fmt.Errorf("parse respond: %w", err)
145+
}
175146
rule := discovery.Rule{
176147
Name: "not matched",
177148
Match: discovery.RequestMatcher{URI: regexp.MustCompile(".*")},
178-
}
179-
if rule.Mock, err = parseRespond(*cfg.NotMatched); err != nil {
180-
return nil, fmt.Errorf("parse respond: %w", err)
149+
Mock: mock,
181150
}
182151
rules = append(rules, &rule)
183152
}
184153

185154
return rules, nil
186155
}
187156

188-
// Upstreams parses the file and returns the upstreams from it.
189-
func (d *File) Upstreams(ctx context.Context) ([]discovery.Upstream, error) {
190-
f, err := os.Open(d.FileName)
191-
if err != nil {
192-
return nil, fmt.Errorf("open file: %w", err)
193-
}
194-
195-
defer f.Close()
196-
197-
var cfg Config
198-
if err = yaml.NewDecoder(f).Decode(&cfg); err != nil {
199-
return nil, fmt.Errorf("decode file: %w", err)
200-
}
201-
202-
if cfg.Version != "1" {
203-
return nil, fmt.Errorf("unsupported version: %s", cfg.Version)
204-
}
205-
157+
func (d *File) upstreams(ctx context.Context, cfg Config) ([]discovery.Upstream, error) {
206158
res := make([]discovery.Upstream, 0, len(cfg.Upstreams))
207159
for name, u := range cfg.Upstreams {
208160
cred := insecure.NewCredentials()
@@ -219,12 +171,15 @@ func (d *File) Upstreams(ctx context.Context) ([]discovery.Upstream, error) {
219171
slog.String("address", u.Addr),
220172
slog.Bool("tls", u.TLS))
221173

222-
cc, err := grpc.DialContext(ctx, u.Addr, grpc.WithTransportCredentials(cred))
174+
cc, err := grpc.DialContext(ctx, u.Addr,
175+
grpc.WithTransportCredentials(cred),
176+
grpc.WithStreamInterceptor(grpcx.ClientLogInterceptor(slog.Default())),
177+
)
223178
if err != nil {
224179
return nil, fmt.Errorf("dial upstream %q: %w", name, err)
225180
}
226181

227-
res = append(res, discovery.NamedClosableClientConn{
182+
res = append(res, discovery.ClientConn{
228183
ConnName: name,
229184
ServeReflection: u.ServeReflection,
230185
ClientConn: cc,
@@ -253,3 +208,74 @@ func (d *File) getModifTime(ctx context.Context) (modif time.Time, ok bool) {
253208

254209
return fi.ModTime(), true
255210
}
211+
212+
func parseRule(r Rule, upstreams []discovery.Upstream) (result discovery.Rule, err error) {
213+
if r.Match.URI == "" {
214+
return discovery.Rule{}, fmt.Errorf("empty URI in rule")
215+
}
216+
217+
result.Name = r.Match.URI
218+
if result.Match.URI, err = regexp.Compile(r.Match.URI); err != nil {
219+
return discovery.Rule{}, fmt.Errorf("compile URI regexp: %w", err)
220+
}
221+
222+
result.Match.IncomingMetadata = metadata.New(r.Match.Header)
223+
224+
if r.Match.Body != nil {
225+
if result.Match.Message, err = protodef.BuildMessage(*r.Match.Body); err != nil {
226+
return discovery.Rule{}, fmt.Errorf("build request matcher message: %w", err)
227+
}
228+
}
229+
230+
if r.Forward != nil {
231+
result.Forward = &discovery.Forward{Header: metadata.New(r.Forward.Header)}
232+
for _, up := range upstreams {
233+
if up.Name() == r.Forward.Upstream {
234+
result.Forward.Upstream = up
235+
break
236+
}
237+
}
238+
}
239+
240+
if result.Mock, err = parseRespond(r.Respond); err != nil {
241+
return discovery.Rule{}, fmt.Errorf("parse respond: %w", err)
242+
}
243+
244+
if result.Mock != nil && result.Forward != nil {
245+
return discovery.Rule{}, fmt.Errorf("can't set both mock and forward in rule")
246+
}
247+
248+
return result, nil
249+
}
250+
251+
func parseRespond(r *Respond) (result *discovery.Mock, err error) {
252+
if r == nil {
253+
return nil, nil
254+
}
255+
256+
result = &discovery.Mock{}
257+
258+
if r.Metadata != nil {
259+
result.Header = metadata.New(r.Metadata.Header)
260+
result.Trailer = metadata.New(r.Metadata.Trailer)
261+
}
262+
263+
switch {
264+
case r.Status != nil && r.Body != nil:
265+
return nil, fmt.Errorf("can't set both status and body in rule")
266+
case r.Status != nil:
267+
var code codes.Code
268+
if err = code.UnmarshalJSON([]byte(fmt.Sprintf("%q", r.Status.Code))); err != nil {
269+
return nil, fmt.Errorf("unmarshal status code: %w", err)
270+
}
271+
result.Status = status.New(code, r.Status.Message)
272+
case r.Body != nil:
273+
if result.Body, err = protodef.BuildMessage(*r.Body); err != nil {
274+
return nil, fmt.Errorf("build respond message: %w", err)
275+
}
276+
default:
277+
return nil, fmt.Errorf("empty response in rule")
278+
}
279+
280+
return result, nil
281+
}

0 commit comments

Comments
 (0)