From 2c32d6eec9d779926368aeb03391653f42041e77 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Tue, 3 Dec 2024 17:21:21 +0800 Subject: [PATCH] =?UTF-8?q?RPC=E4=B8=8E=E6=97=A5=E5=BF=97=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- log/log.go | 24 ++++++++++++------------ node/node.go | 11 +++++++---- rpc/client.go | 11 ++++------- rpc/lclient.go | 7 ++----- rpc/lserver.go | 2 +- rpc/natsclient.go | 4 ++-- rpc/rclient.go | 4 ++-- rpc/rpchandler.go | 3 +-- rpc/rpcnats.go | 3 --- rpc/server.go | 2 +- 10 files changed, 32 insertions(+), 39 deletions(-) diff --git a/log/log.go b/log/log.go index 861518e..808eba6 100644 --- a/log/log.go +++ b/log/log.go @@ -164,30 +164,30 @@ func Fatal(msg string, fields ...zap.Field) { gLogger.stack = false } -func Debugf(msg string, args ...any) { - gLogger.sugaredLogger.Debugf(msg, args...) +func Debugf(template string, args ...any) { + gLogger.sugaredLogger.Debugf(template, args...) } -func Infof(msg string, args ...any) { - gLogger.sugaredLogger.Infof(msg, args...) +func Infof(template string, args ...any) { + gLogger.sugaredLogger.Infof(template, args...) } -func Warnf(msg string, args ...any) { - gLogger.sugaredLogger.Warnf(msg, args...) +func Warnf(template string, args ...any) { + gLogger.sugaredLogger.Warnf(template, args...) } -func Errorf(msg string, args ...any) { - gLogger.sugaredLogger.Errorf(msg, args...) +func Errorf(template string, args ...any) { + gLogger.sugaredLogger.Errorf(template, args...) } -func StackErrorf(msg string, args ...any) { +func StackErrorf(template string, args ...any) { gLogger.stack = true - gLogger.sugaredLogger.Errorf(msg, args...) + gLogger.sugaredLogger.Errorf(template, args...) gLogger.stack = false } -func Fatalf(msg string, args ...any) { - gLogger.sugaredLogger.Fatalf(msg, args...) +func Fatalf(template string, args ...any) { + gLogger.sugaredLogger.Fatalf(template, args...) } func (logger *Logger) SDebug(args ...interface{}) { diff --git a/node/node.go b/node/node.go index 3486572..18c886c 100644 --- a/node/node.go +++ b/node/node.go @@ -330,13 +330,13 @@ func startNode(args interface{}) error { myName, mErr := sysprocess.GetMyProcessName() //当前进程名获取失败,不应该发生 if mErr != nil { - log.Info("get my process's name is error", log.ErrorField("err", mErr)) + log.Error("get my process's name is error", log.ErrorField("err", mErr)) os.Exit(-1) } //进程id存在,而且进程名也相同,被认为是当前进程重复运行 if cErr == nil && name == myName { - log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId)) + log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId)) os.Exit(-1) } break @@ -354,10 +354,13 @@ func startNode(args interface{}) error { service.Start() //5.运行集群 - cluster.GetCluster().Start() + err := cluster.GetCluster().Start() + if err != nil { + log.Error(err.Error()) + os.Exit(-1) + } //6.监听程序退出信号&性能报告 - var pProfilerTicker *time.Ticker = &time.Ticker{} if profilerInterval > 0 { pProfilerTicker = time.NewTicker(profilerInterval) diff --git a/rpc/client.go b/rpc/client.go index 8902fcd..9a56eb3 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -32,7 +32,7 @@ type IRealClient interface { SetConn(conn *network.NetConn) Close(waitDone bool) - AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) + AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call IsConnected() bool @@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc return call } -func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { +func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) { processorType, processor := GetProcessorType(args) InParam, herr := processor.Marshal(args) if herr != nil { @@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, return emptyCancelRpc, err } - if cancelable { - rpcCancel := RpcCancel{CallSeq: seq, Cli: client} - return rpcCancel.CancelRpc, nil - } - return emptyCancelRpc, nil + rpcCancel := RpcCancel{CallSeq: seq, Cli: client} + return rpcCancel.CancelRpc, nil } diff --git a/rpc/lclient.go b/rpc/lclient.go index 2cce955..c2bf32f 100644 --- a/rpc/lclient.go +++ b/rpc/lclient.go @@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) } -func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) { +func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) { pLocalRpcServer := rpcHandler.GetRpcServer()() //判断是否是同一服务 @@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR } //其他的rpcHandler的处理器 - cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable) + cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback) if err != nil { callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) } @@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client { client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) client.targetNodeId = localNodeId - //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount - //client.callRpcTimeout = DefaultRpcTimeout - lClient := &LClient{} lClient.selfClient = client client.IRealClient = lClient diff --git a/rpc/lserver.go b/rpc/lserver.go index 6fe957d..9ff1a47 100644 --- a/rpc/lserver.go +++ b/rpc/lserver.go @@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor return pCall } -func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) { +func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) { rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler == nil { err := errors.New("service method " + serviceMethod + " not config!") diff --git a/rpc/natsclient.go b/rpc/natsclient.go index 1c5fe8a..9059955 100644 --- a/rpc/natsclient.go +++ b/rpc/natsclient.go @@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) } -func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { - cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable) +func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) { + cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam) if err != nil { callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) } diff --git a/rpc/rclient.go b/rpc/rclient.go index 3f600f4..dc3139a 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) } -func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { - cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable) +func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) { + cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam) if err != nil { callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index ba30c22..2ba8d25 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -526,8 +526,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se } //2.rpcClient调用 - //如果调用本结点服务 - return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false) + return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, ) } func (handler *RpcHandler) GetName() string { diff --git a/rpc/rpcnats.go b/rpc/rpcnats.go index e4b9e03..66d5db9 100644 --- a/rpc/rpcnats.go +++ b/rpc/rpcnats.go @@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet client.clientId = atomic.AddUint32(&clientSeq, 1) client.targetNodeId = targetNodeId - //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount - //client.callRpcTimeout = DefaultRpcTimeout - natsClient := &rn.NatsClient natsClient.localNodeId = localNodeId natsClient.client = &client diff --git a/rpc/server.go b/rpc/server.go index ebceea6..7057b45 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -31,7 +31,7 @@ type IServer interface { selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error - selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) + selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) } type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)