Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix #1590] filter single instance #1591

Merged
merged 2 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions filter/accesslog/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -52,6 +53,11 @@ const (
Arguments = "arguments"
)

var (
once sync.Once
accessLogFilter *Filter
)

func init() {
extension.SetFilter(constant.AccessLogFilterKey, newFilter)
}
Expand All @@ -76,6 +82,20 @@ type Filter struct {
logChan chan Data
}

func newFilter() filter.Filter {
if accessLogFilter == nil {
once.Do(func() {
accessLogFilter = &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
})
}
return accessLogFilter
}

// Invoke will check whether user wants to use this filter.
// If we find the value of key constant.AccessLogFilterKey, we will log the invocation info
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
Expand Down Expand Up @@ -220,16 +240,6 @@ func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}

func newFilter() filter.Filter {
accessLogFilter := &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
return accessLogFilter
}

// Data defines the data that will be log into file
type Data struct {
accessLog string
Expand Down
26 changes: 19 additions & 7 deletions filter/active/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package active
import (
"context"
"strconv"
"sync"
)

import (
Expand All @@ -35,25 +36,36 @@ const (
dubboInvokeStartTime = "dubboInvokeStartTime"
)

var (
once sync.Once
active *activeFilter
)

func init() {
extension.SetFilter(constant.ActiveFilterKey, func() filter.Filter {
return &Filter{}
})
extension.SetFilter(constant.ActiveFilterKey, newActiveFilter)
}

// Filter tracks the requests status
type Filter struct{}
type activeFilter struct{}

func newActiveFilter() filter.Filter {
if active == nil {
once.Do(func() {
active = &activeFilter{}
})
}
return active
}

// Invoke starts to record the requests status
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
func (f *activeFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}

// OnResponse update the active count base on the request result.
func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (f *activeFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
Expand Down
4 changes: 2 additions & 2 deletions filter/active/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
func TestFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}))
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := Filter{}
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand All @@ -57,7 +57,7 @@ func TestFilterOnResponse(t *testing.T) {
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := Filter{}
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand Down
25 changes: 19 additions & 6 deletions filter/auth/accesskey_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,32 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"sync"
)

var (
storageOnce sync.Once
storage *defaultAccesskeyStorage
)

func init() {
extension.SetAccessKeyStorages(constant.DefaultAccessKeyStorage, func() filter.AccessKeyStorage {
return &DefaultAccesskeyStorage{}
})
extension.SetAccessKeyStorages(constant.DefaultAccessKeyStorage, newDefaultAccesskeyStorage)
}

// DefaultAccesskeyStorage is the default implementation of AccesskeyStorage
type DefaultAccesskeyStorage struct{}
// defaultAccesskeyStorage is the default implementation of AccesskeyStorage
type defaultAccesskeyStorage struct{}

func newDefaultAccesskeyStorage() filter.AccessKeyStorage {
if storage == nil {
storageOnce.Do(func() {
storage = &defaultAccesskeyStorage{}
})
}
return storage
}

