From 0f3a965d732f6cae4c6dd345017df1718ee2d542 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Tue, 1 Aug 2023 11:05:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E4=B8=8E=E4=BC=98=E5=8C=96rp?= =?UTF-8?q?c=E8=B6=85=E6=97=B6=E5=8A=9F=E8=83=BDRAEDME=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 36 +++++++++++++++++++++++++++++++----- rpc/rpchandler.go | 3 ++- rpc/server.go | 3 +-- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 1a889e9..16a8912 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ cluster.json如下: "Private": false, "ListenAddr":"127.0.0.1:8001", "MaxRpcParamLen": 409600, + "CompressBytesLen": 20480, "NodeName": "Node_Test1", "remark":"//以_打头的,表示只在本机进程,不对整个子网公开", "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"] @@ -72,7 +73,8 @@ cluster.json如下: "NodeId": 2, "Private": false, "ListenAddr":"127.0.0.1:8002", - "MaxRpcParamLen": 409600, + "MaxRpcParamLen": 409600, + "CompressBytesLen": 20480, "NodeName": "Node_Test1", "remark":"//以_打头的,表示只在本机进程,不对整个子网公开", "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"] @@ -88,6 +90,7 @@ cluster.json如下: * Private: 是否私有结点,如果为true,表示其他结点不会发现它,但可以自我运行。 * ListenAddr:Rpc通信服务的监听地址 * MaxRpcParamLen:Rpc参数数据包最大长度,该参数可以缺省,默认一次Rpc调用支持最大4294967295byte长度数据。 +* CompressBytesLen:Rpc网络数据压缩,当数据>=20480byte时将被压缩。该参数可以缺省或者填0时不进行压缩。 * NodeName:结点名称 * remark:备注,可选项 * ServiceList:该Node拥有的服务列表,注意:origin按配置的顺序进行安装初始化。但停止服务的顺序是相反。 @@ -715,6 +718,15 @@ func (slf *TestService7) CallTest(){ }else{ fmt.Printf("Call output %d\n",output) } + + + //自定义超时,默认rpc超时时间为15s + err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output) + if err != nil { + fmt.Printf("Call error :%+v\n", err) + } else { + fmt.Printf("Call output %d\n", output) + } } @@ -726,13 +738,27 @@ func (slf *TestService7) AsyncCallTest(){ })*/ //异步调用,在数据返回时,会回调传入函数 //注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值 - slf.AsyncCall("TestService6.RPC_Sum",&input,func(output *int,err error){ + err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) { if err != nil { - fmt.Printf("AsyncCall error :%+v\n",err) - }else{ - fmt.Printf("AsyncCall output %d\n",*output) + fmt.Printf("AsyncCall error :%+v\n", err) + } else { + fmt.Printf("AsyncCall output %d\n", *output) } }) + fmt.Println(err) + + //自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用 + rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) { + //如果下面注释的rpcCancel()函数被调用,这里可能将不再返回 + if err != nil { + fmt.Printf("AsyncCall error :%+v\n", err) + } else { + fmt.Printf("AsyncCall output %d\n", *output) + } + }) + //rpcCancel() + fmt.Println(err, rpcCancel) + } func (slf *TestService7) GoTest(){ diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index b02634a..d2622fc 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -334,7 +334,8 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param pCall.callback = &callBack pCall.Seq = client.generateSeq() callSeq = pCall.Seq - + pCall.TimeOut = DefaultRpcTimeout + pCall.ServiceMethod = ServiceMethod client.AddPending(pCall) //有返回值时 diff --git a/rpc/server.go b/rpc/server.go index a4d6202..f2580df 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -292,6 +292,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP pCall := MakeCall() pCall.Seq = client.generateSeq() pCall.TimeOut = timeout + pCall.ServiceMethod = serviceMethod rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler == nil { @@ -420,8 +421,6 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl req.requestHandle = func(Returns interface{}, Err RpcError) { v := client.RemovePending(callSeq) if v == nil { - log.SError("rpcClient cannot find seq ", callSeq, " in pending, service method is ",serviceMethod) - //ReleaseCall(pCall) ReleaseRpcRequest(req) return }