Skip to content

Commit

Permalink
set timeout even timeout <= 0
Browse files Browse the repository at this point in the history
  • Loading branch information
wzekin authored and Duslia committed Mar 23, 2023
1 parent 88604b1 commit ebbce6d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 47 deletions.
25 changes: 25 additions & 0 deletions pkg/common/config/request_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type RequestOptions struct {
dialTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
// Request timeout. Usually set by DoDeadline or DoTimeout
// if <= 0, means not set
reqTimeout time.Duration
begin time.Time
}

// RequestOption is the only struct to set request-level options.
Expand Down Expand Up @@ -95,6 +99,17 @@ func WithWriteTimeout(t time.Duration) RequestOption {
}}
}

// WithReqTimeout sets whole request timeout. If it reaches timeout,
// the client will return.
//
// This is the request level configuration.
func WithReqTimeout(t time.Duration) RequestOption {
return RequestOption{F: func(o *RequestOptions) {
o.begin = time.Now()
o.reqTimeout = t
}}
}

func (o *RequestOptions) Apply(opts []RequestOption) {
for _, op := range opts {
op.F(o)
Expand Down Expand Up @@ -125,6 +140,14 @@ func (o *RequestOptions) WriteTimeout() time.Duration {
return o.writeTimeout
}

func (o *RequestOptions) ReqTimeout() time.Duration {
return o.reqTimeout
}

func (o *RequestOptions) Begin() time.Time {
return o.begin
}

func (o *RequestOptions) CopyTo(dst *RequestOptions) {
if dst.tags == nil {
dst.tags = make(map[string]string)
Expand All @@ -138,6 +161,8 @@ func (o *RequestOptions) CopyTo(dst *RequestOptions) {
dst.readTimeout = o.readTimeout
dst.writeTimeout = o.writeTimeout
dst.dialTimeout = o.dialTimeout
dst.reqTimeout = o.reqTimeout
dst.begin = time.Now()
}

// SetPreDefinedOpts Pre define some RequestOption here
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/test/mock/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Recorder interface {

func (m *Conn) SetWriteTimeout(t time.Duration) error {
// TODO implement me
panic("implement me")
return nil
}

type SlowReadConn struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/protocol/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ func DoTimeout(ctx context.Context, req *protocol.Request, resp *protocol.Respon
if timeout <= 0 {
return errTimeout
}
req.SetTimeout(timeout)
// Note: it will overwrite the reqTimeout.
req.SetOptions(config.WithReqTimeout(timeout))
return c.Do(ctx, req, resp)
}

Expand All @@ -289,6 +290,7 @@ func DoDeadline(ctx context.Context, req *protocol.Request, resp *protocol.Respo
if timeout <= 0 {
return errTimeout
}
req.SetTimeout(timeout)
// Note: it will overwrite the reqTimeout.
req.SetOptions(config.WithReqTimeout(timeout))
return c.Do(ctx, req, resp)
}
49 changes: 22 additions & 27 deletions pkg/protocol/http1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,8 @@ func (c *HostClient) Do(ctx context.Context, req *protocol.Request, resp *protoc

atomic.AddInt32(&c.pendingRequests, 1)

var before time.Time
if req.GetTimeout() > 0 {
before = time.Now()
}

for {
canIdempotentRetry, err = c.do(req, resp, before)
canIdempotentRetry, err = c.do(req, resp)
if err == nil {
break
}
Expand Down Expand Up @@ -428,14 +423,14 @@ func (c *HostClient) PendingRequests() int {
return int(atomic.LoadInt32(&c.pendingRequests))
}

func (c *HostClient) do(req *protocol.Request, resp *protocol.Response, before time.Time) (bool, error) {
func (c *HostClient) do(req *protocol.Request, resp *protocol.Response) (bool, error) {
nilResp := false
if resp == nil {
nilResp = true
resp = protocol.AcquireResponse()
}

canIdempotentRetry, err := c.doNonNilReqResp(req, resp, before)
canIdempotentRetry, err := c.doNonNilReqResp(req, resp)

if nilResp {
protocol.ReleaseResponse(resp)
Expand Down Expand Up @@ -485,7 +480,7 @@ func updateReqTimeout(reqTimeout, compareTimeout time.Duration, before time.Time
return false, left
}

func (c *HostClient) doNonNilReqResp(req *protocol.Request, resp *protocol.Response, before time.Time) (bool, error) {
func (c *HostClient) doNonNilReqResp(req *protocol.Request, resp *protocol.Response) (bool, error) {
if req == nil {
panic("BUG: req cannot be nil")
}
Expand All @@ -507,10 +502,12 @@ func (c *HostClient) doNonNilReqResp(req *protocol.Request, resp *protocol.Respo
if c.DisablePathNormalizing {
req.URI().DisablePathNormalizing = true
}
reqTimeout := req.Options().ReqTimeout()
begin := req.Options().Begin()

dialTimeout := rc.dialTimeout
if req.GetTimeout() < dialTimeout || dialTimeout == 0 {
dialTimeout = req.GetTimeout()
if reqTimeout < dialTimeout || dialTimeout == 0 {
dialTimeout = reqTimeout
}
cc, err := c.acquireConn(dialTimeout)
// if getting connection error, fast fail
Expand All @@ -527,17 +524,16 @@ func (c *HostClient) doNonNilReqResp(req *protocol.Request, resp *protocol.Respo

resp.ParseNetAddr(conn)

shouldClose, timeout := updateReqTimeout(req.GetTimeout(), rc.writeTimeout, before)
shouldClose, timeout := updateReqTimeout(reqTimeout, rc.writeTimeout, begin)
if shouldClose {
c.closeConn(cc)
return true, errTimeout
}
if timeout > 0 {
if err = conn.SetWriteTimeout(timeout); err != nil {
c.closeConn(cc)
// try another connection if retry is enabled
return true, err
}

if err = conn.SetWriteTimeout(timeout); err != nil {
c.closeConn(cc)
// try another connection if retry is enabled
return true, err
}

resetConnection := false
Expand Down Expand Up @@ -595,19 +591,18 @@ func (c *HostClient) doNonNilReqResp(req *protocol.Request, resp *protocol.Respo
return true, err
}

shouldClose, timeout = updateReqTimeout(req.GetTimeout(), rc.readTimeout, before)
shouldClose, timeout = updateReqTimeout(reqTimeout, rc.readTimeout, begin)
if shouldClose {
c.closeConn(cc)
return true, errTimeout
}
if timeout > 0 {
// Set Deadline every time, since golang has fixed the performance issue
// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
if err = conn.SetReadTimeout(timeout); err != nil {
c.closeConn(cc)
// try another connection if retry is enabled
return true, err
}

// Set Deadline every time, since golang has fixed the performance issue
// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
if err = conn.SetReadTimeout(timeout); err != nil {
c.closeConn(cc)
// try another connection if retry is enabled
return true, err
}

if customSkipBody || req.Header.IsHead() || req.Header.IsConnect() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/http1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestDoNonNilReqResp(t *testing.T) {
req := protocol.AcquireRequest()
resp := protocol.AcquireResponse()
req.SetHost("foobar")
retry, err := c.doNonNilReqResp(req, resp, time.Now())
retry, err := c.doNonNilReqResp(req, resp)
assert.False(t, retry)
assert.Nil(t, err)
assert.DeepEqual(t, resp.StatusCode(), 400)
Expand All @@ -300,7 +300,7 @@ func TestDoNonNilReqResp1(t *testing.T) {
req := protocol.AcquireRequest()
resp := protocol.AcquireResponse()
req.SetHost("foobar")
retry, err := c.doNonNilReqResp(req, resp, time.Now())
retry, err := c.doNonNilReqResp(req, resp)
assert.True(t, retry)
assert.NotNil(t, err)
}
Expand Down
15 changes: 0 additions & 15 deletions pkg/protocol/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"net/url"
"strings"
"sync"
"time"

"github.com/cloudwego/hertz/internal/bytesconv"
"github.com/cloudwego/hertz/internal/bytestr"
Expand Down Expand Up @@ -114,10 +113,6 @@ type Request struct {

// Request level options, service discovery options etc.
options *config.RequestOptions

// Request timeout. Usually set by DoDeadline or DoTimeout
// if <= 0, means not set
timeout time.Duration
}

type requestBodyWriter struct {
Expand Down Expand Up @@ -160,14 +155,6 @@ func (req *Request) AppendBody(p []byte) {
req.BodyBuffer().Write(p) //nolint:errcheck
}

func (req *Request) SetTimeout(timeout time.Duration) {
req.timeout = timeout
}

func (req *Request) GetTimeout() time.Duration {
return req.timeout
}

func (req *Request) BodyBuffer() *bytebufferpool.ByteBuffer {
if req.body == nil {
req.body = requestBodyPool.Get()
Expand Down Expand Up @@ -218,7 +205,6 @@ func (req *Request) Reset() {
req.CloseBodyStream()

req.options = nil
req.timeout = 0
}

func (req *Request) IsURIParsed() bool {
Expand Down Expand Up @@ -398,7 +384,6 @@ func (req *Request) CopyToSkipBody(dst *Request) {
req.options.CopyTo(dst.options)
}

dst.timeout = req.timeout
// do not copy multipartForm - it will be automatically
// re-created on the first call to MultipartForm.
}
Expand Down

0 comments on commit ebbce6d

Please sign in to comment.