From ce6ab7284fef74f75427a70fe9228409904001d5 Mon Sep 17 00:00:00 2001 From: kinggo Date: Thu, 22 Dec 2022 19:06:37 +0800 Subject: [PATCH 1/3] optimize: add WithDialFunc for client (#447) Co-authored-by: GuangyuFan <97507466+FGYFFFF@users.noreply.github.com> --- pkg/app/client/client_test.go | 67 +++++++++++++++++++++++++++++++++++ pkg/app/client/option.go | 34 ++++++++++++++++++ pkg/network/connection.go | 2 ++ 3 files changed, 103 insertions(+) diff --git a/pkg/app/client/client_test.go b/pkg/app/client/client_test.go index c754ce510..ac0307e9e 100644 --- a/pkg/app/client/client_test.go +++ b/pkg/app/client/client_test.go @@ -2085,3 +2085,70 @@ func TestClientDialerName(t *testing.T) { t.Errorf("expected 'empty string', but get %s", dName) } } + +func TestClientDoWithDialFunc(t *testing.T) { + t.Parallel() + + ch := make(chan error, 1) + uri := "/foo/bar/baz" + body := "request body" + opt := config.NewOptions([]config.Option{}) + + opt.Addr = "unix-test-10021" + opt.Network = "unix" + engine := route.NewEngine(opt) + + engine.POST("/foo/bar/baz", func(c context.Context, ctx *app.RequestContext) { + if string(ctx.Request.Header.Method()) != consts.MethodPost { + ch <- fmt.Errorf("unexpected request method: %q. Expecting %q", ctx.Request.Header.Method(), consts.MethodPost) + return + } + reqURI := ctx.Request.RequestURI() + if string(reqURI) != uri { + ch <- fmt.Errorf("unexpected request uri: %q. Expecting %q", reqURI, uri) + return + } + cl := ctx.Request.Header.ContentLength() + if cl != len(body) { + ch <- fmt.Errorf("unexpected content-length %d. Expecting %d", cl, len(body)) + return + } + reqBody := ctx.Request.Body() + if string(reqBody) != body { + ch <- fmt.Errorf("unexpected request body: %q. Expecting %q", reqBody, body) + return + } + ch <- nil + }) + go engine.Run() + defer func() { + engine.Close() + }() + time.Sleep(time.Millisecond * 500) + + c, _ := NewClient(WithDialFunc(func(addr string) (network.Conn, error) { + return dialer.DialConnection(opt.Network, opt.Addr, time.Second, nil) + })) + + var req protocol.Request + req.Header.SetMethod(consts.MethodPost) + req.SetRequestURI(uri) + req.SetHost("xxx.com") + req.SetBodyString(body) + + var resp protocol.Response + + err := c.Do(context.Background(), &req, &resp) + if err != nil { + t.Fatalf("error when doing request: %s", err) + } + + select { + case err = <-ch: + if err != nil { + t.Fatalf("err = %s", err.Error()) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + } +} diff --git a/pkg/app/client/option.go b/pkg/app/client/option.go index 8a64ed844..a8c751058 100644 --- a/pkg/app/client/option.go +++ b/pkg/app/client/option.go @@ -23,6 +23,7 @@ import ( "github.com/cloudwego/hertz/pkg/app/client/retry" "github.com/cloudwego/hertz/pkg/common/config" "github.com/cloudwego/hertz/pkg/network" + "github.com/cloudwego/hertz/pkg/network/dialer" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol/consts" ) @@ -147,3 +148,36 @@ func WithWriteTimeout(t time.Duration) config.ClientOption { o.WriteTimeout = t }} } + +// WithDialFunc is used to set dialer function. +// Note: WithDialFunc will overwrite custom dialer. +func WithDialFunc(f network.DialFunc, dialers ...network.Dialer) config.ClientOption { + return config.ClientOption{F: func(o *config.ClientOptions) { + d := dialer.DefaultDialer() + if len(dialers) != 0 { + d = dialers[0] + } + o.Dialer = newCustomDialerWithDialFunc(d, f) + }} +} + +// customDialer set customDialerFunc and params to set dailFunc +type customDialer struct { + network.Dialer + dialFunc network.DialFunc +} + +func (m *customDialer) DialConnection(network, address string, timeout time.Duration, + tlsConfig *tls.Config) (conn network.Conn, err error) { + if m.dialFunc != nil { + return m.dialFunc(address) + } + return m.Dialer.DialConnection(network, address, timeout, tlsConfig) +} + +func newCustomDialerWithDialFunc(dialer network.Dialer, dialFunc network.DialFunc) network.Dialer { + return &customDialer{ + Dialer: dialer, + dialFunc: dialFunc, + } +} diff --git a/pkg/network/connection.go b/pkg/network/connection.go index 1559c1c54..823ce362e 100644 --- a/pkg/network/connection.go +++ b/pkg/network/connection.go @@ -98,3 +98,5 @@ type HandleSpecificError interface { type ErrorNormalization interface { ToHertzError(err error) error } + +type DialFunc func(addr string) (Conn, error) From ebdd2f788c07b6576972c0c4157fa2d1a62cad0b Mon Sep 17 00:00:00 2001 From: "Asterisk L. Yuan" <92938836+L2ncE@users.noreply.github.com> Date: Mon, 26 Dec 2022 14:22:42 +0800 Subject: [PATCH 2/3] ci: automatically skip when there is a label except `invalid issue` (#490) --- .github/workflows/labeler.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 080e937a6..f9d9da932 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -5,6 +5,7 @@ on: jobs: triage: + if: contains(github.event.issue.labels.*.name, 'invalid issue') || join(github.event.issue.labels) == '' runs-on: ubuntu-latest name: Label issues steps: From b3098118d07201880d8ea622a582e912fba1e073 Mon Sep 17 00:00:00 2001 From: Xuran <37136584+Duslia@users.noreply.github.com> Date: Mon, 26 Dec 2022 15:04:34 +0800 Subject: [PATCH 3/3] feat: improve observability of hostclient (#380) --- pkg/app/client/client.go | 1 + pkg/app/client/client_test.go | 29 ++++++++++++++----- pkg/app/client/option.go | 7 +++++ pkg/common/config/client_option.go | 14 +++++++++ pkg/common/errors/errors.go | 1 + pkg/protocol/http1/client.go | 46 +++++++++++++++++++++--------- pkg/protocol/http1/client_test.go | 4 +-- 7 files changed, 80 insertions(+), 22 deletions(-) diff --git a/pkg/app/client/client.go b/pkg/app/client/client.go index 3fd25714b..8d3a03bf1 100644 --- a/pkg/app/client/client.go +++ b/pkg/app/client/client.go @@ -629,5 +629,6 @@ func newHttp1OptionFromClient(c *Client) *http1.ClientOptions { ResponseBodyStream: c.options.ResponseBodyStream, RetryConfig: c.options.RetryConfig, RetryIfFunc: c.RetryIfFunc, + StateObserve: c.options.HostClientStateObserve, } } diff --git a/pkg/app/client/client_test.go b/pkg/app/client/client_test.go index ac0307e9e..1891de8f9 100644 --- a/pkg/app/client/client_test.go +++ b/pkg/app/client/client_test.go @@ -81,10 +81,7 @@ import ( "github.com/cloudwego/hertz/pkg/route" ) -var ( - errTooManyRedirects = errors.New("too many redirects detected when doing the request") - errNoFreeConns = errors.New("no free connections available to host") -) +var errTooManyRedirects = errors.New("too many redirects detected when doing the request") func TestCloseIdleConnections(t *testing.T) { opt := config.NewOptions([]config.Option{}) @@ -834,7 +831,7 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) { for { if err := c.DoDeadline(context.Background(), req, resp, time.Now().Add(timeout)); err != nil { - if err.Error() == errNoFreeConns.Error() { + if err.Error() == errs.ErrNoFreeConns.Error() { time.Sleep(time.Millisecond * 500) continue } @@ -1179,8 +1176,8 @@ func TestHostClientMaxConnWaitTimeoutError(t *testing.T) { resp := protocol.AcquireResponse() if err := c.Do(context.Background(), req, resp); err != nil { - if err.Error() != errNoFreeConns.Error() { - t.Errorf("unexpected error: %s. Expecting %s", err.Error(), errNoFreeConns.Error()) + if err.Error() != errs.ErrNoFreeConns.Error() { + t.Errorf("unexpected error: %s. Expecting %s", err.Error(), errs.ErrNoFreeConns.Error()) } atomic.AddUint32(&errNoFreeConnsCount, 1) } else { @@ -2152,3 +2149,21 @@ func TestClientDoWithDialFunc(t *testing.T) { t.Fatalf("timeout") } } + +func TestClientState(t *testing.T) { + opt := config.NewOptions([]config.Option{}) + opt.Addr = ":11000" + engine := route.NewEngine(opt) + go engine.Run() + + client, _ := NewClient(WithConnStateObserve(func(hcs config.HostClientState) { + assert.DeepEqual(t, 0, hcs.ConnPoolState().ConnNum) + time.Sleep(time.Second * 2) + assert.DeepEqual(t, 1, hcs.ConnPoolState().ConnNum) + assert.DeepEqual(t, "127.0.0.1:11000", hcs.ConnPoolState().Addr) + })) + + time.Sleep(time.Second) + client.Get(context.Background(), nil, "http://127.0.0.1:11000") + time.Sleep(time.Second * 3) +} diff --git a/pkg/app/client/option.go b/pkg/app/client/option.go index a8c751058..37e086449 100644 --- a/pkg/app/client/option.go +++ b/pkg/app/client/option.go @@ -149,6 +149,13 @@ func WithWriteTimeout(t time.Duration) config.ClientOption { }} } +// WithConnStateObserve sets the connection state observation function. +func WithConnStateObserve(hs config.HostClientStateFunc) config.ClientOption { + return config.ClientOption{F: func(o *config.ClientOptions) { + o.HostClientStateObserve = hs + }} +} + // WithDialFunc is used to set dialer function. // Note: WithDialFunc will overwrite custom dialer. func WithDialFunc(f network.DialFunc, dialers ...network.Dialer) config.ClientOption { diff --git a/pkg/common/config/client_option.go b/pkg/common/config/client_option.go index 8b44d243c..99a7c46fe 100644 --- a/pkg/common/config/client_option.go +++ b/pkg/common/config/client_option.go @@ -25,6 +25,18 @@ import ( "github.com/cloudwego/hertz/pkg/protocol/consts" ) +type ConnPoolState struct { + ConnNum int + WaitConnNum int + Addr string +} + +type HostClientState interface { + ConnPoolState() ConnPoolState +} + +type HostClientStateFunc func(HostClientState) + // ClientOption is the only struct that can be used to set ClientOptions. type ClientOption struct { F func(o *ClientOptions) @@ -106,6 +118,8 @@ type ClientOptions struct { // all configurations related to retry RetryConfig *retry.Config + + HostClientStateObserve HostClientStateFunc } func NewClientOptions(opts []ClientOption) *ClientOptions { diff --git a/pkg/common/errors/errors.go b/pkg/common/errors/errors.go index e9cccdb6d..4bfc79c9a 100644 --- a/pkg/common/errors/errors.go +++ b/pkg/common/errors/errors.go @@ -60,6 +60,7 @@ var ( ErrDialTimeout = errors.New("dial timeout") ErrNothingRead = errors.New("nothing read") ErrShortConnection = errors.New("short connection") + ErrNoFreeConns = errors.New("no free connections available to host") ErrConnectionClosed = errors.New("connection closed") ) diff --git a/pkg/protocol/http1/client.go b/pkg/protocol/http1/client.go index 6d4263618..e9c150471 100644 --- a/pkg/protocol/http1/client.go +++ b/pkg/protocol/http1/client.go @@ -71,11 +71,8 @@ import ( respI "github.com/cloudwego/hertz/pkg/protocol/http1/resp" ) -var ( - errNoFreeConns = errs.NewPublic("no free connections available to host") - errConnectionClosed = errs.NewPublic("the server closed connection before returning the first response byte. " + - "Make sure the server returns 'Connection: close' response header before closing the connection") -) +var errConnectionClosed = errs.NewPublic("the server closed connection before returning the first response byte. " + + "Make sure the server returns 'Connection: close' response header before closing the connection") // HostClient balances http requests among hosts listed in Addr. // @@ -129,6 +126,11 @@ func (c *HostClient) SetDynamicConfig(dc *client.DynamicConfig) { c.Addr = dc.Addr c.ProxyURI = dc.ProxyURI c.IsTLS = dc.IsTLS + + // start observation after setting addr + if c.StateObserve != nil { + go c.StateObserve(c) + } } type clientConn struct { @@ -167,6 +169,20 @@ func (c *HostClient) WantConnectionCount() (count int) { return c.connsWait.len() } +func (c *HostClient) ConnPoolState() config.ConnPoolState { + c.connsLock.Lock() + defer c.connsLock.Unlock() + cps := config.ConnPoolState{ + ConnNum: len(c.conns), + Addr: c.Addr, + } + + if c.connsWait != nil { + cps.WaitConnNum = c.connsWait.len() + } + return cps +} + // GetTimeout returns the status code and body of url. // // The contents of dst will be replaced by the body and returned, if the dst @@ -252,7 +268,7 @@ type wantConn struct { // errTimeout is returned if the response wasn't returned during // the given timeout. // -// errNoFreeConns is returned if all HostClient.MaxConns connections +// ErrNoFreeConns is returned if all HostClient.MaxConns connections // to the host are busy. // // It is recommended obtaining req and resp via AcquireRequest @@ -279,7 +295,7 @@ func (c *HostClient) DoTimeout(ctx context.Context, req *protocol.Request, resp // errTimeout is returned if the response wasn't returned until // the given deadline. // -// errNoFreeConns is returned if all HostClient.MaxConns connections +// ErrNoFreeConns is returned if all HostClient.MaxConns connections // to the host are busy. // // It is recommended obtaining req and resp via AcquireRequest @@ -302,7 +318,7 @@ func (c *HostClient) DoDeadline(ctx context.Context, req *protocol.Request, resp // // Response is ignored if resp is nil. // -// errNoFreeConns is returned if all DefaultMaxConnsPerHost connections +// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections // to the requested host are busy. // // It is recommended obtaining req and resp via AcquireRequest @@ -321,7 +337,7 @@ func (c *HostClient) DoRedirects(ctx context.Context, req *protocol.Request, res // // Response is ignored if resp is nil. // -// errNoFreeConns is returned if all HostClient.MaxConns connections +// ErrNoFreeConns is returned if all HostClient.MaxConns connections // to the host are busy. // // It is recommended obtaining req and resp via AcquireRequest @@ -627,7 +643,7 @@ func (c *HostClient) acquireConn(dialTimeout time.Duration) (cc *clientConn, err } if !createConn { if c.MaxConnWaitTimeout <= 0 { - return nil, errNoFreeConns + return nil, errs.ErrNoFreeConns } timeout := c.MaxConnWaitTimeout @@ -656,7 +672,7 @@ func (c *HostClient) acquireConn(dialTimeout time.Duration) (cc *clientConn, err case <-w.ready: return w.conn, w.err case <-tc.C: - return nil, errNoFreeConns + return nil, errs.ErrNoFreeConns } } @@ -1116,9 +1132,11 @@ func (q *wantConnQueue) clearFront() (cleaned bool) { } func NewHostClient(c *ClientOptions) client.HostClient { - return &HostClient{ + hc := &HostClient{ ClientOptions: c, } + + return hc } type ClientOptions struct { @@ -1218,7 +1236,7 @@ type ClientOptions struct { // Maximum duration for waiting for a free connection. // - // By default will not wait, return errNoFreeConns immediately + // By default will not wait, return ErrNoFreeConns immediately MaxConnWaitTimeout time.Duration // ResponseBodyStream enables response body streaming @@ -1228,4 +1246,6 @@ type ClientOptions struct { RetryConfig *retry.Config RetryIfFunc client.RetryIfFunc + + StateObserve config.HostClientStateFunc } diff --git a/pkg/protocol/http1/client_test.go b/pkg/protocol/http1/client_test.go index ad63f4181..f61e491fb 100644 --- a/pkg/protocol/http1/client_test.go +++ b/pkg/protocol/http1/client_test.go @@ -124,8 +124,8 @@ func TestHostClientMaxConnWaitTimeoutWithEarlierDeadline(t *testing.T) { break } w.mu.Lock() - if w.err != nil && !errors.Is(w.err, errNoFreeConns) { - t.Errorf("unexpected error: %s. Expecting %s", w.err, errNoFreeConns) + if w.err != nil && !errors.Is(w.err, errs.ErrNoFreeConns) { + t.Errorf("unexpected error: %s. Expecting %s", w.err, errs.ErrNoFreeConns) } w.mu.Unlock() }