From 80b26f8b12a72407576dafc5ed0b89c95dd9dbf5 Mon Sep 17 00:00:00 2001 From: gitsrc <34047788+gitsrc@users.noreply.github.com> Date: Mon, 7 Oct 2024 12:02:59 +0000 Subject: [PATCH 1/2] feat(framework):update framework Signed-off-by: gitsrc <34047788+gitsrc@users.noreply.github.com> --- .gitignore | 3 +- example/memory_kv/server.go | 37 +++- go.mod | 11 +- go.sum | 19 +- pkg/resp/comparse.go | 389 ++++++++++++++++++------------------ redhub.go | 157 ++++++--------- 6 files changed, 302 insertions(+), 314 deletions(-) diff --git a/.gitignore b/.gitignore index d38e4ac..04d1287 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ release_bin vendor #vscode -.vscode \ No newline at end of file +.vscode +example/memory_kv/memory_kv diff --git a/example/memory_kv/server.go b/example/memory_kv/server.go index e3aa6d9..de7dc9d 100644 --- a/example/memory_kv/server.go +++ b/example/memory_kv/server.go @@ -15,51 +15,69 @@ import ( ) func main() { + // Define a mutex and a map to store data var mu sync.RWMutex var items = make(map[string][]byte) + + // Define command-line arguments var network string var addr string var multicore bool var reusePort bool var pprofDebug bool var pprofAddr string + + // Parse command-line arguments flag.StringVar(&network, "network", "tcp", "server network (default \"tcp\")") - flag.StringVar(&addr, "addr", "127.0.0.1:6380", "server addr (default \":6380\")") - flag.BoolVar(&multicore, "multicore", true, "multicore") - flag.BoolVar(&reusePort, "reusePort", false, "reusePort") - flag.BoolVar(&pprofDebug, "pprofDebug", false, "open pprof") + flag.StringVar(&addr, "addr", "127.0.0.1:6380", "server address (default \":6380\")") + flag.BoolVar(&multicore, "multicore", true, "enable multicore support") + flag.BoolVar(&reusePort, "reusePort", false, "enable port reuse") + flag.BoolVar(&pprofDebug, "pprofDebug", false, "enable pprof debugging") flag.StringVar(&pprofAddr, "pprofAddr", ":8888", "pprof address") flag.Parse() + + // Start pprof server if debugging is enabled if pprofDebug { go func() { http.ListenAndServe(pprofAddr, nil) }() } + // Create the protocol address string protoAddr := fmt.Sprintf("%s://%s", network, addr) + + // Define RedHub options option := redhub.Options{ Multicore: multicore, ReusePort: reusePort, } + // Create a new RedHub instance with custom handlers rh := redhub.NewRedHub( + // Connection initialization handler func(c *redhub.Conn) (out []byte, action redhub.Action) { return }, + // Connection error handler func(c *redhub.Conn, err error) (action redhub.Action) { return }, + // Command handler func(cmd resp.Command, out []byte) ([]byte, redhub.Action) { var status redhub.Action switch strings.ToLower(string(cmd.Args[0])) { default: + // Handle unknown commands out = resp.AppendError(out, "ERR unknown command '"+string(cmd.Args[0])+"'") case "ping": + // Handle PING command out = resp.AppendString(out, "PONG") case "quit": + // Handle QUIT command out = resp.AppendString(out, "OK") status = redhub.Close case "set": + // Handle SET command if len(cmd.Args) != 3 { out = resp.AppendError(out, "ERR wrong number of arguments for '"+string(cmd.Args[0])+"' command") break @@ -69,6 +87,7 @@ func main() { mu.Unlock() out = resp.AppendString(out, "OK") case "get": + // Handle GET command if len(cmd.Args) != 2 { out = resp.AppendError(out, "ERR wrong number of arguments for '"+string(cmd.Args[0])+"' command") break @@ -82,6 +101,7 @@ func main() { out = resp.AppendBulk(out, val) } case "del": + // Handle DEL command if len(cmd.Args) != 2 { out = resp.AppendError(out, "ERR wrong number of arguments for '"+string(cmd.Args[0])+"' command") break @@ -96,8 +116,7 @@ func main() { out = resp.AppendInt(out, 1) } case "config": - // This simple (blank) response is only here to allow for the - // redis-benchmark command to work with this example. + // Handle CONFIG command (for redis-benchmark compatibility) out = resp.AppendArray(out, 2) out = resp.AppendBulk(out, cmd.Args[2]) out = resp.AppendBulkString(out, "") @@ -105,8 +124,12 @@ func main() { return out, status }, ) + + // Log the server start log.Printf("started redhub server at %s", addr) - err := redhub.ListendAndServe(protoAddr, option, rh) + + // Start the RedHub server + err := redhub.ListenAndServe(protoAddr, option, rh) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 6c06bcf..d7c9454 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,15 @@ module github.com/IceFireDB/redhub -go 1.16 +go 1.22 + +require github.com/panjf2000/gnet v1.6.7 require ( github.com/panjf2000/ants/v2 v2.9.0 // indirect - github.com/panjf2000/gnet v1.6.7 github.com/stretchr/testify v1.8.4 // indirect - go.uber.org/zap v1.21.0 // indirect - golang.org/x/sys v0.15.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/sys v0.26.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/go.sum b/go.sum index 41d1ca1..6b1df8f 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,4 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -12,7 +11,6 @@ github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0t github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/panjf2000/gnet v1.6.7 h1:zv1k6kw80sG5ZQrLpbbFDheNCm50zm3z2e3ck5GwMOM= github.com/panjf2000/gnet v1.6.7/go.mod h1:KcOU7QsCaCBjeD5kyshBIamG3d9kAQtlob4Y0v0E+sc= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -31,17 +29,17 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= -go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= -go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -59,8 +57,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -78,7 +76,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/resp/comparse.go b/pkg/resp/comparse.go index 7c3d1e3..34f468b 100644 --- a/pkg/resp/comparse.go +++ b/pkg/resp/comparse.go @@ -1,6 +1,9 @@ package resp -import "errors" +import ( + "errors" + "strconv" +) var ( errUnbalancedQuotes = &errProtocol{"unbalanced quotes in request"} @@ -11,6 +14,7 @@ var ( errTooMuchData = errors.New("too much data") ) +// errProtocol represents a protocol error type errProtocol struct { msg string } @@ -19,235 +23,226 @@ func (err *errProtocol) Error() string { return "Protocol error: " + err.msg } -// Command represent a command +// Command represents a RESP command type Command struct { - // Raw is a encoded RESP message. - Raw []byte - // Args is a series of arguments that make up the command. - Args [][]byte + Raw []byte // Raw is an encoded RESP message + Args [][]byte // Args is a series of arguments that make up the command } +// parseInt converts a byte slice to an integer func parseInt(b []byte) (int, bool) { - if len(b) == 1 && b[0] >= '0' && b[0] <= '9' { - return int(b[0] - '0'), true - } - var n int - var sign bool - var i int - if len(b) > 0 && b[0] == '-' { - sign = true - i++ - } - for ; i < len(b); i++ { - if b[i] < '0' || b[i] > '9' { - return 0, false - } - n = n*10 + int(b[i]-'0') - } - if sign { - n *= -1 - } - return n, true + // Use the built-in strconv.Atoi for better performance + n, err := strconv.Atoi(string(b)) + return n, err == nil } -// ReadCommands parses a raw message and returns commands. +// ReadCommands parses a raw message and returns commands func ReadCommands(buf []byte) ([]Command, []byte, error) { var cmds []Command var writeback []byte b := buf - if len(b) > 0 { - // we have data, yay! - // but is this enough data for a complete command? or multiple? - next: + + for len(b) > 0 { switch b[0] { + case '*': + // RESP formatted command + cmd, rest, err := parseRESPCommand(b) + if err != nil { + return nil, writeback, err + } + if cmd != nil { + cmds = append(cmds, *cmd) + } + b = rest default: - // just a plain text command - for i := 0; i < len(b); i++ { - if b[i] == '\n' { - var line []byte - if i > 0 && b[i-1] == '\r' { - line = b[:i-1] - } else { - line = b[:i] - } - var cmd Command - var quote bool - var quotech byte - var escape bool - outer: - for { - nline := make([]byte, 0, len(line)) - for i := 0; i < len(line); i++ { - c := line[i] - if !quote { - if c == ' ' { - if len(nline) > 0 { - cmd.Args = append(cmd.Args, nline) - } - line = line[i+1:] - continue outer - } - if c == '"' || c == '\'' { - if i != 0 { - return nil, writeback, errUnbalancedQuotes - } - quotech = c - quote = true - line = line[i+1:] - continue outer - } - } else { - if escape { - escape = false - switch c { - case 'n': - c = '\n' - case 'r': - c = '\r' - case 't': - c = '\t' - } - } else if c == quotech { - quote = false - quotech = 0 - cmd.Args = append(cmd.Args, nline) - line = line[i+1:] - if len(line) > 0 && line[0] != ' ' { - return nil, writeback, errUnbalancedQuotes - } - continue outer - } else if c == '\\' { - escape = true - continue - } - } - nline = append(nline, c) + // Plain text command + cmd, rest, err := parsePlainTextCommand(b) + if err != nil { + return nil, writeback, err + } + if cmd != nil { + cmds = append(cmds, *cmd) + } + b = rest + } + } + + if len(b) > 0 { + writeback = b + } + + if len(cmds) > 0 { + return cmds, writeback, nil + } + return nil, writeback, nil +} + +// parseRESPCommand parses a RESP formatted command +func parseRESPCommand(b []byte) (*Command, []byte, error) { + marks := make([]int, 0, 16) + for i := 1; i < len(b); i++ { + if b[i] == '\n' { + if b[i-1] != '\r' { + return nil, nil, errInvalidMultiBulkLength + } + count, ok := parseInt(b[1 : i-1]) + if !ok || count <= 0 { + return nil, nil, errInvalidMultiBulkLength + } + marks = marks[:0] + for j := 0; j < count; j++ { + i++ + if i >= len(b) || b[i] != '$' { + return nil, b, nil // Not enough data + } + si := i + for ; i < len(b); i++ { + if b[i] == '\n' { + if b[i-1] != '\r' { + return nil, nil, errInvalidBulkLength } - if quote { - return nil, writeback, errUnbalancedQuotes + size, ok := parseInt(b[si+1 : i-1]) + if !ok || size < 0 { + return nil, nil, errInvalidBulkLength } - if len(line) > 0 { - cmd.Args = append(cmd.Args, line) + if i+size+2 >= len(b) { + return nil, b, nil // Not enough data } - break - } - if len(cmd.Args) > 0 { - // convert this to resp command syntax - var wr Writer - wr.WriteArray(len(cmd.Args)) - for i := range cmd.Args { - wr.WriteBulk(cmd.Args[i]) - cmd.Args[i] = append([]byte(nil), cmd.Args[i]...) + if b[i+size+2] != '\n' || b[i+size+1] != '\r' { + return nil, nil, errInvalidBulkLength } - cmd.Raw = wr.b - cmds = append(cmds, cmd) - } - b = b[i+1:] - if len(b) > 0 { - goto next - } else { - goto done + i++ + marks = append(marks, i, i+size) + i += size + 1 + break } } } - case '*': - // resp formatted command - marks := make([]int, 0, 16) - outer2: - for i := 1; i < len(b); i++ { - if b[i] == '\n' { - if b[i-1] != '\r' { - return nil, writeback, errInvalidMultiBulkLength - } - count, ok := parseInt(b[1 : i-1]) - if !ok || count <= 0 { - return nil, writeback, errInvalidMultiBulkLength - } - marks = marks[:0] - for j := 0; j < count; j++ { - // read bulk length - i++ - if i < len(b) { - if b[i] != '$' { - return nil, writeback, &errProtocol{"expected '$', got '" + - string(b[i]) + "'"} - } - si := i - for ; i < len(b); i++ { - if b[i] == '\n' { - if b[i-1] != '\r' { - return nil, writeback, errInvalidBulkLength - } - size, ok := parseInt(b[si+1 : i-1]) - if !ok || size < 0 { - return nil, writeback, errInvalidBulkLength - } - if i+size+2 >= len(b) { - // not ready - break outer2 - } - if b[i+size+2] != '\n' || - b[i+size+1] != '\r' { - return nil, writeback, errInvalidBulkLength - } - i++ - marks = append(marks, i, i+size) - i += size + 1 - break - } - } - } - } - if len(marks) == count*2 { - var cmd Command - cmd.Raw = b[:i+1] - cmd.Args = make([][]byte, len(marks)/2) - // slice up the raw command into the args based on - // the recorded marks. - for h := 0; h < len(marks); h += 2 { - cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]] - } - cmds = append(cmds, cmd) - b = b[i+1:] - if len(b) > 0 { - goto next - } else { - goto done - } - } + if len(marks) == count*2 { + cmd := &Command{ + Raw: b[:i+1], + Args: make([][]byte, len(marks)/2), + } + for h := 0; h < len(marks); h += 2 { + cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]] } + return cmd, b[i+1:], nil } } - done: - //rd.start = rd.end - len(b) } - if len(b) > 0 { - writeback = b + return nil, b, nil +} + +// parsePlainTextCommand parses a plain text command +func parsePlainTextCommand(b []byte) (*Command, []byte, error) { + for i := 0; i < len(b); i++ { + if b[i] == '\n' { + line := b[:i] + if i > 0 && b[i-1] == '\r' { + line = b[:i-1] + } + cmd, err := parseLine(line) + if err != nil { + return nil, nil, err + } + if cmd != nil { + return cmd, b[i+1:], nil + } + return nil, b[i+1:], nil + } } - if len(cmds) > 0 { - return cmds, writeback, nil - } else { - return nil, writeback, nil + return nil, b, nil +} + +// parseLine parses a single line of plain text command +func parseLine(line []byte) (*Command, error) { + var cmd Command + var quote bool + var quotech byte + var escape bool + var arg []byte + + for i := 0; i < len(line); i++ { + c := line[i] + if !quote { + if c == ' ' { + if len(arg) > 0 { + cmd.Args = append(cmd.Args, arg) + arg = nil + } + continue + } + if c == '"' || c == '\'' { + if i != 0 { + return nil, errUnbalancedQuotes + } + quotech = c + quote = true + continue + } + } else { + if escape { + escape = false + switch c { + case 'n': + c = '\n' + case 'r': + c = '\r' + case 't': + c = '\t' + } + } else if c == quotech { + quote = false + quotech = 0 + cmd.Args = append(cmd.Args, arg) + arg = nil + continue + } else if c == '\\' { + escape = true + continue + } + } + arg = append(arg, c) + } + + if quote { + return nil, errUnbalancedQuotes + } + if len(arg) > 0 { + cmd.Args = append(cmd.Args, arg) + } + + if len(cmd.Args) > 0 { + // Convert to RESP command syntax + var wr Writer + wr.WriteArray(len(cmd.Args)) + for i := range cmd.Args { + wr.WriteBulk(cmd.Args[i]) + cmd.Args[i] = append([]byte(nil), cmd.Args[i]...) + } + cmd.Raw = wr.b + return &cmd, nil } + return nil, nil } -// Writer allows for writing RESP messages. +// Writer allows for writing RESP messages type Writer struct { b []byte } -// WriteArray writes an array header. You must then write additional -// sub-responses to the client to complete the response. -// For example to write two strings: -// -// c.WriteArray(2) -// c.WriteBulk("item 1") -// c.WriteBulk("item 2") +// WriteArray writes an array header func (w *Writer) WriteArray(count int) { - w.b = AppendArray(w.b, count) + w.b = append(w.b, '*') + w.b = strconv.AppendInt(w.b, int64(count), 10) + w.b = append(w.b, '\r', '\n') } -// WriteBulk writes bulk bytes to the client. +// WriteBulk writes bulk bytes func (w *Writer) WriteBulk(bulk []byte) { - w.b = AppendBulk(w.b, bulk) + w.b = append(w.b, '$') + w.b = strconv.AppendInt(w.b, int64(len(bulk)), 10) + w.b = append(w.b, '\r', '\n') + w.b = append(w.b, bulk...) + w.b = append(w.b, '\r', '\n') } diff --git a/redhub.go b/redhub.go index d61d7e4..0a478d5 100644 --- a/redhub.go +++ b/redhub.go @@ -9,158 +9,127 @@ import ( "github.com/panjf2000/gnet" ) +// Action represents the type of action to be taken after an event type Action int const ( - // None indicates that no action should occur following an event. + // None indicates that no action should occur following an event None Action = iota - - // Close closes the connection. + // Close indicates that the connection should be closed Close - - // Shutdown shutdowns the server. + // Shutdown indicates that the server should be shut down Shutdown ) +// Conn wraps a gnet.Conn type Conn struct { gnet.Conn } +// Options defines the configuration options for the RedHub server type Options struct { - // Multicore indicates whether the server will be effectively created with multi-cores, if so, - // then you must take care with synchronizing memory between all event callbacks, otherwise, - // it will run the server with single thread. The number of threads in the server will be automatically - // assigned to the value of logical CPUs usable by the current process. - Multicore bool - - // LockOSThread is used to determine whether each I/O event-loop is associated to an OS thread, it is useful when you - // need some kind of mechanisms like thread local storage, or invoke certain C libraries (such as graphics lib: GLib) - // that require thread-level manipulation via cgo, or want all I/O event-loops to actually run in parallel for a - // potential higher performance. - LockOSThread bool - - // ReadBufferCap is the maximum number of bytes that can be read from the client when the readable event comes. - // The default value is 64KB, it can be reduced to avoid starving subsequent client connections. - // - // Note that ReadBufferCap will be always converted to the least power of two integer value greater than - // or equal to its real amount. - ReadBufferCap int - - // LB represents the load-balancing algorithm used when assigning new connections. - LB gnet.LoadBalancing - - // NumEventLoop is set up to start the given number of event-loop goroutine. - // Note: Setting up NumEventLoop will override Multicore. - NumEventLoop int - - // ReusePort indicates whether to set up the SO_REUSEPORT socket option. - ReusePort bool - - // Ticker indicates whether the ticker has been set up. - Ticker bool - - // TCPKeepAlive sets up a duration for (SO_KEEPALIVE) socket option. - TCPKeepAlive time.Duration - - // TCPNoDelay controls whether the operating system should delay - // packet transmission in hopes of sending fewer packets (Nagle's algorithm). - // - // The default is true (no delay), meaning that data is sent - // as soon as possible after a Write. - TCPNoDelay gnet.TCPSocketOpt - - // SocketRecvBuffer sets the maximum socket receive buffer in bytes. + Multicore bool + LockOSThread bool + ReadBufferCap int + LB gnet.LoadBalancing + NumEventLoop int + ReusePort bool + Ticker bool + TCPKeepAlive time.Duration + TCPNoDelay gnet.TCPSocketOpt SocketRecvBuffer int - - // SocketSendBuffer sets the maximum socket send buffer in bytes. SocketSendBuffer int - - // ICodec encodes and decodes TCP stream. - Codec gnet.ICodec + Codec gnet.ICodec } -func NewRedHub( - onOpened func(c *Conn) (out []byte, action Action), - onClosed func(c *Conn, err error) (action Action), - handler func(cmd resp.Command, out []byte) ([]byte, Action), -) *redHub { - return &redHub{ - redHubBufMap: make(map[gnet.Conn]*connBuffer), - connSync: sync.RWMutex{}, - onOpened: onOpened, - onClosed: onClosed, - handler: handler, - } -} - -type redHub struct { +// RedHub represents the main server structure +type RedHub struct { *gnet.EventServer onOpened func(c *Conn) (out []byte, action Action) onClosed func(c *Conn, err error) (action Action) handler func(cmd resp.Command, out []byte) ([]byte, Action) redHubBufMap map[gnet.Conn]*connBuffer - connSync sync.RWMutex + connSync *sync.RWMutex } +// connBuffer holds the buffer and commands for each connection type connBuffer struct { buf bytes.Buffer command []resp.Command } -func (rs *redHub) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { +// NewRedHub creates a new RedHub instance +func NewRedHub( + onOpened func(c *Conn) (out []byte, action Action), + onClosed func(c *Conn, err error) (action Action), + handler func(cmd resp.Command, out []byte) ([]byte, Action), +) *RedHub { + return &RedHub{ + redHubBufMap: make(map[gnet.Conn]*connBuffer), + connSync: &sync.RWMutex{}, + onOpened: onOpened, + onClosed: onClosed, + handler: handler, + } +} + +// OnOpened is called when a new connection is opened +func (rs *RedHub) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { rs.connSync.Lock() - defer rs.connSync.Unlock() rs.redHubBufMap[c] = new(connBuffer) - rs.onOpened(&Conn{Conn: c}) - return + rs.connSync.Unlock() + out, act := rs.onOpened(&Conn{Conn: c}) + return out, gnet.Action(act) } -func (rs *redHub) OnClosed(c gnet.Conn, err error) (action gnet.Action) { +// OnClosed is called when a connection is closed +func (rs *RedHub) OnClosed(c gnet.Conn, err error) (action gnet.Action) { rs.connSync.Lock() - defer rs.connSync.Unlock() delete(rs.redHubBufMap, c) - rs.onClosed(&Conn{Conn: c}, err) - return + rs.connSync.Unlock() + return gnet.Action(rs.onClosed(&Conn{Conn: c}, err)) } -func (rs *redHub) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { +// React handles incoming data from connections +func (rs *RedHub) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { rs.connSync.RLock() - defer rs.connSync.RUnlock() cb, ok := rs.redHubBufMap[c] + rs.connSync.RUnlock() + if !ok { - out = resp.AppendError(out, "ERR Client is closed") - return + return resp.AppendError(out, "ERR Client is closed"), gnet.None } + cb.buf.Write(frame) cmds, lastbyte, err := resp.ReadCommands(cb.buf.Bytes()) if err != nil { - out = resp.AppendError(out, "ERR "+err.Error()) - return + return resp.AppendError(out, "ERR "+err.Error()), gnet.None } + cb.command = append(cb.command, cmds...) cb.buf.Reset() + if len(lastbyte) == 0 { - var status Action for len(cb.command) > 0 { cmd := cb.command[0] - if len(cb.command) == 1 { - cb.command = nil - } else { - cb.command = cb.command[1:] - } + cb.command = cb.command[1:] + + var status Action out, status = rs.handler(cmd, out) - switch status { - case Close: - action = gnet.Close + + if status == Close { + return out, gnet.Close } } } else { cb.buf.Write(lastbyte) } - return + + return out, gnet.None } -func ListendAndServe(addr string, options Options, rh *redHub) error { +// ListenAndServe starts the RedHub server +func ListenAndServe(addr string, options Options, rh *RedHub) error { serveOptions := gnet.Options{ Multicore: options.Multicore, LockOSThread: options.LockOSThread, From ed070185dde3ed2345302b023b344524dfe5219c Mon Sep 17 00:00:00 2001 From: gitsrc <34047788+gitsrc@users.noreply.github.com> Date: Mon, 7 Oct 2024 12:04:38 +0000 Subject: [PATCH 2/2] feat(framework):update framework Signed-off-by: gitsrc <34047788+gitsrc@users.noreply.github.com> --- .github/workflows/pages.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pages.yml b/.github/workflows/pages.yml index bca9016..bec2c96 100644 --- a/.github/workflows/pages.yml +++ b/.github/workflows/pages.yml @@ -35,7 +35,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: 1.18 + go-version: 1.23.2 - name: Build run: |