Skip to content

Commit

Permalink
Merge pull request #763 from dubbo-x/read
Browse files Browse the repository at this point in the history
Fmt: code clean
  • Loading branch information
fangyincheng committed Sep 19, 2020
2 parents ea9d6eb + 61fbc46 commit 163d645
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 47 deletions.
2 changes: 1 addition & 1 deletion config/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type RemoteConfig struct {
TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"address,omitempty"`
Params map[string]string `yaml:"params" json:"params,omitempty"`
}

// Timeout return timeout duration.
Expand Down
14 changes: 6 additions & 8 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ var (
)

func init() {

// load clientconfig from consumer_config
// default use dubbo
consumerConfig := config.GetConsumerConfig()
Expand Down Expand Up @@ -106,8 +105,11 @@ func GetClientConf() ClientConfig {

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
clientGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber),
)
}
}

Expand Down Expand Up @@ -141,7 +143,6 @@ type Client struct {

// NewClient create a new Client.
func NewClient(opt Options) *Client {

switch {
case opt.ConnectTimeout == 0:
opt.ConnectTimeout = 3 * time.Second
Expand Down Expand Up @@ -203,7 +204,6 @@ func NewResponse(reply interface{}, atta map[string]interface{}) *Response {

// CallOneway call by one way
func (c *Client) CallOneway(request *Request) error {

return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
}

Expand Down Expand Up @@ -311,9 +311,7 @@ func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, NewPendingResponse())
}

func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
rsp *PendingResponse) error {

func (c *Client) transfer(session getty.Session, pkg *DubboPackage, rsp *PendingResponse) error {
var (
sequence uint64
err error
Expand Down
71 changes: 33 additions & 38 deletions protocol/dubbo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)

Expand All @@ -42,7 +43,6 @@ var (
)

func init() {

// load clientconfig from provider_config
// default use dubbo
providerConfig := config.GetProviderConfig()
Expand Down Expand Up @@ -94,8 +94,11 @@ func GetServerConfig() ServerConfig {

func setServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
}
}

Expand All @@ -108,51 +111,47 @@ type Server struct {

// NewServer create a new Server.
func NewServer() *Server {

s := &Server{
conf: *srvConf,
return &Server{
conf: *srvConf,
rpcHandler: NewRpcServerHandler(srvConf.SessionNumber, srvConf.sessionTimeout),
}

s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout)

return s
}

func (s *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
err error
)
conf := s.conf

if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}

tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
if _, ok = session.Conn().(*tls.Conn); !ok {
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
return perrors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}

if err = tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive); err != nil {
return err
}
if conf.GettySessionParam.TcpKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize); err != nil {
return err
}
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)

session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
Expand All @@ -163,10 +162,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("app accepts new session:%s\n", session.Stat())

logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)

return nil
}

Expand All @@ -184,7 +181,6 @@ func (s *Server) Start(url common.URL) {
getty.WithServerSslEnabled(url.GetParamBool(constant.SSL_ENABLED_KEY, false)),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
)

} else {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
Expand All @@ -193,7 +189,6 @@ func (s *Server) Start(url common.URL) {
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", addr)
s.tcpServer = tcpServer

}

// Stop stop dubbo server.
Expand Down

0 comments on commit 163d645

Please sign in to comment.