Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max_batch_request_num feature #672

Merged
merged 14 commits into from
Jul 28, 2023
1 change: 1 addition & 0 deletions cmd/ostracon/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func runProxy(cmd *cobra.Command, args []string) error {

cfg := rpcserver.DefaultConfig()
cfg.MaxBodyBytes = config.RPC.MaxBodyBytes
cfg.MaxRequestBatchRequest = config.RPC.MaxRequestBatchRequest
cfg.MaxHeaderBytes = config.RPC.MaxHeaderBytes
cfg.MaxOpenConnections = maxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
Expand Down
11 changes: 9 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ type RPCConfig struct {
// Maximum size of request body, in bytes
MaxBodyBytes int64 `mapstructure:"max_body_bytes"`

// Maximum number of requests in a request body
MaxRequestBatchRequest int `mapstructure:"max_request_batch_request"`
170210 marked this conversation as resolved.
Show resolved Hide resolved

// Maximum size of request header, in bytes
MaxHeaderBytes int `mapstructure:"max_header_bytes"`

Expand Down Expand Up @@ -492,8 +495,9 @@ func DefaultRPCConfig() *RPCConfig {
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxBodyBytes: int64(1000000), // 1MB
MaxRequestBatchRequest: 10,
MaxHeaderBytes: 1 << 20, // same as the net/http default

TLSCertFile: "",
TLSKeyFile: "",
Expand Down Expand Up @@ -551,6 +555,9 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxBodyBytes < 0 {
return errors.New("max_body_bytes can't be negative")
}
if cfg.MaxRequestBatchRequest < 0 {
return errors.New("max_request_batch_request can't be negative")
}
if cfg.MaxHeaderBytes < 0 {
return errors.New("max_header_bytes can't be negative")
}
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestRPCConfigValidateBasic(t *testing.T) {
"MaxSubscriptionsPerClient",
"TimeoutBroadcastTxCommit",
"MaxBodyBytes",
"MaxRequestBatchRequest",
"MaxHeaderBytes",
}

Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
# Maximum size of request body, in bytes
max_body_bytes = {{ .RPC.MaxBodyBytes }}

# Maximum number of requests in a request body
max_request_batch_request = {{ .RPC.MaxRequestBatchRequest }}

# Maximum size of request header, in bytes
max_header_bytes = {{ .RPC.MaxHeaderBytes }}

Expand Down
3 changes: 3 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,11 +1152,13 @@ func (n *Node) startRPC() ([]net.Listener, error) {

config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxRequestBatchRequest = n.config.RPC.MaxRequestBatchRequest
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
config.ReadTimeout = n.config.RPC.ReadTimeout
config.WriteTimeout = n.config.RPC.WriteTimeout
config.IdleTimeout = n.config.RPC.IdleTimeout

170210 marked this conversation as resolved.
Show resolved Hide resolved
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
Expand Down Expand Up @@ -1235,6 +1237,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxRequestBatchRequest = n.config.RPC.MaxRequestBatchRequest
// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
Expand Down
2 changes: 2 additions & 0 deletions rpc/jsonrpc/server/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
TestFuncMap = map[string]*RPCFunc{"c": TestRPCFunc}
TestGoodBody = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`
TestBadParams = `{"jsonrpc": "2.0", "method": "c", "id": "0", "params": "s=a,i=b"}`

170210 marked this conversation as resolved.
Show resolved Hide resolved
TestMaxRequestBatchRequest = "10"
)

type FailManager struct {
Expand Down
24 changes: 24 additions & 0 deletions rpc/jsonrpc/server/http_json_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"reflect"
"sort"
"strconv"

tmjson "github.com/Finschia/ostracon/libs/json"
"github.com/Finschia/ostracon/libs/log"
Expand Down Expand Up @@ -54,6 +55,29 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
}
requests = []types.RPCRequest{request}
}
// read the maxRequestBatchRequest from header
maxRequestBatchRequest, err := strconv.Atoi(r.Header.Get("MaxRequestBatchRequest"))
if err != nil {
res := types.RPCInvalidRequestError(nil,
fmt.Errorf("error reading request header key"),
)
if wErr := WriteRPCResponseHTTPError(w, http.StatusInternalServerError, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}

// if the number of requests in the batch exceeds the max_request_batch_request
// return a invalid request error
if len(requests) > maxRequestBatchRequest {
res := types.RPCInvalidRequestError(nil,
fmt.Errorf("too many requests in a request batch, current is %d, where the upper limit is %d", len(requests), maxRequestBatchRequest),
)
if wErr := WriteRPCResponseHTTPError(w, http.StatusBadRequest, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}

// Set the default response cache to true unless
// 1. Any RPC request error.
Expand Down
42 changes: 42 additions & 0 deletions rpc/jsonrpc/server/http_json_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestRPCParams(t *testing.T) {

for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestJSONRPCID(t *testing.T) {

for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -139,6 +141,7 @@ func TestRPCNotification(t *testing.T) {
mux := testMux()
body := strings.NewReader(`{"jsonrpc": "2.0"}`)
req, _ := http.NewRequest("POST", "http://localhost/", body)
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -176,6 +179,7 @@ func TestRPCNotificationInBatch(t *testing.T) {
}
for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand Down Expand Up @@ -218,6 +222,40 @@ func TestRPCNotificationInBatch(t *testing.T) {
}
}

func TestTooManyRPCNotificationInBatch_error(t *testing.T) {
// prepare the mock batch request
var jsonArray []json.RawMessage
for i := 0; i < 11; i++ {
jsonArray = append(jsonArray, json.RawMessage(TestGoodBody))
}
jsonData, err := json.Marshal(jsonArray)
if err != nil {
t.Errorf("expected an array, couldn't marshal it")
}
// execute the batch request
mux := testMux()
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(string(jsonData)))
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
res.Body.Close()
// always expecting back a 400 error
assert.Equal(t, http.StatusBadRequest, res.StatusCode, "should always return 400")
}

func TestNoMaxRequestBatchRequestField_error(t *testing.T) {
// execute the batch request
mux := testMux()
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(TestGoodBody))
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
res.Body.Close()
// always expecting back a 500 error
assert.Equal(t, http.StatusInternalServerError, res.StatusCode, "should always return 500")
}

func TestUnknownRPCPath(t *testing.T) {
mux := testMux()
req, _ := http.NewRequest("GET", "http://localhost/unknownrpcpath", nil)
Expand All @@ -234,6 +272,7 @@ func TestRPCResponseCache(t *testing.T) {
mux := testMux()
body := strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["1"]}`)
req, _ := http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
Expand All @@ -249,6 +288,7 @@ func TestRPCResponseCache(t *testing.T) {
// send a request with default height.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["0"]}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
Expand All @@ -265,6 +305,7 @@ func TestRPCResponseCache(t *testing.T) {
// send a request with default height, but as empty set of parameters.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": []}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
Expand Down Expand Up @@ -294,6 +335,7 @@ func TestMakeJSONRPCHandler_Unmarshal_WriteRPCResponseHTTPError_error(t *testing
func TestMakeJSONRPCHandler_last_WriteRPCResponseHTTP_error(t *testing.T) {
handlerFunc := makeJSONRPCHandler(TestFuncMap, log.TestingLogger())
req, _ := http.NewRequest("GET", "http://localhost/", strings.NewReader(TestGoodBody))
req.Header.Set("MaxRequestBatchRequest", TestMaxRequestBatchRequest)
// WriteRPCResponseHTTP error
rec := NewFailedWriteResponseWriter()
handlerFunc.ServeHTTP(rec, req)
Expand Down
61 changes: 49 additions & 12 deletions rpc/jsonrpc/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
"time"

Expand All @@ -32,31 +33,45 @@ type Config struct {
// MaxBodyBytes controls the maximum number of bytes the
// server will read parsing the request body.
MaxBodyBytes int64
// MaxBodyBytes controls the maximum number of one request batch
MaxRequestBatchRequest int
// mirrors http.Server#MaxHeaderBytes
MaxHeaderBytes int
}

// DefaultConfig returns a default configuration.
func DefaultConfig() *Config {
return &Config{
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxRequestBatchRequest: 10,
MaxHeaderBytes: 1 << 20, // same as the net/http default
}
}

// Serve creates a http.Server and calls Serve with the given listener. It
// wraps handler with RecoverAndLogHandler and a handler, which limits the max
// body size to config.MaxBodyBytes.
// wraps handler with RecoverAndLogHandler and handlers. Handlers contain
// a maxBytesHandler, which limits the max body size to config.MaxBodyBytes and
// a maxBatchRequestHandler, which limits the max number of requests in a batch.
//
// NOTE: This function blocks - you may want to call it in a go-routine.
func Serve(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info("serve", "msg", log.NewLazySprintf("Starting RPC HTTP server on %s", listener.Addr()))

handlers := maxBatchRequestHandler{
h: maxBytesHandler{
h: handler,
n: config.MaxBodyBytes,
},
NewHeaderName: "MaxRequestBatchRequest",
NewHeaderValue: strconv.Itoa(config.MaxRequestBatchRequest),
}

s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: RecoverAndLogHandler(handlers, logger),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
Expand All @@ -69,8 +84,9 @@ func Serve(listener net.Listener, handler http.Handler, logger log.Logger, confi
}

// Serve creates a http.Server and calls ServeTLS with the given listener,
// certFile and keyFile. It wraps handler with RecoverAndLogHandler and a
// handler, which limits the max body size to config.MaxBodyBytes.
// certFile and keyFile. It wraps handler with RecoverAndLogHandler and handlers.
// Handlers contain a maxBytesHandler, which limits the max body size to config.MaxBodyBytes and
// a maxBatchRequestHandler, which limits the max number of requests in a batch.
//
// NOTE: This function blocks - you may want to call it in a go-routine.
func ServeTLS(
Expand All @@ -82,8 +98,18 @@ func ServeTLS(
) error {
logger.Info("serve tls", "msg", log.NewLazySprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile))

handlers := maxBatchRequestHandler{
h: maxBytesHandler{
h: handler,
n: config.MaxBodyBytes,
},
NewHeaderName: "MaxRequestBatchRequest",
NewHeaderValue: strconv.Itoa(config.MaxRequestBatchRequest),
}

s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: RecoverAndLogHandler(handlers, logger),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
Expand Down Expand Up @@ -261,6 +287,17 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.h.ServeHTTP(w, r)
}

type maxBatchRequestHandler struct {
h http.Handler
NewHeaderName string
NewHeaderValue string
}

func (h maxBatchRequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set(h.NewHeaderName, h.NewHeaderValue)
h.h.ServeHTTP(w, r)
}

// Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config *Config) (listener net.Listener, err error) {
Expand Down
Loading
Loading