forked from alberliu/gn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
216 lines (190 loc) · 4.58 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package gn
import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"syscall"
)
var (
ErrReadTimeout = errors.New("tcp read timeout")
)
// Handler Server 注册接口
type Handler interface {
OnConnect(c *Conn) // OnConnect 当TCP长连接建立成功是回调
OnMessage(c *Conn, bytes []byte) // OnMessage 当客户端有数据写入是回调
OnClose(c *Conn, err error) // OnClose 当客户端主动断开链接或者超时时回调,err返回关闭的原因
}
const (
EventIn = 1 // 数据流入
EventClose = 2 // 断开连接
EventTimeout = 3 // 检测到超时
)
type event struct {
FD int32 // 文件描述符
Type int32 // 时间类型
}
// Server TCP服务
type Server struct {
netpoll netpoll // 具体操作系统网络实现
options *options // 服务参数
readBufferPool *sync.Pool // 读缓存区内存池
handler Handler // 注册的处理
ioEventQueues []chan event // IO事件队列集合
ioQueueNum int32 // IO事件队列集合数量
conns sync.Map // TCP长连接管理
connsNum int64 // 当前建立的长连接数量
stop chan int // 服务器关闭信号
}
// NewServer 创建server服务器
func NewServer(address string, handler Handler, opts ...Option) (*Server, error) {
options := getOptions(opts...)
// 初始化读缓存区内存池
readBufferPool := &sync.Pool{
New: func() interface{} {
b := make([]byte, options.readBufferLen)
return b
},
}
// 初始化epoll网络
netpoll, err := newNetpoll(address)
if err != nil {
log.Error(err)
return nil, err
}
// 初始化io事件队列
ioEventQueues := make([]chan event, options.ioGNum)
for i := range ioEventQueues {
ioEventQueues[i] = make(chan event, options.ioEventQueueLen)
}
return &Server{
netpoll: netpoll,
options: options,
readBufferPool: readBufferPool,
handler: handler,
ioEventQueues: ioEventQueues,
ioQueueNum: int32(options.ioGNum),
conns: sync.Map{},
connsNum: 0,
stop: make(chan int),
}, nil
}
// GetConn 获取Conn
func (s *Server) GetConn(fd int32) (*Conn, bool) {
value, ok := s.conns.Load(fd)
if !ok {
return nil, false
}
return value.(*Conn), true
}
// Run 启动服务
func (s *Server) Run() {
log.Info("gn server run")
s.startAccept()
s.startIOConsumer()
s.startIOProducer()
}
// GetConnsNum 获取当前长连接的数量
func (s *Server) GetConnsNum() int64 {
return atomic.LoadInt64(&s.connsNum)
}
// Stop 启动服务
func (s *Server) Stop() {
close(s.stop)
for _, queue := range s.ioEventQueues {
close(queue)
}
}
// handleEvent 处理事件
func (s *Server) handleEvent(event event) {
index := event.FD % s.ioQueueNum
s.ioEventQueues[index] <- event
}
// StartProducer 启动生产者
func (s *Server) startIOProducer() {
log.Info("start io producer")
for {
select {
case <-s.stop:
log.Error("stop producer")
return
default:
events, err := s.netpoll.getEvents()
if err != nil {
log.Error(err)
}
for i := range events {
s.handleEvent(events[i])
}
}
}
}
// startAccept 开始接收连接请求
func (s *Server) startAccept() {
for i := 0; i < s.options.acceptGNum; i++ {
go s.accept()
}
log.Info(fmt.Sprintf("start accept by %d goroutine", s.options.acceptGNum))
}
// accept 接收连接请求
func (s *Server) accept() {
for {
select {
case <-s.stop:
return
default:
nfd, addr, err := s.netpoll.accept()
if err != nil {
log.Error(err)
continue
}
fd := int32(nfd)
conn := newConn(fd, addr, s)
s.conns.Store(fd, conn)
atomic.AddInt64(&s.connsNum, 1)
s.handler.OnConnect(conn)
}
}
}
// StartConsumer 启动消费者
func (s *Server) startIOConsumer() {
for _, queue := range s.ioEventQueues {
go s.consumeIOEvent(queue)
}
log.Info(fmt.Sprintf("start io event consumer by %d goroutine", len(s.ioEventQueues)))
}
// ConsumeIO 消费IO事件
func (s *Server) consumeIOEvent(queue chan event) {
for event := range queue {
v, ok := s.conns.Load(event.FD)
if !ok {
log.Error("not found in conns,", event.FD)
continue
}
c := v.(*Conn)
if event.Type == EventClose {
c.Close()
s.handler.OnClose(c, io.EOF)
continue
}
if event.Type == EventTimeout {
c.Close()
s.handler.OnClose(c, ErrReadTimeout)
continue
}
err := c.read()
if err != nil {
// 服务端关闭连接
if err == syscall.EBADF {
continue
}
c.Close()
s.handler.OnClose(c, err)
log.Debug(err)
}
}
}
func (s *Server) handleTimeoutEvent(fd int32) {
s.handleEvent(event{FD: fd, Type: EventTimeout})
}