Skip to content

Commit

Permalink
Use custom ReadAll using globalbuffer for communication (#507)
Browse files Browse the repository at this point in the history
* Move ReadAll function using global pool buffer to helpers
* Use ReadAll function instead of io.ReadAll for decrease memory allocation for REST handler and WS communication.
  • Loading branch information
DnlLrssn authored Oct 15, 2024
1 parent 3eafa55 commit 0716af2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 28 deletions.
27 changes: 27 additions & 0 deletions helpers/BufferPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helpers

import (
"bytes"
"io"
"sync"
)

Expand Down Expand Up @@ -29,3 +30,29 @@ func (bp *BufferPool) Put(buf *bytes.Buffer) {
buf.Reset()
bp.pool.Put(buf)
}

func ReadAll(r io.Reader) ([]byte, error) {
buf := GlobalBufferPool.Get()
defer GlobalBufferPool.Put(buf)

capacity := int64(bytes.MinRead)
var err error
// If the buffer overflows, we will get bytes.ErrTooLarge.
// Return that as an error. Any other panic remains.
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
if int64(int(capacity)) == capacity {
buf.Grow(int(capacity))
}
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}
28 changes: 1 addition & 27 deletions session/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func (handler *RestHandler) QueueRequestWithCallback(actionState *action.State,
request.ResponseStatus = request.response.Status
request.ResponseStatusCode = request.response.StatusCode
request.ResponseHeaders = request.response.Header
request.ResponseBody, errRequest = io.ReadAll(request.response.Body)
request.ResponseBody, errRequest = helpers.ReadAll(request.response.Body)
// When content type is a stream normal metric log will be time to response without starting to stream the body. Thus this will log response time to stream end
if logEntry.ShouldLogTrafficMetrics() && strings.HasPrefix(request.response.Header.Get("Content-Type"), "text/event-stream") {
logEntry.LogTrafficMetric(time.Since(doTs).Nanoseconds(), 0, uint64(len(request.ResponseBody)), -1, req.URL.Path, "", "STREAM", "")
Expand All @@ -671,32 +671,6 @@ func (handler *RestHandler) addVirtualProxy(request *RestRequest) error {
return nil
}

func ReadAll(r io.Reader) ([]byte, error) {
buf := helpers.GlobalBufferPool.Get()
defer helpers.GlobalBufferPool.Put(buf)

capacity := int64(bytes.MinRead)
var err error
// If the buffer overflows, we will get bytes.ErrTooLarge.
// Return that as an error. Any other panic remains.
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
if int64(int(capacity)) == capacity {
buf.Grow(int(capacity))
}
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}

func prependURLPath(aURL, pathToPrepend string) (string, error) {
urlObj, err := url.Parse(aURL)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion wsdialer/wsdialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func readMessage(r io.Reader, m []wsutil.Message, maxFrameSize int64) ([]wsutil.
State: gobwas.StateClientSide,
CheckUTF8: true,
OnIntermediate: func(hdr gobwas.Header, src io.Reader) error {
bts, err := io.ReadAll(src)
bts, err := helpers.ReadAll(src)
if err != nil {
return err
}
Expand Down

0 comments on commit 0716af2

Please sign in to comment.