Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fmt: code clean #763

Merged
merged 3 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type RemoteConfig struct {
TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"address,omitempty"`
Params map[string]string `yaml:"params" json:"params,omitempty"`
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
}

// Timeout return timeout duration.
Expand Down
14 changes: 6 additions & 8 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ var (
)

func init() {

// load clientconfig from consumer_config
// default use dubbo
consumerConfig := config.GetConsumerConfig()
Expand Down Expand Up @@ -106,8 +105,11 @@ func GetClientConf() ClientConfig {

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
clientGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber),
)
}
}

Expand Down Expand Up @@ -141,7 +143,6 @@ type Client struct {

// NewClient create a new Client.
func NewClient(opt Options) *Client {

switch {
case opt.ConnectTimeout == 0:
opt.ConnectTimeout = 3 * time.Second
Expand Down Expand Up @@ -203,7 +204,6 @@ func NewResponse(reply interface{}, atta map[string]interface{}) *Response {

// CallOneway call by one way
func (c *Client) CallOneway(request *Request) error {

return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
}

Expand Down Expand Up @@ -311,9 +311,7 @@ func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, NewPendingResponse())
}

func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
rsp *PendingResponse) error {

func (c *Client) transfer(session getty.Session, pkg *DubboPackage, rsp *PendingResponse) error {
var (
sequence uint64
err error
Expand Down
71 changes: 33 additions & 38 deletions protocol/dubbo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
)

Expand All @@ -42,7 +43,6 @@ var (
)

func init() {

// load clientconfig from provider_config
// default use dubbo
providerConfig := config.GetProviderConfig()
Expand Down Expand Up @@ -94,8 +94,11 @@ func GetServerConfig() ServerConfig {

func setServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
}
}

Expand All @@ -108,51 +111,47 @@ type Server struct {

// NewServer create a new Server.
func NewServer() *Server {

s := &Server{
conf: *srvConf,
return &Server{
conf: *srvConf,
rpcHandler: NewRpcServerHandler(srvConf.SessionNumber, srvConf.sessionTimeout),
}

s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout)

return s
}

func (s *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
err error
)
conf := s.conf

if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}

tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
if _, ok = session.Conn().(*tls.Conn); !ok {
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
return perrors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}

if err = tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive); err != nil {
return err
}
if conf.GettySessionParam.TcpKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize); err != nil {
return err
}
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)

session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
Expand All @@ -163,10 +162,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("app accepts new session:%s\n", session.Stat())

logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)

return nil
}

Expand All @@ -184,7 +181,6 @@ func (s *Server) Start(url common.URL) {
getty.WithServerSslEnabled(url.GetParamBool(constant.SSL_ENABLED_KEY, false)),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
)

} else {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
Expand All @@ -193,7 +189,6 @@ func (s *Server) Start(url common.URL) {
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", addr)
s.tcpServer = tcpServer

}

// Stop stop dubbo server.
Expand Down