Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: ExecuteLimit Support #246

Merged
merged 6 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ Finished List:
- Dynamic Configure Center & Service Management Configurator: Zookeeper
- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)/[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246)
- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group

Working List:

- Load Balance: ConsistentHash
- Filter: CountFilter/ExecuteLimitFilter
- Registry: k8s
- Configure Center: apollo
- Metadata Center (dubbo v2.7.x)
Expand Down
3 changes: 1 addition & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ Apache License, Version 2.0
- 动态配置中心与服务治理配置器(config center): Zookeeper
- 集群策略: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- 负载均衡策略: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246)
- 其他功能支持: [泛化调用](https://github.com/apache/dubbo-go/pull/122)/启动时检查/服务直连/多服务协议/多注册中心/多服务版本/服务分组

开发中列表:

- 集群策略: Forking
- 负载均衡策略: ConsistentHash
- 过滤器: CountFilter/ExecuteLimitFilter
flycash marked this conversation as resolved.
Show resolved Hide resolved
- 注册中心: k8s
- 配置中心: apollo
- 元数据中心 (dubbo v2.7.x)
Expand Down
2 changes: 1 addition & 1 deletion common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps"
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute"
fangyincheng marked this conversation as resolved.
Show resolved Hide resolved
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
Expand Down
43 changes: 23 additions & 20 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,29 @@ const (
)

const (
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
EXECUTE_LIMIT_KEY = "execute.limit"
DEFAULT_EXECUTE_LIMIT = "-1"
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
)

const (
Expand Down
17 changes: 16 additions & 1 deletion common/extension/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package extension

import (
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
)

var (
filters = make(map[string]func() filter.Filter)
filters = make(map[string]func() filter.Filter)
rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler)
)

func SetFilter(name string, v func() filter.Filter) {
Expand All @@ -35,3 +37,16 @@ func GetFilter(name string) filter.Filter {
}
return filters[name]()
}

func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) {
rejectedExecutionHandler[name] = creator
}

func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler {
creator, ok := rejectedExecutionHandler[name]
if !ok {
panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetRejectedExecutionHandler.")
}
return creator()
}
18 changes: 2 additions & 16 deletions common/extension/tps_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
)

var (
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() tps.RejectedExecutionHandler)
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
)

func SetTpsLimiter(name string, creator func() tps.TpsLimiter) {
Expand Down Expand Up @@ -52,16 +51,3 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.Tp
}
return creator
}

func SetTpsRejectedExecutionHandler(name string, creator func() tps.RejectedExecutionHandler) {
tpsRejectedExecutionHandler[name] = creator
}

func GetTpsRejectedExecutionHandler(name string) tps.RejectedExecutionHandler {
creator, ok := tpsRejectedExecutionHandler[name]
if !ok {
panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsRejectedExecutionHandler.")
}
return creator()
}
20 changes: 11 additions & 9 deletions config/method_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ import (
)

type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
}

func (c *MethodConfig) Prefix() string {
Expand Down
63 changes: 37 additions & 26 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,35 @@ import (
)

type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`

unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
}

func (c *ServiceConfig) Prefix() string {
Expand Down Expand Up @@ -203,6 +206,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler)

// execute limit filter
urlMap.Set(constant.EXECUTE_LIMIT_KEY, srvconfig.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.ExecuteLimitRejectedHandler)

for _, v := range srvconfig.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance)
Expand All @@ -212,6 +219,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)

urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)

}

return urlMap
Expand Down
10 changes: 10 additions & 0 deletions config/testdata/provider_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ services:
tps.limit.strategy: "slidingWindow"
# the name of RejectedExecutionHandler
tps.limit.rejected.handler: "default"
# the concurrent request limitation of this service
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"
protocol : "dubbo"
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
Expand All @@ -50,6 +55,11 @@ services:
- name: "GetUser"
retries: 1
loadbalance: "random"
# the concurrent request limitation of this method
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"

protocols:
"dubbo":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter/impl/tps"
filterCommon "github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol"
)

const HandlerName = "log"

func init() {
extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
}

var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler
Expand All @@ -57,11 +57,11 @@ type OnlyLogRejectedExecutionHandler struct {
}

func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result {
logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
logger.Errorf("The invocation was rejected. url: %s", url.String())
return &protocol.RPCResult{}
}

func GetOnlyLogRejectedExecutionHandler() tps.RejectedExecutionHandler {
func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tps
package common

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.rejected.handler: "name of handler"
* methods:
* - name: "GetUser"
* If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter,
* the implementation will be used.
* The common case is that sometimes you want to return the default value when the request was rejected.
* Or you want to be warned if any request was rejected.
* In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler.
*/
type RejectedExecutionHandler interface {
RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result
Expand Down
Loading