Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: refact heartbeat #889

Merged
merged 5 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions remoting/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package remoting

import (
"sync"
"time"
)

Expand All @@ -26,13 +27,19 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)

var (
// generate request ID for global use
sequence atomic.Int64

// store requestID and response
pendingResponses = new(sync.Map)
)

type SequenceType int64

func init() {
// init request ID
sequence.Store(0)
Expand Down Expand Up @@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool {
return response.Event && response.Result == nil
}

func (response *Response) Handle() {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}

pendingResponse.response = response

if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}

type Options struct {
// connect timeout
ConnectTimeout time.Duration
Expand Down Expand Up @@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
Reply: r.response,
}
}

// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}

// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}

// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
60 changes: 0 additions & 60 deletions remoting/exchange_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package remoting

import (
"errors"
"sync"
"time"
)

Expand All @@ -28,19 +27,10 @@ import (
"github.com/apache/dubbo-go/protocol"
)

var (
// store requestID and response
pendingResponses = new(sync.Map)
)

type SequenceType int64

// It is interface of client for network communication.
// If you use getty as network communication, you should define GettyClient that implements this interface.
type Client interface {
SetExchangeClient(client *ExchangeClient)
// responseHandler is used to deal with msg
SetResponseHandler(responseHandler ResponseHandler)
// connect url
Connect(url *common.URL) error
// close
Expand All @@ -63,11 +53,6 @@ type ExchangeClient struct {
init bool
}

// handle the message from server
type ResponseHandler interface {
Handler(response *Response)
}

// create ExchangeClient
func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
exchangeClient := &ExchangeClient{
Expand All @@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat
}
}

client.SetResponseHandler(exchangeClient)
return exchangeClient
}

Expand Down Expand Up @@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() {
func (client *ExchangeClient) IsAvailable() bool {
return client.client.IsAvailable()
}

// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {

pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}

pendingResponse.response = response

if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}

// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}

// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}

// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
37 changes: 36 additions & 1 deletion remoting/getty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ type (
ServerConfig struct {
SSLEnabled bool

// heartbeat
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration

// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration

// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
Expand All @@ -76,9 +84,13 @@ type (
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`

// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration

// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration

// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
Expand Down Expand Up @@ -188,6 +200,12 @@ func (c *ClientConfig) CheckValidity() error {
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}

if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}

if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
Expand All @@ -199,6 +217,23 @@ func (c *ClientConfig) CheckValidity() error {
func (c *ServerConfig) CheckValidity() error {
var err error

if len(c.HeartbeatPeriod) == 0 {
c.heartbeatPeriod = 60 * time.Second
} else if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}

if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s",
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}

if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}

if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
Expand Down
25 changes: 6 additions & 19 deletions remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,12 @@ type Options struct {

// Client : some configuration for network communication.
type Client struct {
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
responseHandler remoting.ResponseHandler
ExchangeClient *remoting.ExchangeClient
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
ExchangeClient *remoting.ExchangeClient
}

// create client
Expand All @@ -146,9 +145,6 @@ func NewClient(opt Options) *Client {
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client
}
func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}

// init client and try to connection.
func (c *Client) Connect(url *common.URL) error {
Expand Down Expand Up @@ -220,15 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
return rpcClient, rpcClient.selectSession(), nil
}

func (c *Client) heartbeat(session getty.Session) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
return c.transfer(session, req, 3*time.Second)
}

func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout)
return perrors.WithStack(err)
Expand Down
1 change: 0 additions & 1 deletion remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func getClient(url *common.URL) *Client {
exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false)
client.SetExchangeClient(exchangeClient)
client.Connect(url)
client.SetResponseHandler(exchangeClient)
return client
}

Expand Down
4 changes: 2 additions & 2 deletions remoting/getty/getty_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
Expand Down Expand Up @@ -195,7 +195,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)
Expand Down
Loading