diff --git a/common/constant/default.go b/common/constant/default.go index 4363e3efd5..9b69226318 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,7 +46,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog" + DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps" DEFAULT_REFERENCE_FILTERS = "" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/constant/key.go b/common/constant/key.go index 3006f44732..dc24c2b5db 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -48,19 +48,26 @@ const ( ) const ( - TIMESTAMP_KEY = "timestamp" - REMOTE_TIMESTAMP_KEY = "remote.timestamp" - CLUSTER_KEY = "cluster" - LOADBALANCE_KEY = "loadbalance" - WEIGHT_KEY = "weight" - WARMUP_KEY = "warmup" - RETRIES_KEY = "retries" - BEAN_NAME = "bean.name" - FAIL_BACK_TASKS_KEY = "failbacktasks" - FORKS_KEY = "forks" - DEFAULT_FORKS = 2 - DEFAULT_TIMEOUT = 1000 - ACCESS_LOG_KEY = "accesslog" + TIMESTAMP_KEY = "timestamp" + REMOTE_TIMESTAMP_KEY = "remote.timestamp" + CLUSTER_KEY = "cluster" + LOADBALANCE_KEY = "loadbalance" + WEIGHT_KEY = "weight" + WARMUP_KEY = "warmup" + RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" + FORKS_KEY = "forks" + DEFAULT_FORKS = 2 + DEFAULT_TIMEOUT = 1000 + ACCESS_LOG_KEY = "accesslog" + TPS_LIMITER_KEY = "tps.limiter" + TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler" + TPS_LIMIT_RATE_KEY = "tps.limit.rate" + DEFAULT_TPS_LIMIT_RATE = "-1" + TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval" + DEFAULT_TPS_LIMIT_INTERVAL = "60000" + TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy" ) const ( diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go new file mode 100644 index 0000000000..7d5cb8e9f6 --- /dev/null +++ b/common/extension/tps_limit.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/filter/impl/tps" +) + +var ( + tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy) + tpsLimiter = make(map[string]func() tps.TpsLimiter) + tpsRejectedExecutionHandler = make(map[string]func() tps.RejectedExecutionHandler) +) + +func SetTpsLimiter(name string, creator func() tps.TpsLimiter) { + tpsLimiter[name] = creator +} + +func GetTpsLimiter(name string) tps.TpsLimiter { + creator, ok := tpsLimiter[name] + if !ok { + panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsLimiter.") + } + return creator() +} + +func SetTpsLimitStrategy(name string, creator func(rate int, interval int) tps.TpsLimitStrategy) { + tpsLimitStrategy[name] = creator +} + +func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.TpsLimitStrategy { + creator, ok := tpsLimitStrategy[name] + if !ok { + panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsLimitStrategy.") + } + return creator +} + +func SetTpsRejectedExecutionHandler(name string, creator func() tps.RejectedExecutionHandler) { + tpsRejectedExecutionHandler[name] = creator +} + +func GetTpsRejectedExecutionHandler(name string) tps.RejectedExecutionHandler { + creator, ok := tpsRejectedExecutionHandler[name] + if !ok { + panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + + "and you have register it by invoking extension.SetTpsRejectedExecutionHandler.") + } + return creator() +} diff --git a/config/method_config.go b/config/method_config.go index ac9242a230..431a30c1dd 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -25,12 +25,15 @@ import ( ) type MethodConfig struct { - InterfaceId string - InterfaceName string - Name string `yaml:"name" json:"name,omitempty" property:"name"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` + InterfaceId string + InterfaceName string + Name string `yaml:"name" json:"name,omitempty" property:"name"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` + TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` + TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` + TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` } func (c *MethodConfig) Prefix() string { diff --git a/config/service_config.go b/config/service_config.go index ee0457937e..fb65567a4b 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -43,27 +43,32 @@ import ( ) type ServiceConfig struct { - context context.Context - id string - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ',' - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version" ` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - Token string `yaml:"token" json:"token,omitempty" property:"token"` - AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"` - unexported *atomic.Bool - exported *atomic.Bool - rpcService common.RPCService - cacheProtocol protocol.Protocol - cacheMutex sync.Mutex + context context.Context + id string + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ',' + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version" ` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Token string `yaml:"token" json:"token,omitempty" property:"token"` + AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"` + TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"` + TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"` + TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"` + TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` + TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"` + unexported *atomic.Bool + exported *atomic.Bool + rpcService common.RPCService + cacheProtocol protocol.Protocol + cacheMutex sync.Mutex } func (c *ServiceConfig) Prefix() string { @@ -94,9 +99,9 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } func (srvconfig *ServiceConfig) Export() error { - //TODO: config center start here + // TODO: config center start here - //TODO:delay export + // TODO:delay export if srvconfig.unexported != nil && srvconfig.unexported.Load() { err := perrors.Errorf("The service %v has already unexported! ", srvconfig.InterfaceName) logger.Errorf(err.Error()) @@ -111,7 +116,7 @@ func (srvconfig *ServiceConfig) Export() error { urlMap := srvconfig.getUrlMap() for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) { - //registry the service reflect + // registry the service reflect methods, err := common.ServiceMap.Register(proto.Name, srvconfig.rpcService) if err != nil { err := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.InterfaceName, proto.Name, err.Error()) @@ -164,7 +169,7 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) { func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap := url.Values{} - //first set user params + // first set user params for k, v := range srvconfig.Params { urlMap.Set(k, v) } @@ -177,7 +182,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.GROUP_KEY, srvconfig.Group) urlMap.Set(constant.VERSION_KEY, srvconfig.Version) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) - //application info + // application info urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization) urlMap.Set(constant.NAME_KEY, providerConfig.ApplicationConfig.Name) @@ -186,16 +191,27 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner) urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment) - //filter + // filter urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, srvconfig.Filter, constant.DEFAULT_SERVICE_FILTERS)) - //filter special config + // filter special config urlMap.Set(constant.ACCESS_LOG_KEY, srvconfig.AccessLog) + // tps limiter + urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, srvconfig.TpsLimitStrategy) + urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, srvconfig.TpsLimitInterval) + urlMap.Set(constant.TPS_LIMIT_RATE_KEY, srvconfig.TpsLimitRate) + urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter) + urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler) for _, v := range srvconfig.Methods { - urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) - urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) - urlMap.Set("methods."+v.Name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10)) + prefix := "methods." + v.Name + "." + urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance) + urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries) + urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10)) + + urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy) + urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval) + urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate) } return urlMap diff --git a/config/testdata/provider_config.yml b/config/testdata/provider_config.yml index 5cbefe0811..71e45b9c0e 100644 --- a/config/testdata/provider_config.yml +++ b/config/testdata/provider_config.yml @@ -29,6 +29,15 @@ services: "UserProvider": registry: "hangzhouzk,shanghaizk" filter: "" + # the name of limiter + tps.limiter: "default" + # the time unit of interval is ms + tps.limit.interval: 60000 + tps.limit.rate: 200 + # the name of strategy + tps.limit.strategy: "slidingWindow" + # the name of RejectedExecutionHandler + tps.limit.rejected.handler: "default" protocol : "dubbo" # equivalent to interface of dubbo.xml interface : "com.ikurento.user.UserProvider" diff --git a/filter/impl/access_log_filter.go b/filter/impl/access_log_filter.go index 75c0582937..89fa34952f 100644 --- a/filter/impl/access_log_filter.go +++ b/filter/impl/access_log_filter.go @@ -132,7 +132,7 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) { logFile, err := ef.openLogFile(accessLog) if err != nil { - logger.Warnf("Can not open the access log file: %s, %v", accessLog, err) + logger.Warnf("Can not open the access log file: %s, %v", accessLog, err) return } logger.Debugf("Append log to %s", accessLog) diff --git a/filter/impl/tps/impl/rejected_execution_handler_mock.go b/filter/impl/tps/impl/rejected_execution_handler_mock.go new file mode 100644 index 0000000000..2f7869d61e --- /dev/null +++ b/filter/impl/tps/impl/rejected_execution_handler_mock.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: rejected_execution_handler.go + +// Package filter is a generated GoMock package. +package impl + +import ( + common "github.com/apache/dubbo-go/common" + protocol "github.com/apache/dubbo-go/protocol" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockRejectedExecutionHandler is a mock of RejectedExecutionHandler interface +type MockRejectedExecutionHandler struct { + ctrl *gomock.Controller + recorder *MockRejectedExecutionHandlerMockRecorder +} + +// MockRejectedExecutionHandlerMockRecorder is the mock recorder for MockRejectedExecutionHandler +type MockRejectedExecutionHandlerMockRecorder struct { + mock *MockRejectedExecutionHandler +} + +// NewMockRejectedExecutionHandler creates a new mock instance +func NewMockRejectedExecutionHandler(ctrl *gomock.Controller) *MockRejectedExecutionHandler { + mock := &MockRejectedExecutionHandler{ctrl: ctrl} + mock.recorder = &MockRejectedExecutionHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockRejectedExecutionHandler) EXPECT() *MockRejectedExecutionHandlerMockRecorder { + return m.recorder +} + +// RejectedExecution mocks base method +func (m *MockRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RejectedExecution", url, invocation) + ret0, _ := ret[0].(protocol.Result) + return ret0 +} + +// RejectedExecution indicates an expected call of RejectedExecution +func (mr *MockRejectedExecutionHandlerMockRecorder) RejectedExecution(url, invocation interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RejectedExecution", reflect.TypeOf((*MockRejectedExecutionHandler)(nil).RejectedExecution), url, invocation) +} diff --git a/filter/impl/tps/impl/rejected_execution_handler_only_log.go b/filter/impl/tps/impl/rejected_execution_handler_only_log.go new file mode 100644 index 0000000000..62d2fc81cb --- /dev/null +++ b/filter/impl/tps/impl/rejected_execution_handler_only_log.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/protocol" +) + +const HandlerName = "log" + +func init() { + extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler) + extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler) +} + +var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler +var onlyLogHandlerOnce sync.Once + +/** + * This implementation only logs the invocation info. + * it always return en error inside the result. + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.rejected.handler: "default" or "log" + * methods: + * - name: "GetUser" + */ +type OnlyLogRejectedExecutionHandler struct { +} + +func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { + logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String()) + return &protocol.RPCResult{} +} + +func GetOnlyLogRejectedExecutionHandler() tps.RejectedExecutionHandler { + onlyLogHandlerOnce.Do(func() { + onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} + }) + return onlyLogHandlerInstance +} diff --git a/filter/impl/tps/impl/rejected_execution_handler_only_log_test.go b/filter/impl/tps/impl/rejected_execution_handler_only_log_test.go new file mode 100644 index 0000000000..da54d8a106 --- /dev/null +++ b/filter/impl/tps/impl/rejected_execution_handler_only_log_test.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "net/url" + "testing" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func TestOnlyLogRejectedExecutionHandler_RejectedExecution(t *testing.T) { + handler := GetOnlyLogRejectedExecutionHandler() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, "methodName")) + handler.RejectedExecution(*invokeUrl, nil) +} diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go b/filter/impl/tps/impl/tps_limit_fix_window_strategy.go new file mode 100644 index 0000000000..7290619d55 --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_fix_window_strategy.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync/atomic" + "time" +) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" +) + +const ( + FixedWindowKey = "fixedWindow" +) + +func init() { + extension.SetTpsLimitStrategy(FixedWindowKey, NewFixedWindowTpsLimitStrategyImpl) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, NewFixedWindowTpsLimitStrategyImpl) +} + +/** + * It's the same as default implementation in Java + * It's not a thread-safe implementation. + * It you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategyImpl + * This is the default implementation. + * + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "default" or "fixedWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "default" or "fixedWindow" # method-level + */ +type FixedWindowTpsLimitStrategyImpl struct { + rate int32 + interval int64 + count int32 + timestamp int64 +} + +func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { + + current := time.Now().UnixNano() + if impl.timestamp+impl.interval < current { + // it's a new window + // if a lot of threads come here, the count will be set to 0 several times. + // so the return statement will be wrong. + impl.timestamp = current + impl.count = 0 + } + // this operation is thread-safe, but count + 1 may be overflow + return atomic.AddInt32(&impl.count, 1) <= impl.rate +} + +func NewFixedWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { + return &FixedWindowTpsLimitStrategyImpl{ + rate: int32(rate), + interval: int64(interval * 1000), // convert to ns + count: 0, + timestamp: time.Now().UnixNano(), + } +} diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go new file mode 100644 index 0000000000..55d0b14b75 --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewFixedWindowTpsLimitStrategyImpl(2, 60000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + + strategy = NewFixedWindowTpsLimitStrategyImpl(2, 2000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go b/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go new file mode 100644 index 0000000000..de98eb7528 --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "container/list" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" +) + +func init() { + extension.SetTpsLimitStrategy("slidingWindow", NewSlidingWindowTpsLimitStrategyImpl) +} + +/** + * it's thread-safe. + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "slidingWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "slidingWindow" # method-level + */ +type SlidingWindowTpsLimitStrategyImpl struct { + rate int + interval int64 + mutex *sync.Mutex + queue *list.List +} + +func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { + impl.mutex.Lock() + defer impl.mutex.Unlock() + // quick path + size := impl.queue.Len() + current := time.Now().UnixNano() + if size < impl.rate { + impl.queue.PushBack(current) + return true + } + + // slow path + boundary := current - impl.interval + + timestamp := impl.queue.Front() + // remove the element that out of the window + for timestamp != nil && timestamp.Value.(int64) < boundary { + impl.queue.Remove(timestamp) + timestamp = impl.queue.Front() + } + if impl.queue.Len() < impl.rate { + impl.queue.PushBack(current) + return true + } + return false +} + +func NewSlidingWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { + return &SlidingWindowTpsLimitStrategyImpl{ + rate: rate, + interval: int64(interval * 1000), + mutex: &sync.Mutex{}, + queue: list.New(), + } +} diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go new file mode 100644 index 0000000000..1d0187fa20 --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestSlidingWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewSlidingWindowTpsLimitStrategyImpl(2, 60000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + assert.False(t, strategy.IsAllowable()) + + strategy = NewSlidingWindowTpsLimitStrategyImpl(2, 2000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps/impl/tps_limit_strategy_mock.go b/filter/impl/tps/impl/tps_limit_strategy_mock.go new file mode 100644 index 0000000000..a653fb287a --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_strategy_mock.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: tps_limit_strategy.go + +// Package filter is a generated GoMock package. +package impl + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockTpsLimitStrategy is a mock of TpsLimitStrategy interface +type MockTpsLimitStrategy struct { + ctrl *gomock.Controller + recorder *MockTpsLimitStrategyMockRecorder +} + +// MockTpsLimitStrategyMockRecorder is the mock recorder for MockTpsLimitStrategy +type MockTpsLimitStrategyMockRecorder struct { + mock *MockTpsLimitStrategy +} + +// NewMockTpsLimitStrategy creates a new mock instance +func NewMockTpsLimitStrategy(ctrl *gomock.Controller) *MockTpsLimitStrategy { + mock := &MockTpsLimitStrategy{ctrl: ctrl} + mock.recorder = &MockTpsLimitStrategyMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTpsLimitStrategy) EXPECT() *MockTpsLimitStrategyMockRecorder { + return m.recorder +} + +// IsAllowable mocks base method +func (m *MockTpsLimitStrategy) IsAllowable() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsAllowable") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAllowable indicates an expected call of IsAllowable +func (mr *MockTpsLimitStrategyMockRecorder) IsAllowable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAllowable", reflect.TypeOf((*MockTpsLimitStrategy)(nil).IsAllowable)) +} diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go new file mode 100644 index 0000000000..5f43e8c3bf --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" +) + +func init() { + extension.SetTpsLimitStrategy("threadSafeFixedWindow", NewThreadSafeFixedWindowTpsLimitStrategyImpl) +} + +/** + * it's the thread-safe implementation. + * Also, it's a thread-safe decorator of FixedWindowTpsLimitStrategyImpl + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "threadSafeFixedWindow" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "threadSafeFixedWindow" # method-level + */ +type ThreadSafeFixedWindowTpsLimitStrategyImpl struct { + mutex *sync.Mutex + fixedWindow *FixedWindowTpsLimitStrategyImpl +} + +func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool { + impl.mutex.Lock() + defer impl.mutex.Unlock() + return impl.fixedWindow.IsAllowable() +} + +func NewThreadSafeFixedWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { + fixedWindowStrategy := NewFixedWindowTpsLimitStrategyImpl(rate, interval).(*FixedWindowTpsLimitStrategyImpl) + return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ + fixedWindow: fixedWindowStrategy, + mutex: &sync.Mutex{}, + } +} diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go new file mode 100644 index 0000000000..fea93dfa3b --- /dev/null +++ b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestThreadSafeFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { + strategy := NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 60000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + + strategy = NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 2000) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) + time.Sleep(time.Duration(2100 * 1000)) + assert.True(t, strategy.IsAllowable()) + assert.True(t, strategy.IsAllowable()) + assert.False(t, strategy.IsAllowable()) +} diff --git a/filter/impl/tps/impl/tps_limiter_method_service.go b/filter/impl/tps/impl/tps_limiter_method_service.go new file mode 100644 index 0000000000..3faf0d6e67 --- /dev/null +++ b/filter/impl/tps/impl/tps_limiter_method_service.go @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package impl + +import ( + "fmt" + "strconv" + "sync" +) + +import ( + "github.com/modern-go/concurrent" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/protocol" +) + +const name = "method-service" + +func init() { + extension.SetTpsLimiter(constant.DEFAULT_KEY, GetMethodServiceTpsLimiter) + extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter) +} + +/** + * This implementation allows developer to config both method-level and service-level tps limiter. + * for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limit.interval: 5000 # interval, the time unit is ms + * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.rate: 20, # in this case, this configuration in service-level will be ignored. + * - name: "UpdateUser" + * tps.limit.rate: -1, # If the rate<0, the method will be ignored + * + * + * More examples: + * case1: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limit.interval: 5000 # interval, the time unit is ms + * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * - name: "UpdateUser" + * tps.limit.rate: -1, + * in this case, the method UpdateUser will be ignored, + * which means that only GetUser will be limited by service-level configuration. + * + * case2: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * tps.limit.interval: 5000 # interval, the time unit is ms + * tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited. + * methods: + * - name: "GetUser" + * - name: "UpdateUser" + * tps.limit.rate: 30, + * In this case, the GetUser will be limited by service-level configuration(300 times in 5000ms), + * but UpdateUser will be limited by its configuration (30 times in 60000ms) + * + * case3: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too. + * methods: + * - name: "GetUser" + * - name: "UpdateUser" + * tps.limit.rate: 70, + * tps.limit.interval: 40000 + * In this case, only UpdateUser will be limited by its configuration (70 times in 40000ms) + */ +type MethodServiceTpsLimiterImpl struct { + tpsState *concurrent.Map +} + +func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool { + + methodConfigPrefix := "methods." + invocation.MethodName() + "." + + methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "") + methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "") + + limitTarget := url.ServiceKey() + + // method-level tps limit + if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 { + limitTarget = limitTarget + "#" + invocation.MethodName() + } + + limitState, found := limiter.tpsState.Load(limitTarget) + if found { + return limitState.(tps.TpsLimitStrategy).IsAllowable() + } + + limitRate := getLimitConfig(methodLimitRateConfig, url, invocation, + constant.TPS_LIMIT_RATE_KEY, + constant.DEFAULT_TPS_LIMIT_RATE) + + if limitRate < 0 { + return true + } + + limitInterval := getLimitConfig(methodIntervalConfig, url, invocation, + constant.TPS_LIMIT_INTERVAL_KEY, + constant.DEFAULT_TPS_LIMIT_INTERVAL) + if limitInterval <= 0 { + panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String())) + } + + limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, + url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) + limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) + limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator(int(limitRate), int(limitInterval))) + return limitState.(tps.TpsLimitStrategy).IsAllowable() +} + +func getLimitConfig(methodLevelConfig string, + url common.URL, + invocation protocol.Invocation, + configKey string, + defaultVal string) int64 { + + if len(methodLevelConfig) > 0 { + result, err := strconv.ParseInt(methodLevelConfig, 0, 0) + if err != nil { + panic(fmt.Sprintf("The %s for invocation %s # %s must be positive, please check your configuration!", + configKey, url.ServiceKey(), invocation.MethodName())) + } + return result + } + + result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0) + + if err != nil { + panic(fmt.Sprintf("Cannot parse the configuration %s, please check your configuration!", configKey)) + } + return result +} + +var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl +var methodServiceTpsLimiterOnce sync.Once + +func GetMethodServiceTpsLimiter() tps.TpsLimiter { + methodServiceTpsLimiterOnce.Do(func() { + methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ + tpsState: concurrent.NewMap(), + } + }) + return methodServiceTpsLimiterInstance +} diff --git a/filter/impl/tps/impl/tps_limiter_method_service_test.go b/filter/impl/tps/impl/tps_limiter_method_service_test.go new file mode 100644 index 0000000000..006e946387 --- /dev/null +++ b/filter/impl/tps/impl/tps_limiter_method_service_test.go @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "net/url" + "testing" +) +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Only_Service_Level(t *testing.T) { + methodName := "hello" + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20")) + + mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { + assert.Equal(t, 20, rate) + assert.Equal(t, 60000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + assert.True(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_No_Config(t *testing.T) { + methodName := "hello1" + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + // ctrl := gomock.NewController(t) + // defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "")) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + assert.True(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Method_Level_Override(t *testing.T) { + methodName := "hello2" + methodConfigPrefix := "methods." + methodName + "." + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"), + common.WithParamsValue(constant.TPS_LIMIT_INTERVAL_KEY, "3000"), + common.WithParamsValue(constant.TPS_LIMIT_STRATEGY_KEY, "invalid"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "7000"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, "default"), + ) + + mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { + assert.Equal(t, 40, rate) + assert.Equal(t, 7000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + assert.True(t, result) +} + +func TestMethodServiceTpsLimiterImpl_IsAllowable_Both_Method_And_Service(t *testing.T) { + methodName := "hello3" + methodConfigPrefix := "methods." + methodName + "." + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.INTERFACE_KEY, methodName), + common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"), + common.WithParamsValue(constant.TPS_LIMIT_INTERVAL_KEY, "3000"), + common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"), + ) + + mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) + mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { + assert.Equal(t, 40, rate) + assert.Equal(t, 3000, interval) + return mockStrategyImpl + }) + + limiter := GetMethodServiceTpsLimiter() + result := limiter.IsAllowable(*invokeUrl, invoc) + assert.True(t, result) +} diff --git a/filter/impl/tps/impl/tps_limiter_mock.go b/filter/impl/tps/impl/tps_limiter_mock.go new file mode 100644 index 0000000000..ff2f984e13 --- /dev/null +++ b/filter/impl/tps/impl/tps_limiter_mock.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: tps_limiter.go + +// Package filter is a generated GoMock package. +package impl + +import ( + common "github.com/apache/dubbo-go/common" + protocol "github.com/apache/dubbo-go/protocol" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockTpsLimiter is a mock of TpsLimiter interface +type MockTpsLimiter struct { + ctrl *gomock.Controller + recorder *MockTpsLimiterMockRecorder +} + +// MockTpsLimiterMockRecorder is the mock recorder for MockTpsLimiter +type MockTpsLimiterMockRecorder struct { + mock *MockTpsLimiter +} + +// NewMockTpsLimiter creates a new mock instance +func NewMockTpsLimiter(ctrl *gomock.Controller) *MockTpsLimiter { + mock := &MockTpsLimiter{ctrl: ctrl} + mock.recorder = &MockTpsLimiterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTpsLimiter) EXPECT() *MockTpsLimiterMockRecorder { + return m.recorder +} + +// IsAllowable mocks base method +func (m *MockTpsLimiter) IsAllowable(arg0 common.URL, arg1 protocol.Invocation) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsAllowable", arg0, arg1) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAllowable indicates an expected call of IsAllowable +func (mr *MockTpsLimiterMockRecorder) IsAllowable(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAllowable", reflect.TypeOf((*MockTpsLimiter)(nil).IsAllowable), arg0, arg1) +} diff --git a/filter/impl/tps/rejected_execution_handler.go b/filter/impl/tps/rejected_execution_handler.go new file mode 100644 index 0000000000..827908974e --- /dev/null +++ b/filter/impl/tps/rejected_execution_handler.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package tps + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +/** + * This implementation only logs the invocation info. + * it always return en error inside the result. + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.rejected.handler: "name of handler" + * methods: + * - name: "GetUser" + */ +type RejectedExecutionHandler interface { + RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result +} diff --git a/filter/impl/tps/tps_limit_strategy.go b/filter/impl/tps/tps_limit_strategy.go new file mode 100644 index 0000000000..d1af85b464 --- /dev/null +++ b/filter/impl/tps/tps_limit_strategy.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tps + +/* + * please register your implementation by invoking SetTpsLimitStrategy + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service" # the name of limiter + * tps.limit.strategy: "name of implementation" # service-level + * methods: + * - name: "GetUser" + * tps.interval: 3000 + * tps.limit.strategy: "name of implementation" # method-level + */ +type TpsLimitStrategy interface { + IsAllowable() bool +} diff --git a/filter/impl/tps/tps_limiter.go b/filter/impl/tps/tps_limiter.go new file mode 100644 index 0000000000..0622a957a8 --- /dev/null +++ b/filter/impl/tps/tps_limiter.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tps + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +/* + * please register your implementation by invoking SetTpsLimiter + * The usage, for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "the name of limiter", + */ +type TpsLimiter interface { + IsAllowable(common.URL, protocol.Invocation) bool +} diff --git a/filter/impl/tps_limit_filter.go b/filter/impl/tps_limit_filter.go new file mode 100644 index 0000000000..1e3222c0e8 --- /dev/null +++ b/filter/impl/tps_limit_filter.go @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + _ "github.com/apache/dubbo-go/filter/impl/tps/impl" + "github.com/apache/dubbo-go/protocol" +) + +const ( + TpsLimitFilterKey = "tps" +) + +func init() { + extension.SetFilter(TpsLimitFilterKey, GetTpsLimitFilter) +} + +/** + * if you wish to use the TpsLimiter, please add the configuration into your service provider configuration: + * for example: + * "UserProvider": + * registry: "hangzhouzk" + * protocol : "dubbo" + * interface : "com.ikurento.user.UserProvider" + * ... # other configuration + * tps.limiter: "method-service", # it should be the name of limiter. if the value is 'default', + * # the MethodServiceTpsLimiterImpl will be used. + * tps.limit.rejected.handler: "default", # optional, or the name of the implementation + * if the value of 'tps.limiter' is nil or empty string, the tps filter will do nothing + */ +type TpsLimitFilter struct { +} + +func (t TpsLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + url := invoker.GetUrl() + tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") + rejectedExeHandler := url.GetParam(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY) + if len(tpsLimiter) > 0 { + allow := extension.GetTpsLimiter(tpsLimiter).IsAllowable(invoker.GetUrl(), invocation) + if allow { + return invoker.Invoke(invocation) + } + return extension.GetTpsRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation) + } + return invoker.Invoke(invocation) +} + +func (t TpsLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +func GetTpsLimitFilter() filter.Filter { + return &TpsLimitFilter{} +} diff --git a/filter/impl/tps_limit_filter_test.go b/filter/impl/tps_limit_filter_test.go new file mode 100644 index 0000000000..5f5557b07e --- /dev/null +++ b/filter/impl/tps_limit_filter_test.go @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "net/url" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter/impl/tps/impl" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) { + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, "")) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + +} + +func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1) + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + return mockLimiter + }) + + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) +} + +func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1) + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + return mockLimiter + }) + + mockResult := &protocol.RPCResult{} + mockRejectedHandler := impl.NewMockRejectedExecutionHandler(ctrl) + mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1) + + extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, func() tps.RejectedExecutionHandler { + return mockRejectedHandler + }) + + tpsFilter := GetTpsLimitFilter() + invokeUrl := common.NewURLWithOptions( + common.WithParams(url.Values{}), + common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) + attch := make(map[string]string, 0) + + result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) +} diff --git a/go.mod b/go.mod index be1c80bd17..2f7a2820ba 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,11 @@ module github.com/apache/dubbo-go require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5 + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -31,14 +34,16 @@ require ( github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 // indirect + github.com/prometheus/common v0.6.0 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect github.com/soheilhy/cmux v0.1.4 // indirect - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.4.0 github.com/tebeka/strftime v0.1.3 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect diff --git a/go.sum b/go.sum index 19b148133b..3ae72390a6 100644 --- a/go.sum +++ b/go.sum @@ -31,7 +31,11 @@ github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJM github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= @@ -446,6 +450,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= @@ -534,6 +540,7 @@ google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d h1:TxyelI5cVkbREznMhfzycHdkp5cLA7DpE+GKjSslYhM= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw=