Skip to content

Commit

Permalink
Merge pull request #312 from YanHeDoki/master
Browse files Browse the repository at this point in the history
RequestPool 模式添加开关,补充了一个example
  • Loading branch information
aceld committed Apr 9, 2024
2 parents 404d232 + 077b4f2 commit aff32a8
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 9 deletions.
38 changes: 38 additions & 0 deletions examples/zinx_RequestPollMode/NoPoolModeServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"time"

"github.com/aceld/zinx/zconf"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
)

// 如果不使用对象池模式则可以直接传递但是产生大量的 Request 对象

func NoPoll1(request ziface.IRequest) {
request.Set("num", 1)
go NoPoll2(request)
}

func NoPoll2(request ziface.IRequest) {
time.Sleep(time.Second * 3)
get, _ := request.Get("num")
fmt.Printf("num:%v \n", get)

}

func NoPoll4(request ziface.IRequest) {
// 非对象池模式永远不会影响原本的 Request
request.Set("num", 3)
}

func main() {

// 开启 Request 对象池模式
server := znet.NewUserConfServer(&zconf.Config{RouterSlicesMode: true, TCPPort: 8999, Host: "127.0.0.1", RequestPoolMode: false})
server.AddRouterSlices(1, NoPoll1)
server.AddRouterSlices(2, NoPoll4)
server.Serve()
}
66 changes: 66 additions & 0 deletions examples/zinx_RequestPollMode/PoolModeServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"fmt"
"time"

"github.com/aceld/zinx/zconf"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
)

func Poll1(request ziface.IRequest) {
// 如果需要连接信息
request.Set("conn", request.GetConnection())
request.Set("num", 1)
fmt.Printf("request 1 addr:%p,conn:%p \n", &request, request.GetConnection())

// 需要新线程同时也需要上下文的情况,则需要调用 copy 方法拷贝一份
cp := request.Copy()
go Poll2(cp)

// 如果不使用 copy 方法拷贝对象则会出现同一个对象但是信息可能不一致的问题,不启动 poll2 会更直接
go Poll3(request)
}

func Poll2(request ziface.IRequest) {
defer func() {
if err := recover(); err != nil {
// 接收一个panic
fmt.Println(err)
}

}()
get_conn, ok := request.Get("conn")
if ok {
// 如果直接取用则会导致空指针
request.GetConnection().GetConnID()
// 打印出的 Request 对象的地址是不一致的
conn := get_conn.(ziface.IConnection)
fmt.Printf("request copy addr:%p,conn:%p \n", &request, conn)
// conn.sendMsg()
}
}

// 如果请求的次数多,则开启对象池且直接传递不copy Request 就可能导致值不一致
func Poll3(request ziface.IRequest) {
time.Sleep(time.Second * 3)
get, _ := request.Get("num")
// 池化对象如果直接传递被影响可能随机打印被修改的值 3
fmt.Printf("num:%v \n", get)

}

func Poll4(request ziface.IRequest) {
// 影响原本的 request 对象
request.Set("num", 3)
}

func main() {

// 开启 Request 对象池模式
server := znet.NewUserConfServer(&zconf.Config{RouterSlicesMode: true, TCPPort: 8999, Host: "127.0.0.1", RequestPoolMode: true})
server.AddRouterSlices(1, Poll1)
server.AddRouterSlices(2, Poll4)
server.Serve()
}
86 changes: 86 additions & 0 deletions examples/zinx_RequestPollMode/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"github.com/aceld/zinx/examples/zinx_client/c_router"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

// Custom business logic of the client (客户端自定义业务)
func business(conn ziface.IConnection) {

for i := 0; i < 100; i++ {
err := conn.SendMsg(1, []byte("Ping...[FromClient]"))
if err != nil {
fmt.Println(err)
zlog.Error(err)
break
}

err = conn.SendMsg(2, []byte("Ping...[FromClient]"))
if err != nil {
fmt.Println(err)
zlog.Error(err)
break
}

}
}

// Function to execute when the connection is created (创建连接的时候执行)
func DoClientConnectedBegin(conn ziface.IConnection) {
zlog.Debug("DoConnecionBegin is Called ... ")

// Set two connection properties after the connection is created (设置两个链接属性,在连接创建之后)
conn.SetProperty("Name", "刘丹冰Aceld")
conn.SetProperty("Home", "https://yuque.com/aceld")

go business(conn)
}

// Function to execute when the connection is lost (连接断开的时候执行)
func DoClientConnectedLost(conn ziface.IConnection) {
// Get the Name and Home properties of the connection before it is destroyed
// (在连接销毁之前,查询conn的Name,Home属性)
if name, err := conn.GetProperty("Name"); err == nil {
zlog.Debug("Conn Property Name = ", name)
}

if home, err := conn.GetProperty("Home"); err == nil {
zlog.Debug("Conn Property Home = ", home)
}

zlog.Debug("DoClientConnectedLost is Called ... ")
}

