Skip to content

Commit

Permalink
Merge pull request #158 from kaleido-io/ws-throttling
Browse files Browse the repository at this point in the history
Implement throttling for WS client
  • Loading branch information
EnriqueL8 authored Dec 18, 2024
2 parents 0caf4ee + ecd4cf4 commit 7079e15
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/ffapi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (hs *HandlerFactory) handleOutput(ctx context.Context, res http.ResponseWri
}
if marshalErr != nil {
err := i18n.WrapError(ctx, marshalErr, i18n.MsgResponseMarshalError)
log.L(ctx).Errorf(err.Error())
log.L(ctx).Error(err.Error())
return 500, err
}
return status, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ffapi/openapi3.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (sg *SwaggerGen) addRoute(ctx context.Context, doc *openapi3.T, route *Rout
} else {
routeDescription = i18n.Expand(ctx, route.Description)
if routeDescription == "" && sg.options.PanicOnMissingDescription {
log.Panicf(i18n.NewError(ctx, i18n.MsgRouteDescriptionMissing, route.Name).Error())
log.Panic(i18n.NewError(ctx, i18n.MsgRouteDescriptionMissing, route.Name).Error())
}
}
op := &openapi3.Operation{
Expand Down
19 changes: 16 additions & 3 deletions pkg/ffapi/query_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"math"
"math/big"
"reflect"
"strconv"
Expand Down Expand Up @@ -110,11 +111,11 @@ func (f *stringField) Scan(src interface{}) error {
case int64:
f.s = strconv.FormatInt(tv, 10)
case uint:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(uint64(tv), 10)
case uint32:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(uint64(tv), 10)
case uint64:
f.s = strconv.FormatInt(int64(tv), 10)
f.s = strconv.FormatUint(tv, 10)
case *fftypes.UUID:
if tv != nil {
f.s = tv.String()
Expand Down Expand Up @@ -242,10 +243,16 @@ func (f *int64Field) Scan(src interface{}) (err error) {
case int64:
f.i = tv
case uint:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = int64(tv)
case uint32:
f.i = int64(tv)
case uint64:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = int64(tv)
case string:
f.i, err = strconv.ParseInt(src.(string), 10, 64)
Expand Down Expand Up @@ -277,10 +284,16 @@ func (f *bigIntField) Scan(src interface{}) (err error) {
case int64:
f.i = fftypes.NewFFBigInt(tv)
case uint:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = fftypes.NewFFBigInt(int64(tv))
case uint32:
f.i = fftypes.NewFFBigInt(int64(tv))
case uint64:
if tv > math.MaxInt64 {
return i18n.NewError(context.Background(), i18n.MsgTypeRestoreFailed, src, f.i)
}
f.i = fftypes.NewFFBigInt(int64(tv))
case fftypes.FFBigInt:
i := tv
Expand Down
13 changes: 8 additions & 5 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Config struct {
}

var (
rateLimiter *rate.Limiter
rateLimiterMap map[*resty.Client]*rate.Limiter
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
Expand Down Expand Up @@ -177,7 +177,7 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client
return NewWithConfig(ctx, *ffrestyConfig), nil
}

func getRateLimiter(rps, burst int) *rate.Limiter {
func GetRateLimiter(rps, burst int) *rate.Limiter {
if rps != 0 { // if rps is not set no need for a rate limiter
rpsLimiter := rate.Limit(rps)
if burst == 0 {
Expand Down Expand Up @@ -226,8 +226,11 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
}
client = resty.NewWithClient(httpClient)
}
if rateLimiterMap == nil {
rateLimiterMap = make(map[*resty.Client]*rate.Limiter)
}

rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)
rateLimiterMap[client] = GetRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
Expand All @@ -242,9 +245,9 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rateLimiter != nil {
if rateLimiterMap[client] != nil {
// Wait for permission to proceed with the request
err := rateLimiter.Wait(req.Context())
err := rateLimiterMap[client].Wait(req.Context())
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestRequestWithRateLimiter(t *testing.T) {
}

func TestRequestWithRateLimiterHighBurst(t *testing.T) {
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds
expectedNumberOfRequest := 20 // allow all requests to be processed within 1 second

customClient := &http.Client{}

Expand Down Expand Up @@ -273,12 +273,12 @@ func TestRateLimiterFailure(t *testing.T) {
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
rateLimiterMap[c] = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
assert.Nil(t, resp)
rateLimiter = nil // reset limiter
rateLimiterMap = nil // reset limiter
}

func TestRequestRetry(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/fftypes/int.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -45,7 +45,7 @@ func (i *FFuint64) UnmarshalJSON(b []byte) error {
if !ok {
return i18n.NewError(context.Background(), i18n.MsgBigIntParseFailed, b)
}
*i = FFuint64(bi.Int64())
*i = FFuint64(bi.Uint64())
return nil
case float64:
*i = FFuint64(val)
Expand Down
4 changes: 2 additions & 2 deletions pkg/fftypes/jsonobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (jd JSONObject) GetStringOk(key string) (string, bool) {
case int64:
return strconv.FormatInt(vt, 10), true
case uint:
return strconv.FormatInt(int64(vt), 10), true
return strconv.FormatUint(uint64(vt), 10), true
case uint8:
return strconv.FormatInt(int64(vt), 10), true
case uint16:
return strconv.FormatInt(int64(vt), 10), true
case uint32:
return strconv.FormatInt(int64(vt), 10), true
case uint64:
return strconv.FormatInt(int64(vt), 10), true
return strconv.FormatUint(vt, 10), true
case nil:
return "", false // no need to log for nil
default:
Expand Down
10 changes: 6 additions & 4 deletions pkg/fftypes/uuid.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -93,11 +93,13 @@ func (u *UUID) UnmarshalBinary(b []byte) error {
}

func (u *UUID) HashBucket(buckets int) int {
if u == nil {
if u == nil || buckets <= 0 {
return 0
}
// Take the last random 4 bytes and mod it against the bucket count to generate
// a deterministic hash bucket allocation for the UUID V4
// Explicitly cast `buckets` to uint64 to match the type used in the modulo operation
// and ensure the result is safely converted back to int.

// #nosec G115 - Safe because `buckets` is calculated using modulo operation
return int(binary.BigEndian.Uint64((*u)[8:]) % uint64(buckets))

}
Expand Down
4 changes: 3 additions & 1 deletion pkg/fftypes/uuid_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,6 +93,8 @@ func TestHashBucket(t *testing.T) {
assert.Equal(t, 0, u3.HashBucket(2))

assert.Equal(t, 0, ((*UUID)(nil)).HashBucket(12345))
assert.Equal(t, 0, u3.HashBucket(-1))
assert.Equal(t, 0, u3.HashBucket(0))

}

Expand Down
4 changes: 2 additions & 2 deletions pkg/i18n/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -82,7 +82,7 @@ func ffWrap(err error, msgKey ErrorMessageKey) error {

// NewError creates a new error
func NewError(ctx context.Context, msg ErrorMessageKey, inserts ...interface{}) error {
return ffWrap(errors.Errorf(truncate(ExpandWithCode(ctx, MessageKey(msg), inserts...), 2048)), msg)
return ffWrap(errors.New(truncate(ExpandWithCode(ctx, MessageKey(msg), inserts...), 2048)), msg)
}

// WrapError wraps an error
Expand Down
4 changes: 2 additions & 2 deletions pkg/metric/prometheusMetricsManager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -50,7 +50,7 @@ func checkAndUpdateLabelNames(ctx context.Context, labelNames []string, withDefa
for _, labelName := range labelNames {
if strings.HasPrefix(labelName, fireflySystemLabelsPrefix) {
err := i18n.NewError(ctx, i18n.MsgMetricsInvalidLabel, labelName, fireflySystemLabelsPrefix)
log.L(ctx).Errorf(err.Error())
log.L(ctx).Error(err.Error())
} else {
validLabelNames = append(validLabelNames, labelName)
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/version/version_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
// Kaleido, Inc. CONFIDENTIAL
// Unpublished Copyright © 2023 Kaleido, Inc. All Rights Reserved.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package version

Expand Down
45 changes: 29 additions & 16 deletions pkg/wsclient/wsclient.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -29,28 +29,32 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/retry"
"golang.org/x/time/rate"
)

type WSConfig struct {
HTTPURL string `json:"httpUrl,omitempty"`
WebSocketURL string `json:"wsUrl,omitempty"`
WSKeyPath string `json:"wsKeyPath,omitempty"`
ReadBufferSize int `json:"readBufferSize,omitempty"`
WriteBufferSize int `json:"writeBufferSize,omitempty"`
InitialDelay time.Duration `json:"initialDelay,omitempty"`
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
DisableReconnect bool `json:"disableReconnect"`
AuthUsername string `json:"authUsername,omitempty"`
AuthPassword string `json:"authPassword,omitempty"`
HTTPHeaders fftypes.JSONObject `json:"headers,omitempty"`
HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"`
TLSClientConfig *tls.Config `json:"tlsClientConfig,omitempty"`
ConnectionTimeout time.Duration `json:"connectionTimeout,omitempty"`
HTTPURL string `json:"httpUrl,omitempty"`
WebSocketURL string `json:"wsUrl,omitempty"`
WSKeyPath string `json:"wsKeyPath,omitempty"`
ReadBufferSize int `json:"readBufferSize,omitempty"`
WriteBufferSize int `json:"writeBufferSize,omitempty"`
InitialDelay time.Duration `json:"initialDelay,omitempty"`
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
DisableReconnect bool `json:"disableReconnect"`
AuthUsername string `json:"authUsername,omitempty"`
AuthPassword string `json:"authPassword,omitempty"`
ThrottleRequestsPerSecond int `json:"requestsPerSecond,omitempty"`
ThrottleBurst int `json:"burst,omitempty"`
HTTPHeaders fftypes.JSONObject `json:"headers,omitempty"`
HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"`
TLSClientConfig *tls.Config `json:"tlsClientConfig,omitempty"`
ConnectionTimeout time.Duration `json:"connectionTimeout,omitempty"`
// This one cannot be set in JSON - must be configured on the code interface
ReceiveExt bool
}
Expand Down Expand Up @@ -109,6 +113,7 @@ type wsClient struct {
heartbeatMux sync.Mutex
activePingSent *time.Time
lastPingCompleted time.Time
rateLimiter *rate.Limiter
}

// WSPreConnectHandler will be called before every connect/reconnect. Any error returned will prevent the websocket from connecting.
Expand Down Expand Up @@ -146,6 +151,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
heartbeatInterval: config.HeartbeatInterval,
useReceiveExt: config.ReceiveExt,
disableReconnect: config.DisableReconnect,
rateLimiter: ffresty.GetRateLimiter(config.ThrottleRequestsPerSecond, config.ThrottleBurst),
}
if w.useReceiveExt {
w.receiveExt = make(chan *WSPayload)
Expand Down Expand Up @@ -220,6 +226,13 @@ func (w *wsClient) SetHeader(header, value string) {
}

func (w *wsClient) Send(ctx context.Context, message []byte) error {
if w.rateLimiter != nil {
// Wait for permission to proceed with the request
err := w.rateLimiter.Wait(ctx)
if err != nil {
return err
}
}
// Send
select {
case w.send <- message:
Expand Down
Loading

0 comments on commit 7079e15

Please sign in to comment.