diff --git a/config/remote_config.go b/config/remote_config.go index 55380dd5a0..0f0c3e5cb7 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -40,7 +40,7 @@ type RemoteConfig struct { TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"` Username string `yaml:"username" json:"username,omitempty" property:"username"` Password string `yaml:"password" json:"password,omitempty" property:"password"` - Params map[string]string `yaml:"params" json:"address,omitempty"` + Params map[string]string `yaml:"params" json:"params,omitempty"` } // Timeout return timeout duration. diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index b6e4618bb5..8c71c57939 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -52,7 +52,6 @@ var ( ) func init() { - // load clientconfig from consumer_config // default use dubbo consumerConfig := config.GetConsumerConfig() @@ -106,8 +105,11 @@ func GetClientConf() ClientConfig { func setClientGrpool() { if clientConf.GrPoolSize > 1 { - clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), - gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) + clientGrpool = gxsync.NewTaskPool( + gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), + gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), + gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber), + ) } } @@ -141,7 +143,6 @@ type Client struct { // NewClient create a new Client. func NewClient(opt Options) *Client { - switch { case opt.ConnectTimeout == 0: opt.ConnectTimeout = 3 * time.Second @@ -203,7 +204,6 @@ func NewResponse(reply interface{}, atta map[string]interface{}) *Response { // CallOneway call by one way func (c *Client) CallOneway(request *Request) error { - return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) } @@ -311,9 +311,7 @@ func (c *Client) heartbeat(session getty.Session) error { return c.transfer(session, nil, NewPendingResponse()) } -func (c *Client) transfer(session getty.Session, pkg *DubboPackage, - rsp *PendingResponse) error { - +func (c *Client) transfer(session getty.Session, pkg *DubboPackage, rsp *PendingResponse) error { var ( sequence uint64 err error diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 4ad4796c54..b693b06f49 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -26,6 +26,7 @@ import ( import ( "github.com/apache/dubbo-getty" "github.com/dubbogo/gost/sync" + perrors "github.com/pkg/errors" "gopkg.in/yaml.v2" ) @@ -42,7 +43,6 @@ var ( ) func init() { - // load clientconfig from provider_config // default use dubbo providerConfig := config.GetProviderConfig() @@ -94,8 +94,11 @@ func GetServerConfig() ServerConfig { func setServerGrpool() { if srvConf.GrPoolSize > 1 { - srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), - gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber)) + srvGrpool = gxsync.NewTaskPool( + gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), + gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), + gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber), + ) } } @@ -108,51 +111,47 @@ type Server struct { // NewServer create a new Server. func NewServer() *Server { - - s := &Server{ - conf: *srvConf, + return &Server{ + conf: *srvConf, + rpcHandler: NewRpcServerHandler(srvConf.SessionNumber, srvConf.sessionTimeout), } - - s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout) - - return s } func (s *Server) newSession(session getty.Session) error { var ( ok bool tcpConn *net.TCPConn + err error ) conf := s.conf if conf.GettySessionParam.CompressEncoding { session.SetCompressType(getty.CompressZip) } - if _, ok = session.Conn().(*tls.Conn); ok { - session.SetName(conf.GettySessionParam.SessionName) - session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(rpcServerPkgHandler) - session.SetEventListener(s.rpcHandler) - session.SetWQLen(conf.GettySessionParam.PkgWQSize) - session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) - session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) - session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) - session.SetWaitTime(conf.GettySessionParam.waitTimeout) - logger.Debugf("server accepts new session:%s\n", session.Stat()) - session.SetTaskPool(srvGrpool) - return nil - } - if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { - panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) - } - tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) - tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) - if conf.GettySessionParam.TcpKeepAlive { - tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) + if _, ok = session.Conn().(*tls.Conn); !ok { + if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { + return perrors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn())) + } + + if err = tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay); err != nil { + return err + } + if err = tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive); err != nil { + return err + } + if conf.GettySessionParam.TcpKeepAlive { + if err = tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod); err != nil { + return err + } + } + if err = tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize); err != nil { + return err + } + if err = tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize); err != nil { + return err + } } - tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) - tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) @@ -163,10 +162,8 @@ func (s *Server) newSession(session getty.Session) error { session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetWaitTime(conf.GettySessionParam.waitTimeout) - logger.Debugf("app accepts new session:%s\n", session.Stat()) - + logger.Debugf("server accepts new session: %s", session.Stat()) session.SetTaskPool(srvGrpool) - return nil } @@ -184,7 +181,6 @@ func (s *Server) Start(url common.URL) { getty.WithServerSslEnabled(url.GetParamBool(constant.SSL_ENABLED_KEY, false)), getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()), ) - } else { tcpServer = getty.NewTCPServer( getty.WithLocalAddress(addr), @@ -193,7 +189,6 @@ func (s *Server) Start(url common.URL) { tcpServer.RunEventLoop(s.newSession) logger.Debugf("s bind addr{%s} ok!", addr) s.tcpServer = tcpServer - } // Stop stop dubbo server.