Skip to content

Commit

Permalink
feat: improve client observation
Browse files Browse the repository at this point in the history
  • Loading branch information
Duslia committed Dec 23, 2022
1 parent ce6ab72 commit 743ce27
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,5 +629,6 @@ func newHttp1OptionFromClient(c *Client) *http1.ClientOptions {
ResponseBodyStream: c.options.ResponseBodyStream,
RetryConfig: c.options.RetryConfig,
RetryIfFunc: c.RetryIfFunc,
StateObserve: c.options.HostClientStateObserve,
}
}
31 changes: 23 additions & 8 deletions pkg/app/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ import (
"github.com/cloudwego/hertz/pkg/route"
)

var (
errTooManyRedirects = errors.New("too many redirects detected when doing the request")
errNoFreeConns = errors.New("no free connections available to host")
)
var errTooManyRedirects = errors.New("too many redirects detected when doing the request")

func TestCloseIdleConnections(t *testing.T) {
opt := config.NewOptions([]config.Option{})
Expand Down Expand Up @@ -834,7 +831,7 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) {

for {
if err := c.DoDeadline(context.Background(), req, resp, time.Now().Add(timeout)); err != nil {
if err.Error() == errNoFreeConns.Error() {
if err.Error() == errs.ErrNoFreeConns.Error() {
time.Sleep(time.Millisecond * 500)
continue
}
Expand Down Expand Up @@ -1125,7 +1122,7 @@ func TestHostClientMaxConnWaitTimeoutSuccess(t *testing.T) {
wg.Wait()

if c.WantConnectionCount() > 0 {
t.Errorf("connsWait has %v items remaining", c.WantConnectionCount())
t.Errorf("connsWait has %v items remaining", c.(*http1.HostClient).WantConnectionCount())
}

if emptyBodyCount > 0 {
Expand Down Expand Up @@ -1179,8 +1176,8 @@ func TestHostClientMaxConnWaitTimeoutError(t *testing.T) {
resp := protocol.AcquireResponse()

if err := c.Do(context.Background(), req, resp); err != nil {
if err.Error() != errNoFreeConns.Error() {
t.Errorf("unexpected error: %s. Expecting %s", err.Error(), errNoFreeConns.Error())
if err.Error() != errs.ErrNoFreeConns.Error() {
t.Errorf("unexpected error: %s. Expecting %s", err.Error(), errs.ErrNoFreeConns.Error())
}
atomic.AddUint32(&errNoFreeConnsCount, 1)
} else {
Expand Down Expand Up @@ -2152,3 +2149,21 @@ func TestClientDoWithDialFunc(t *testing.T) {
t.Fatalf("timeout")
}
}

func TestClientState(t *testing.T) {
opt := config.NewOptions([]config.Option{})
opt.Addr = ":11000"
engine := route.NewEngine(opt)
go engine.Run()

client, _ := NewClient(WithConnStateObserve(func(hcs config.HostClientState) {
assert.DeepEqual(t, 0, hcs.ConnPoolState().ConnNum)
time.Sleep(time.Second * 2)
assert.DeepEqual(t, 1, hcs.ConnPoolState().ConnNum)
assert.DeepEqual(t, "127.0.0.1:11000", hcs.ConnPoolState().Host)
}))

time.Sleep(time.Second)
client.Get(context.Background(), nil, "http://127.0.0.1:11000")
time.Sleep(time.Second * 3)
}
7 changes: 7 additions & 0 deletions pkg/app/client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ func WithWriteTimeout(t time.Duration) config.ClientOption {
}}
}

// WithConnStateObserve sets the connection state observation function.
func WithConnStateObserve(hs config.HostClientStateFunc) config.ClientOption {
return config.ClientOption{F: func(o *config.ClientOptions) {
o.HostClientStateObserve = hs
}}
}

// WithDialFunc is used to set dialer function.
// Note: WithDialFunc will overwrite custom dialer.
func WithDialFunc(f network.DialFunc, dialers ...network.Dialer) config.ClientOption {
Expand Down
14 changes: 14 additions & 0 deletions pkg/common/config/client_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ import (
"github.com/cloudwego/hertz/pkg/protocol/consts"
)

type ConnPoolState struct {
ConnNum int
WaitConnNum int
Host string
}

type HostClientState interface {
ConnPoolState() ConnPoolState
}

type HostClientStateFunc func(HostClientState)

// ClientOption is the only struct that can be used to set ClientOptions.
type ClientOption struct {
F func(o *ClientOptions)
Expand Down Expand Up @@ -106,6 +118,8 @@ type ClientOptions struct {

// all configurations related to retry
RetryConfig *retry.Config

HostClientStateObserve HostClientStateFunc
}

func NewClientOptions(opts []ClientOption) *ClientOptions {
Expand Down
1 change: 1 addition & 0 deletions pkg/common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
ErrDialTimeout = errors.New("dial timeout")
ErrNothingRead = errors.New("nothing read")
ErrShortConnection = errors.New("short connection")
ErrNoFreeConns = errors.New("no free connections available to host")
ErrConnectionClosed = errors.New("connection closed")
)

Expand Down
45 changes: 32 additions & 13 deletions pkg/protocol/http1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,8 @@ import (
respI "github.com/cloudwego/hertz/pkg/protocol/http1/resp"
)

var (
errNoFreeConns = errs.NewPublic("no free connections available to host")
errConnectionClosed = errs.NewPublic("the server closed connection before returning the first response byte. " +
"Make sure the server returns 'Connection: close' response header before closing the connection")
)
var errConnectionClosed = errs.NewPublic("the server closed connection before returning the first response byte. " +
"Make sure the server returns 'Connection: close' response header before closing the connection")

// HostClient balances http requests among hosts listed in Addr.
//
Expand Down Expand Up @@ -167,6 +164,20 @@ func (c *HostClient) WantConnectionCount() (count int) {
return c.connsWait.len()
}

func (c *HostClient) ConnPoolState() config.ConnPoolState {
c.connsLock.Lock()
defer c.connsLock.Unlock()
cps := config.ConnPoolState{
ConnNum: len(c.conns),
Host: c.Addr,
}

if c.connsWait != nil {
cps.WaitConnNum = c.connsWait.len()
}
return cps
}

// GetTimeout returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned, if the dst
Expand Down Expand Up @@ -252,7 +263,7 @@ type wantConn struct {
// errTimeout is returned if the response wasn't returned during
// the given timeout.
//
// errNoFreeConns is returned if all HostClient.MaxConns connections
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
Expand All @@ -279,7 +290,7 @@ func (c *HostClient) DoTimeout(ctx context.Context, req *protocol.Request, resp
// errTimeout is returned if the response wasn't returned until
// the given deadline.
//
// errNoFreeConns is returned if all HostClient.MaxConns connections
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
Expand All @@ -302,7 +313,7 @@ func (c *HostClient) DoDeadline(ctx context.Context, req *protocol.Request, resp
//
// Response is ignored if resp is nil.
//
// errNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
Expand All @@ -321,7 +332,7 @@ func (c *HostClient) DoRedirects(ctx context.Context, req *protocol.Request, res
//
// Response is ignored if resp is nil.
//
// errNoFreeConns is returned if all HostClient.MaxConns connections
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
Expand Down Expand Up @@ -627,7 +638,7 @@ func (c *HostClient) acquireConn(dialTimeout time.Duration) (cc *clientConn, err
}
if !createConn {
if c.MaxConnWaitTimeout <= 0 {
return nil, errNoFreeConns
return nil, errs.ErrNoFreeConns
}

timeout := c.MaxConnWaitTimeout
Expand Down Expand Up @@ -656,7 +667,7 @@ func (c *HostClient) acquireConn(dialTimeout time.Duration) (cc *clientConn, err
case <-w.ready:
return w.conn, w.err
case <-tc.C:
return nil, errNoFreeConns
return nil, errs.ErrNoFreeConns
}
}

Expand Down Expand Up @@ -1116,9 +1127,15 @@ func (q *wantConnQueue) clearFront() (cleaned bool) {
}

func NewHostClient(c *ClientOptions) client.HostClient {
return &HostClient{
hc := &HostClient{
ClientOptions: c,
}

if c.StateObserve != nil {
go c.StateObserve(hc)
}

return hc
}

type ClientOptions struct {
Expand Down Expand Up @@ -1218,7 +1235,7 @@ type ClientOptions struct {

// Maximum duration for waiting for a free connection.
//
// By default will not wait, return errNoFreeConns immediately
// By default will not wait, return ErrNoFreeConns immediately
MaxConnWaitTimeout time.Duration

// ResponseBodyStream enables response body streaming
Expand All @@ -1228,4 +1245,6 @@ type ClientOptions struct {
RetryConfig *retry.Config

RetryIfFunc client.RetryIfFunc

StateObserve config.HostClientStateFunc
}
4 changes: 2 additions & 2 deletions pkg/protocol/http1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func TestHostClientMaxConnWaitTimeoutWithEarlierDeadline(t *testing.T) {
break
}
w.mu.Lock()
if w.err != nil && !errors.Is(w.err, errNoFreeConns) {
t.Errorf("unexpected error: %s. Expecting %s", w.err, errNoFreeConns)
if w.err != nil && !errors.Is(w.err, errs.ErrNoFreeConns) {
t.Errorf("unexpected error: %s. Expecting %s", w.err, errs.ErrNoFreeConns)
}
w.mu.Unlock()
}
Expand Down

0 comments on commit 743ce27

Please sign in to comment.