Skip to content

Commit

Permalink
[fix #1590] filter single instance (#1591)
Browse files Browse the repository at this point in the history
* filter single instance

* filter single instance

Co-authored-by: wangxiaowei14227 <wangxiaowei14227@autohome.com.cn>
  • Loading branch information
XiaoWeiKIN and wangxiaowei14227 committed Nov 19, 2021
1 parent 5c115cb commit df9f3f9
Show file tree
Hide file tree
Showing 29 changed files with 322 additions and 151 deletions.
30 changes: 20 additions & 10 deletions filter/accesslog/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -52,6 +53,11 @@ const (
Arguments = "arguments"
)

var (
once sync.Once
accessLogFilter *Filter
)

func init() {
extension.SetFilter(constant.AccessLogFilterKey, newFilter)
}
Expand All @@ -76,6 +82,20 @@ type Filter struct {
logChan chan Data
}

func newFilter() filter.Filter {
if accessLogFilter == nil {
once.Do(func() {
accessLogFilter = &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
})
}
return accessLogFilter
}

// Invoke will check whether user wants to use this filter.
// If we find the value of key constant.AccessLogFilterKey, we will log the invocation info
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
Expand Down Expand Up @@ -220,16 +240,6 @@ func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}

func newFilter() filter.Filter {
accessLogFilter := &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
return accessLogFilter
}

// Data defines the data that will be log into file
type Data struct {
accessLog string
Expand Down
26 changes: 19 additions & 7 deletions filter/active/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package active
import (
"context"
"strconv"
"sync"
)

import (
Expand All @@ -35,25 +36,36 @@ const (
dubboInvokeStartTime = "dubboInvokeStartTime"
)

var (
once sync.Once
active *activeFilter
)

func init() {
extension.SetFilter(constant.ActiveFilterKey, func() filter.Filter {
return &Filter{}
})
extension.SetFilter(constant.ActiveFilterKey, newActiveFilter)
}

// Filter tracks the requests status
type Filter struct{}
type activeFilter struct{}

func newActiveFilter() filter.Filter {
if active == nil {
once.Do(func() {
active = &activeFilter{}
})
}
return active
}

// Invoke starts to record the requests status
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
func (f *activeFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}

// OnResponse update the active count base on the request result.
func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (f *activeFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
Expand Down
4 changes: 2 additions & 2 deletions filter/active/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
func TestFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}))
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := Filter{}
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand All @@ -57,7 +57,7 @@ func TestFilterOnResponse(t *testing.T) {
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := Filter{}
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand Down
25 changes: 19 additions & 6 deletions filter/auth/accesskey_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,32 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"sync"
)

var (
storageOnce sync.Once
storage *defaultAccesskeyStorage
)

func init() {
extension.SetAccessKeyStorages(constant.DefaultAccessKeyStorage, func() filter.AccessKeyStorage {
return &DefaultAccesskeyStorage{}
})
extension.SetAccessKeyStorages(constant.DefaultAccessKeyStorage, newDefaultAccesskeyStorage)
}

// DefaultAccesskeyStorage is the default implementation of AccesskeyStorage
type DefaultAccesskeyStorage struct{}
// defaultAccesskeyStorage is the default implementation of AccesskeyStorage
type defaultAccesskeyStorage struct{}

func newDefaultAccesskeyStorage() filter.AccessKeyStorage {
if storage == nil {
storageOnce.Do(func() {
storage = &defaultAccesskeyStorage{}
})
}
return storage
}

