Skip to content

Commit

Permalink
# Conflicts:
Browse files Browse the repository at this point in the history
#	README.md
#	codec.go
#	conn.go
#	server.go
#	test/client/main.go
#	test/server/main.go
  • Loading branch information
alberliu committed May 11, 2022
1 parent f87cb5d commit 7649abd
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 150 deletions.
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,32 @@ var log = gn.GetLogger()

var server *gn.Server

var encoder = gn.NewHeaderLenEncoder(2, 1024)
type Handler struct{}

type Handler struct {}

func (Handler) OnConnect(c *gn.Conn) {
func (*Handler) OnConnect(c *gn.Conn) {
log.Info("connect:", c.GetFd(), c.GetAddr())
}
func (Handler) OnMessage(c *gn.Conn, bytes []byte) {
encoder.EncodeToFD(c.GetFd(), bytes)
func (*Handler) OnMessage(c *gn.Conn, bytes []byte) {
c.WriteWithEncoder(bytes)
log.Info("read:", string(bytes))
}
func (Handler) OnClose(c *gn.Conn, err error) {
func (*Handler) OnClose(c *gn.Conn, err error) {
log.Info("close:", c.GetFd(), err)
}

func main() {
var err error
server, err = gn.NewServer(":8080", Handler{}, gn.NewHeaderLenDecoder(2),
gn.WithTimeout(5*time.Second), gn.WithReadBufferLen(10))
server, err = gn.NewServer(":8080", &Handler{},
gn.WithDecoder(gn.NewHeaderLenDecoder(2)),
gn.WithEncoder(gn.NewHeaderLenEncoder(2, 1024)),
gn.WithTimeout(5*time.Second),
gn.WithReadBufferLen(10))
if err != nil {
log.Info("err")
return
}

server.Run()
}

```
6 changes: 6 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (b *Buffer) Read(offset, limit int) ([]byte, error) {
return buf, nil
}

// ReadAll 读取所有字节
func (b *Buffer) ReadAll() []byte {
buf, _ := b.Read(b.start, b.end)
return buf
}

// reset 重新设置缓存区(将有用字节前移)
func (b *Buffer) reset() {
if b.start == 0 {
Expand Down
31 changes: 4 additions & 27 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ import (
"fmt"
"io"
"sync"
"syscall"
)

// Decoder 解码器
type Decoder interface {
Decode(c *Conn) error
Decode(*Buffer, func([]byte)) error
}

// Encoder 编码器
type Encoder interface {
EncodeToFD(fd int32, bytes []byte) error
EncodeToWriter(w io.Writer, bytes []byte) error
}

type headerLenDecoder struct {
Expand All @@ -37,8 +36,7 @@ func NewHeaderLenDecoder(headerLen int) Decoder {
}

// Decode 解码
func (d *headerLenDecoder) Decode(c *Conn) error {
buffer := c.GetBuffer()
func (d *headerLenDecoder) Decode(buffer *Buffer, handle func([]byte)) error {
for {
header, err := buffer.Seek(d.headerLen)
if err == ErrNotEnough {
Expand All @@ -55,7 +53,7 @@ func (d *headerLenDecoder) Decode(c *Conn) error {
return nil
}

c.OnMessage(body)
handle(body)
}
}

Expand Down Expand Up @@ -85,27 +83,6 @@ func NewHeaderLenEncoder(headerLen, writeBufferLen int) *headerLenEncoder {
}
}

// EncodeToFD 编码数据,并且写入文件描述符
func (e headerLenEncoder) EncodeToFD(fd int32, bytes []byte) error {
l := len(bytes)
var buffer []byte
if l <= e.writeBufferLen-e.headerLen {
obj := e.writeBufferPool.Get()
defer e.writeBufferPool.Put(obj)
buffer = obj.([]byte)[0 : l+e.headerLen]
} else {
buffer = make([]byte, l+e.headerLen)
}

// 将消息长度写入buffer
binary.BigEndian.PutUint16(buffer[0:2], uint16(l))
// 将消息内容内容写入buffer
copy(buffer[e.headerLen:], bytes)

_, err := syscall.Write(int(fd), buffer)
return err
}

// EncodeToWriter 编码数据,并且写入Writer
func (e headerLenEncoder) EncodeToWriter(w io.Writer, bytes []byte) error {
l := len(bytes)
Expand Down
25 changes: 16 additions & 9 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *Conn) GetBuffer() *Buffer {
}

// Read 读取数据
func (c *Conn) Read() error {
func (c *Conn) read() error {
if c.server.options.timeout != 0 {
c.timer.Reset(c.server.options.timeout)
}
Expand All @@ -66,13 +66,25 @@ func (c *Conn) Read() error {
return err
}

err = c.server.decoder.Decode(c)
if err != nil {
return err
if c.server.options.decoder == nil {
c.server.handler.OnMessage(c, c.buffer.ReadAll())
} else {
var handle = func(bytes []byte) {
c.server.handler.OnMessage(c, bytes)
}
err = c.server.options.decoder.Decode(c.buffer, handle)
if err != nil {
return err
}
}
}
}

// WriteWithEncoder 使用编码器写入
func (c *Conn) WriteWithEncoder(bytes []byte) error {
return c.server.options.encoder.EncodeToWriter(c, bytes)
}

// Write 写入数据
func (c *Conn) Write(bytes []byte) (int, error) {
return syscall.Write(int(c.fd), bytes)
Expand Down Expand Up @@ -105,11 +117,6 @@ func (c *Conn) CloseRead() error {
return nil
}

// OnMessage 消息处理
func (c *Conn) OnMessage(bytes []byte) {
c.server.handler.OnMessage(c, bytes)
}

// GetData 获取数据
func (c *Conn) GetData() interface{} {
return c.data
Expand Down
115 changes: 115 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package gn

import (
"runtime"
"time"
)

// options Server初始化参数
type options struct {
decoder Decoder // 解码器
encoder Encoder // 编码器
readBufferLen int // 所读取的客户端包的最大长度,客户端发送的包不能超过这个长度,默认值是1024字节
acceptGNum int // 处理接受请求的goroutine数量
ioGNum int // 处理io的goroutine数量
ioEventQueueLen int // io事件队列长度
timeout time.Duration // 超时时间
}

type Option interface {
apply(*options)
}

type funcServerOption struct {
f func(*options)
}

func (fdo *funcServerOption) apply(do *options) {
fdo.f(do)
}

func newFuncServerOption(f func(*options)) *funcServerOption {
return &funcServerOption{
f: f,
}
}

// WithDecoder 设置解码器
func WithDecoder(decoder Decoder) Option {
return newFuncServerOption(func(o *options) {
o.decoder = decoder
})
}

// WithEncoder 设置解码器
func WithEncoder(encoder Encoder) Option {
return newFuncServerOption(func(o *options) {
o.encoder = encoder
})
}

// WithReadBufferLen 设置缓存区大小
func WithReadBufferLen(len int) Option {
return newFuncServerOption(func(o *options) {
if len <= 0 {
panic("acceptGNum must greater than 0")
}
o.readBufferLen = len
})
}

// WithAcceptGNum 设置建立连接的goroutine数量
func WithAcceptGNum(num int) Option {
return newFuncServerOption(func(o *options) {
if num <= 0 {
panic("acceptGNum must greater than 0")
}
o.acceptGNum = num
})
}

// WithIOGNum 设置处理IO的goroutine数量
func WithIOGNum(num int) Option {
return newFuncServerOption(func(o *options) {
if num <= 0 {
panic("IOGNum must greater than 0")
}
o.ioGNum = num
})
}

// WithIOEventQueueLen 设置IO事件队列长度,默认值是1024
func WithIOEventQueueLen(num int) Option {
return newFuncServerOption(func(o *options) {
if num <= 0 {
panic("ioEventQueueLen must greater than 0")
}
o.ioEventQueueLen = num
})
}

// WithTimeout 设置TCP超时检查的间隔时间以及超时时间
func WithTimeout(timeout time.Duration) Option {
return newFuncServerOption(func(o *options) {
if timeout <= 0 {
panic("timeoutTicker must greater than 0")
}

o.timeout = timeout
})
}

func getOptions(opts ...Option) *options {
cpuNum := runtime.NumCPU()
options := &options{
readBufferLen: 1024,
acceptGNum: cpuNum,
ioGNum: cpuNum,
ioEventQueueLen: 1024,
}

for _, o := range opts {
o.apply(options)
}
return options
}
Loading

0 comments on commit 7649abd

Please sign in to comment.