Skip to content

Commit

Permalink
补充与优化rpc超时功能RAEDME说明
Browse files Browse the repository at this point in the history
  • Loading branch information
duanhf2012 committed Aug 1, 2023
1 parent dfb6959 commit 0f3a965
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 8 deletions.
36 changes: 31 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ cluster.json如下:
"Private": false,
"ListenAddr":"127.0.0.1:8001",
"MaxRpcParamLen": 409600,
"CompressBytesLen": 20480,
"NodeName": "Node_Test1",
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
Expand All @@ -72,7 +73,8 @@ cluster.json如下:
"NodeId": 2,
"Private": false,
"ListenAddr":"127.0.0.1:8002",
"MaxRpcParamLen": 409600,
"MaxRpcParamLen": 409600,
"CompressBytesLen": 20480,
"NodeName": "Node_Test1",
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
Expand All @@ -88,6 +90,7 @@ cluster.json如下:
* Private: 是否私有结点,如果为true,表示其他结点不会发现它,但可以自我运行。
* ListenAddr:Rpc通信服务的监听地址
* MaxRpcParamLen:Rpc参数数据包最大长度,该参数可以缺省,默认一次Rpc调用支持最大4294967295byte长度数据。
* CompressBytesLen:Rpc网络数据压缩,当数据>=20480byte时将被压缩。该参数可以缺省或者填0时不进行压缩。
* NodeName:结点名称
* remark:备注,可选项
* ServiceList:该Node拥有的服务列表,注意:origin按配置的顺序进行安装初始化。但停止服务的顺序是相反。
Expand Down Expand Up @@ -715,6 +718,15 @@ func (slf *TestService7) CallTest(){
}else{
fmt.Printf("Call output %d\n",output)
}
//自定义超时,默认rpc超时时间为15s
err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output)
if err != nil {
fmt.Printf("Call error :%+v\n", err)
} else {
fmt.Printf("Call output %d\n", output)
}
}
Expand All @@ -726,13 +738,27 @@ func (slf *TestService7) AsyncCallTest(){
})*/
//异步调用,在数据返回时,会回调传入函数
//注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值
slf.AsyncCall("TestService6.RPC_Sum",&input,func(output *int,err error){
err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) {
if err != nil {
fmt.Printf("AsyncCall error :%+v\n",err)
}else{
fmt.Printf("AsyncCall output %d\n",*output)
fmt.Printf("AsyncCall error :%+v\n", err)
} else {
fmt.Printf("AsyncCall output %d\n", *output)
}
})
fmt.Println(err)
//自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用
rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) {
//如果下面注释的rpcCancel()函数被调用,这里可能将不再返回
if err != nil {
fmt.Printf("AsyncCall error :%+v\n", err)
} else {
fmt.Printf("AsyncCall output %d\n", *output)
}
})
//rpcCancel()
fmt.Println(err, rpcCancel)
}
func (slf *TestService7) GoTest(){
Expand Down
3 changes: 2 additions & 1 deletion rpc/rpchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
pCall.callback = &callBack
pCall.Seq = client.generateSeq()
callSeq = pCall.Seq

pCall.TimeOut = DefaultRpcTimeout
pCall.ServiceMethod = ServiceMethod
client.AddPending(pCall)

//有返回值时
Expand Down
3 changes: 1 addition & 2 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.TimeOut = timeout
pCall.ServiceMethod = serviceMethod

rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
Expand Down Expand Up @@ -420,8 +421,6 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq)
if v == nil {
log.SError("rpcClient cannot find seq ", callSeq, " in pending, service method is ",serviceMethod)
//ReleaseCall(pCall)
ReleaseRpcRequest(req)
return
}
Expand Down

0 comments on commit 0f3a965

Please sign in to comment.