func main() {
// Create a client handle using Zinx's Method (创建一个Client句柄,使用Zinx的方法)
client := znet.NewClient("127.0.0.1", 8999)

// Set the business logic to execute when the connection is created or lost
// (添加首次建立链接时的业务)
client.SetOnConnStart(DoClientConnectedBegin)
client.SetOnConnStop(DoClientConnectedLost)

// Register routers for the messages received from the server
// (注册收到服务器消息业务路由)
client.AddRouter(2, &c_router.PingRouter{})
client.AddRouter(3, &c_router.HelloRouter{})

// Start the client
client.Start()

// close
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
sig := <-c
fmt.Println("===exit===", sig)
client.Stop()
time.Sleep(time.Second * 2)
}
3 changes: 3 additions & 0 deletions zconf/globalobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Config struct {
// 路由模式 false为旧版本路由,true为启用新版本的路由 默认使用旧版本
RouterSlicesMode bool

// 是否开启 Request 对象池模式
RequestPoolMode bool
/*
logger
*/
Expand Down Expand Up @@ -226,6 +228,7 @@ func init() {
PrivateKeyFile: "",
Mode: ServerModeTcp,
RouterSlicesMode: false,
RequestPoolMode: false,
KcpACKNoDelay: false,
KcpStreamMode: true,
//Normal Mode: ikcp_nodelay(kcp, 0, 40, 0, 0);
Expand Down
4 changes: 4 additions & 0 deletions zconf/userconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func UserConfToGlobal(config *Config) {
GlobalObject.RouterSlicesMode = config.RouterSlicesMode
}

if config.RequestPoolMode {
GlobalObject.RequestPoolMode = config.RequestPoolMode
}

if config.KcpPort != 0 {
GlobalObject.KcpPort = config.KcpPort
}
Expand Down
32 changes: 24 additions & 8 deletions znet/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,23 @@ func NewRequest(conn ziface.IConnection, msg ziface.IMessage) ziface.IRequest {
}

func GetRequest(conn ziface.IConnection, msg ziface.IMessage) ziface.IRequest {
// 从对象池中取得一个 Request 对象,如果池子中没有可用的 Request 对象则会调用 allocateRequest 函数构造一个新的对象分配
r := RequestPool.Get().(*Request)
// 因为取出的 Request 对象可能是已存在也可能是新构造的,无论是哪种情况都应该初始化再返回使用
r.Reset(conn, msg)
return r

// 根据当前模式判断是否使用对象池
if zconf.GlobalObject.RequestPoolMode {
// 从对象池中取得一个 Request 对象,如果池子中没有可用的 Request 对象则会调用 allocateRequest 函数构造一个新的对象分配
r := RequestPool.Get().(*Request)
// 因为取出的 Request 对象可能是已存在也可能是新构造的,无论是哪种情况都应该初始化再返回使用
r.Reset(conn, msg)
return r
}
return NewRequest(conn, msg)
}

func PutRequest(request ziface.IRequest) {
RequestPool.Put(request)
// 判断是否开启了对象池模式
if zconf.GlobalObject.RequestPoolMode {
RequestPool.Put(request)
}
}

func allocateRequest() ziface.IRequest {
Expand All @@ -90,9 +98,10 @@ func (r *Request) Reset(conn ziface.IConnection, msg ziface.IMessage) {
}

// Copy 在执行路由函数的时候可能会出现需要再起一个协程的需求,但是 Request 对象由对象池管理后无法保证新协程中的 Request 参数一致
// 通过 Copy 方法复制一份 Request 对象保持创建协程时候的参数一致。但新开的协程不应该在对原始的执行过程有影响,所以不包含链接和路由对象。
// 通过 Copy 方法复制一份 Request 对象保持创建协程时候的参数一致。但新开的协程不应该在对原始的执行过程有影响,所以不包含连接和路由对象。
// 但如果一定对连接信息有所需要可以在 Copy 后手动 set 一份参数在 Request 对象中
func (r *Request) Copy() ziface.IRequest {
// 构造一个新的 Request 对象,复制部分原始对象的参数,但是复制的 Request 不应该再对原始链接操作,所以不能含有链接参数
// 构造一个新的 Request 对象,复制部分原始对象的参数,但是复制的 Request 不应该再对原始连接操作,所以不含有连接参数
// 同理也不应该再执行路由方法,路由函数也不包含
newRequest := &Request{
conn: nil,
Expand All @@ -110,6 +119,13 @@ func (r *Request) Copy() ziface.IRequest {
newRequest.keys[k] = v
}

// 复制一份原本的 icResp
copyResp := []ziface.IcResp{r.icResp}
newIcResp := make([]ziface.IcResp, 0, 1)
copy(newIcResp, copyResp)
for _, v := range newIcResp {
newRequest.icResp = v
}
// 复制一份原本的 msg 信息
newRequest.msg = zpack.NewMessageByMsgId(r.msg.GetMsgID(), r.msg.GetDataLen(), r.msg.GetRawData())

Expand Down
4 changes: 3 additions & 1 deletion znet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type Server struct {

// Routing mode (路由模式)
RouterSlicesMode bool

// Request 对象池模式
RequestPoolMode bool
// Current server's connection manager (当前Server的链接管理器)
ConnMgr ziface.IConnManager

Expand Down Expand Up @@ -127,6 +128,7 @@ func newServerWithConfig(config *zconf.Config, ipVersion string, opts ...Option)
KcpPort: config.KcpPort,
msgHandler: newMsgHandle(),
RouterSlicesMode: config.RouterSlicesMode,
RequestPoolMode: config.RequestPoolMode,
ConnMgr: newConnManager(),
exitChan: nil,
// Default to using Zinx's TLV data pack format
Expand Down

0 comments on commit aff32a8

Please sign in to comment.