// GetAccessKeyPair retrieves AccessKeyPair from url by the key "accessKeyId" and "secretAccessKey"
func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair {
func (storage *defaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair {
return &filter.AccessKeyPair{
AccessKey: url.GetParam(constant.AccessKeyIDKey, ""),
SecretKey: url.GetParam(constant.SecretAccessKeyKey, ""),
Expand Down
2 changes: 1 addition & 1 deletion filter/auth/accesskey_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDefaultAccesskeyStorage_GetAccesskeyPair(t *testing.T) {
common.WithParamsValue(constant.SecretAccessKeyKey, "skey"),
common.WithParamsValue(constant.AccessKeyIDKey, "akey"))
invocation := &invocation2.RPCInvocation{}
storage := &DefaultAccesskeyStorage{}
storage = &defaultAccesskeyStorage{}
accesskeyPair := storage.GetAccessKeyPair(invocation, url)
assert.Equal(t, "skey", accesskeyPair.SecretKey)
assert.Equal(t, "akey", accesskeyPair.AccessKey)
Expand Down
32 changes: 20 additions & 12 deletions filter/auth/consumer_sign_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,39 @@ package auth
import (
"context"
"fmt"
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
signOnce sync.Once
sign *signFilter
)

func init() {
extension.SetFilter(constant.AuthConsumerFilterKey, func() filter.Filter {
return &ConsumerSignFilter{}
})
extension.SetFilter(constant.AuthProviderFilterKey, func() filter.Filter {
return &ProviderAuthFilter{}
})
extension.SetFilter(constant.AuthConsumerFilterKey, newSignFilter)
}

// ConsumerSignFilter signs the request on consumer side
type ConsumerSignFilter struct{}
// signFilter signs the request on consumer side
type signFilter struct{}

func newSignFilter() filter.Filter {
if sign == nil {
signOnce.Do(func() {
sign = &signFilter{}
})
}
return sign
}

// Invoke retrieves the configured Authenticator to add signature to invocation
func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking ConsumerSign filter.")
func (sf *signFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
url := invoker.GetURL()

err := doAuthWork(url, func(authenticator filter.Authenticator) error {
Expand All @@ -57,6 +65,6 @@ func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invo
}

// OnResponse dummy process, returns the result directly
func (csf *ConsumerSignFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (sf *signFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
2 changes: 1 addition & 1 deletion filter/auth/consumer_sign_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConsumerSignFilter_Invoke(t *testing.T) {
url.SetParam(constant.SecretAccessKeyKey, "sk")
url.SetParam(constant.AccessKeyIDKey, "ak")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
filter := &ConsumerSignFilter{}
filter := &signFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand Down
27 changes: 20 additions & 7 deletions filter/auth/default_authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"
)

Expand All @@ -33,17 +34,29 @@ import (
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

var (
authenticatorOnce sync.Once
authenticator *defaultAuthenticator
)

func init() {
extension.SetAuthenticator(constant.DefaultAuthenticator, func() filter.Authenticator {
return &DefaultAuthenticator{}
})
extension.SetAuthenticator(constant.DefaultAuthenticator, newDefaultAuthenticator)
}

// DefaultAuthenticator is the default implementation of Authenticator
type DefaultAuthenticator struct{}
// defaultAuthenticator is the default implementation of Authenticator
type defaultAuthenticator struct{}

func newDefaultAuthenticator() filter.Authenticator {
if authenticator == nil {
authenticatorOnce.Do(func() {
authenticator = &defaultAuthenticator{}
})
}
return authenticator
}

// Sign adds the signature to the invocation
func (authenticator *DefaultAuthenticator) Sign(invocation protocol.Invocation, url *common.URL) error {
func (authenticator *defaultAuthenticator) Sign(invocation protocol.Invocation, url *common.URL) error {
currentTimeMillis := strconv.Itoa(int(time.Now().Unix() * 1000))

consumer := url.GetParam(constant.ApplicationKey, "")
Expand Down Expand Up @@ -83,7 +96,7 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st
}

// Authenticate verifies whether the signature sent by the requester is correct
func (authenticator *DefaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
func (authenticator *defaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
accessKeyId := invocation.AttachmentsByKey(constant.AKKey, "")

requestTimestamp := invocation.AttachmentsByKey(constant.RequestTimestampKey, "")
Expand Down
4 changes: 2 additions & 2 deletions filter/auth/default_authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
requestTime := strconv.Itoa(int(time.Now().Unix() * 1000))
signature, _ := getSignature(testurl, inv, secret, requestTime)

authenticator := &DefaultAuthenticator{}
authenticator = &defaultAuthenticator{}

invcation := invocation.NewRPCInvocation("test", parmas, map[string]interface{}{
constant.RequestSignatureKey: signature,
Expand All @@ -72,7 +72,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
}

func TestDefaultAuthenticator_Sign(t *testing.T) {
authenticator := &DefaultAuthenticator{}
authenticator = &defaultAuthenticator{}
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?application=test&interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.AccessKeyIDKey, "akey")
testurl.SetParam(constant.SecretAccessKeyKey, "skey")
Expand Down
30 changes: 22 additions & 8 deletions filter/auth/provider_auth_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth

import (
"context"
"sync"
)

import (
Expand All @@ -29,23 +30,36 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
authOnce sync.Once
auth *authFilter
)

func init() {
extension.SetFilter(constant.AuthProviderFilterKey, newProviderAuthFilter)
extension.SetFilter(constant.AuthProviderFilterKey, newAuthFilter)
}

// ProviderAuthFilter verifies the correctness of the signature on provider side
type ProviderAuthFilter struct{}
// authFilter verifies the correctness of the signature on provider side
type authFilter struct{}

func newAuthFilter() filter.Filter {
if auth == nil {
authOnce.Do(func() {
auth = &authFilter{}
})
}
return auth
}

// Invoke retrieves the configured Authenticator to verify the signature in an invocation
func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking providerAuth filter.")
func (paf *authFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
url := invoker.GetURL()

err := doAuthWork(url, func(authenticator filter.Authenticator) error {
return authenticator.Authenticate(invocation, url)
})
if err != nil {
logger.Infof("auth the request: %v occur exception, cause: %s", invocation, err.Error())
logger.Errorf("auth the request: %v occur exception, cause: %s", invocation, err.Error())
return &protocol.RPCResult{
Err: err,
}
Expand All @@ -55,10 +69,10 @@ func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invo
}

// OnResponse dummy process, returns the result directly
func (paf *ProviderAuthFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (paf *authFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

func newProviderAuthFilter() filter.Filter {
return &ProviderAuthFilter{}
return &authFilter{}
}
2 changes: 1 addition & 1 deletion filter/auth/provider_auth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestProviderAuthFilter_Invoke(t *testing.T) {
constant.AKKey: access,
})
ctrl := gomock.NewController(t)
filter := &ProviderAuthFilter{}
filter := &authFilter{}
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
result := &protocol.RPCResult{}
Expand Down
Loading