// GetAccessKeyPair retrieves AccessKeyPair from url by the key "accessKeyId" and "secretAccessKey"
func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair {
func (storage *defaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair {
return &filter.AccessKeyPair{
AccessKey: url.GetParam(constant.AccessKeyIDKey, ""),
SecretKey: url.GetParam(constant.SecretAccessKeyKey, ""),
Expand Down
2 changes: 1 addition & 1 deletion filter/auth/accesskey_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDefaultAccesskeyStorage_GetAccesskeyPair(t *testing.T) {
common.WithParamsValue(constant.SecretAccessKeyKey, "skey"),
common.WithParamsValue(constant.AccessKeyIDKey, "akey"))
invocation := &invocation2.RPCInvocation{}
storage := &DefaultAccesskeyStorage{}
storage = &defaultAccesskeyStorage{}
accesskeyPair := storage.GetAccessKeyPair(invocation, url)
assert.Equal(t, "skey", accesskeyPair.SecretKey)
assert.Equal(t, "akey", accesskeyPair.AccessKey)
Expand Down
32 changes: 20 additions & 12 deletions filter/auth/consumer_sign_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,39 @@ package auth
import (
"context"
"fmt"
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
signOnce sync.Once
sign *signFilter
)

func init() {
extension.SetFilter(constant.AuthConsumerFilterKey, func() filter.Filter {
return &ConsumerSignFilter{}
})
extension.SetFilter(constant.AuthProviderFilterKey, func() filter.Filter {
return &ProviderAuthFilter{}
})
extension.SetFilter(constant.AuthConsumerFilterKey, newSignFilter)
}

// ConsumerSignFilter signs the request on consumer side
type ConsumerSignFilter struct{}
// signFilter signs the request on consumer side
type signFilter struct{}

func newSignFilter() filter.Filter {
if sign == nil {
signOnce.Do(func() {
sign = &signFilter{}
})
}
return sign
}

// Invoke retrieves the configured Authenticator to add signature to invocation
func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking ConsumerSign filter.")
func (sf *signFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
url := invoker.GetURL()

err := doAuthWork(url, func(authenticator filter.Authenticator) error {
Expand All @@ -57,6 +65,6 @@ func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invo
}

// OnResponse dummy process, returns the result directly
func (csf *ConsumerSignFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (sf *signFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
2 changes: 1 addition & 1 deletion filter/auth/consumer_sign_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConsumerSignFilter_Invoke(t *testing.T) {
url.SetParam(constant.SecretAccessKeyKey, "sk")
url.SetParam(constant.AccessKeyIDKey, "ak")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
filter := &ConsumerSignFilter{}
filter := &signFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand Down
27 changes: 20 additions & 7 deletions filter/auth/default_authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"
)

Expand All @@ -33,17 +34,29 @@ import (
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

var (
authenticatorOnce sync.Once
authenticator *defaultAuthenticator
)

func init() {
extension.SetAuthenticator(constant.DefaultAuthenticator, func() filter.Authenticator {
return &DefaultAuthenticator{}
})
extension.SetAuthenticator(constant.DefaultAuthenticator, newDefaultAuthenticator)
}

// DefaultAuthenticator is the default implementation of Authenticator
type DefaultAuthenticator struct{}
// defaultAuthenticator is the default implementation of Authenticator
type defaultAuthenticator struct{}

func newDefaultAuthenticator() filter.Authenticator {
if authenticator == nil {
authenticatorOnce.Do(func() {
authenticator = &defaultAuthenticator{}
})
}
return authenticator
}

// Sign adds the signature to the invocation
func (authenticator *DefaultAuthenticator) Sign(invocation protocol.Invocation, url *common.URL) error {
func (authenticator *defaultAuthenticator) Sign(invocation protocol.Invocation, url *common.URL) error {
currentTimeMillis := strconv.Itoa(int(time.Now().Unix() * 1000))

consumer := url.GetParam(constant.ApplicationKey, "")
Expand Down Expand Up @@ -83,7 +96,7 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st
}

// Authenticate verifies whether the signature sent by the requester is correct
func (authenticator *DefaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
func (authenticator *defaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
accessKeyId := invocation.AttachmentsByKey(constant.AKKey, "")

requestTimestamp := invocation.AttachmentsByKey(constant.RequestTimestampKey, "")
Expand Down
4 changes: 2 additions & 2 deletions filter/auth/default_authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
requestTime := strconv.Itoa(int(time.Now().Unix() * 1000))
signature, _ := getSignature(testurl, inv, secret, requestTime)

authenticator := &DefaultAuthenticator{}
authenticator = &defaultAuthenticator{}

invcation := invocation.NewRPCInvocation("test", parmas, map[string]interface{}{
constant.RequestSignatureKey: signature,
Expand All @@ -72,7 +72,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
}

func TestDefaultAuthenticator_Sign(t *testing.T) {
authenticator := &DefaultAuthenticator{}
authenticator = &defaultAuthenticator{}
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?application=test&interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.AccessKeyIDKey, "akey")
testurl.SetParam(constant.SecretAccessKeyKey, "skey")
Expand Down
30 changes: 22 additions & 8 deletions filter/auth/provider_auth_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth

import (
"context"
"sync"
)

import (
Expand All @@ -29,23 +30,36 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
authOnce sync.Once
auth *authFilter
)

func init() {
extension.SetFilter(constant.AuthProviderFilterKey, newProviderAuthFilter)
extension.SetFilter(constant.AuthProviderFilterKey, newAuthFilter)
}

// ProviderAuthFilter verifies the correctness of the signature on provider side
type ProviderAuthFilter struct{}
// authFilter verifies the correctness of the signature on provider side
type authFilter struct{}

func newAuthFilter() filter.Filter {
if auth == nil {
authOnce.Do(func() {
auth = &authFilter{}
})
}
return auth
}

// Invoke retrieves the configured Authenticator to verify the signature in an invocation
func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking providerAuth filter.")
func (paf *authFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
url := invoker.GetURL()

err := doAuthWork(url, func(authenticator filter.Authenticator) error {
return authenticator.Authenticate(invocation, url)
})
if err != nil {
logger.Infof("auth the request: %v occur exception, cause: %s", invocation, err.Error())
logger.Errorf("auth the request: %v occur exception, cause: %s", invocation, err.Error())
return &protocol.RPCResult{
Err: err,
}
Expand All @@ -55,10 +69,10 @@ func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invo
}

// OnResponse dummy process, returns the result directly
func (paf *ProviderAuthFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (paf *authFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

func newProviderAuthFilter() filter.Filter {
return &ProviderAuthFilter{}
return &authFilter{}
}
2 changes: 1 addition & 1 deletion filter/auth/provider_auth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestProviderAuthFilter_Invoke(t *testing.T) {
constant.AKKey: access,
})
ctrl := gomock.NewController(t)
filter := &ProviderAuthFilter{}
filter := &authFilter{}
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
result := &protocol.RPCResult{}
Expand Down
Loading

0 comments on commit df9f3f9

Please sign in to comment.