Skip to content

Commit

Permalink
Use throttled logger and use error type in metrics client (#2477)
Browse files Browse the repository at this point in the history
* Use throttled logger and use error type in metrics client
  • Loading branch information
yux0 authored Feb 8, 2022
1 parent 5fcfe25 commit cdb4fa0
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 22 deletions.
8 changes: 4 additions & 4 deletions client/clientFactory_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type (
dc *dynamicconfig.Collection,
numberOfHistoryShards int32,
logger log.Logger,
throttledLogger log.Logger,
) Factory
}

Expand All @@ -85,6 +86,7 @@ type (
dynConfig *dynamicconfig.Collection
numberOfHistoryShards int32
logger log.Logger
throttledLogger log.Logger
}

factoryProviderImpl struct {
Expand All @@ -111,6 +113,7 @@ func (p *factoryProviderImpl) NewFactory(
dc *dynamicconfig.Collection,
numberOfHistoryShards int32,
logger log.Logger,
throttledLogger log.Logger,
) Factory {
return &rpcClientFactory{
rpcFactory: rpcFactory,
Expand All @@ -119,6 +122,7 @@ func (p *factoryProviderImpl) NewFactory(
dynConfig: dc,
numberOfHistoryShards: numberOfHistoryShards,
logger: logger,
throttledLogger: throttledLogger,
}
}

Expand Down Expand Up @@ -148,7 +152,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
clientCache := common.NewClientCache(keyResolver, clientProvider)
client := history.NewClient(cf.numberOfHistoryShards, timeout, clientCache, cf.logger)
if cf.metricsClient != nil {
client = history.NewMetricClient(client, cf.metricsClient, cf.logger)
client = history.NewMetricClient(client, cf.metricsClient, cf.logger, cf.throttledLogger)
}
return client, nil
}
Expand Down Expand Up @@ -176,7 +180,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
)

if cf.metricsClient != nil {
client = matching.NewMetricClient(client, cf.metricsClient, cf.logger)
client = matching.NewMetricClient(client, cf.metricsClient, cf.logger, cf.throttledLogger)
}
return client, nil

Expand Down
19 changes: 11 additions & 8 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,24 @@ import (
var _ historyservice.HistoryServiceClient = (*metricClient)(nil)

type metricClient struct {
client historyservice.HistoryServiceClient
metricsClient metrics.Client
logger log.Logger
client historyservice.HistoryServiceClient
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
}

// NewMetricClient creates a new instance of historyservice.HistoryServiceClient that emits metrics
func NewMetricClient(
client historyservice.HistoryServiceClient,
metricsClient metrics.Client,
logger log.Logger,
throttledLogger log.Logger,
) historyservice.HistoryServiceClient {
return &metricClient{
client: client,
metricsClient: metricsClient,
logger: logger,
client: client,
metricsClient: metricsClient,
logger: logger,
throttledLogger: throttledLogger,
}
}

Expand Down Expand Up @@ -614,8 +617,8 @@ func (c *metricClient) finishMetricsRecording(
err error,
) {
if err != nil {
c.logger.Error("history client encountered error", tag.Error(err))
scope.IncCounter(metrics.ClientFailures)
c.throttledLogger.Error("history client encountered error", tag.Error(err))
scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures)
}
stopwatch.Stop()
}
19 changes: 11 additions & 8 deletions client/matching/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,24 @@ import (
var _ matchingservice.MatchingServiceClient = (*metricClient)(nil)

type metricClient struct {
client matchingservice.MatchingServiceClient
metricsClient metrics.Client
logger log.Logger
client matchingservice.MatchingServiceClient
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
}

// NewMetricClient creates a new instance of matchingservice.MatchingServiceClient that emits metrics
func NewMetricClient(
client matchingservice.MatchingServiceClient,
metricsClient metrics.Client,
logger log.Logger,
throttledLogger log.Logger,
) matchingservice.MatchingServiceClient {
return &metricClient{
client: client,
metricsClient: metricsClient,
logger: logger,
client: client,
metricsClient: metricsClient,
logger: logger,
throttledLogger: throttledLogger,
}
}

Expand Down Expand Up @@ -253,8 +256,8 @@ func (c *metricClient) finishMetricsRecording(
err error,
) {
if err != nil {
c.logger.Error("matching client encountered error", tag.Error(err))
scope.IncCounter(metrics.ClientFailures)
c.throttledLogger.Error("matching client encountered error", tag.Error(err))
scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures)
}
stopwatch.Stop()
}
18 changes: 18 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
TaskTypeTagName = "task_type"
QueueTypeTagName = "queue_type"
visibilityTypeTagName = "visibility_type"
ErrorTypeTagName = "error_type"
httpStatusTagName = "http_status"
resourceExhaustedTag = "resource_exhausted_cause"
)
Expand Down Expand Up @@ -1716,6 +1717,23 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
},
}

// Common error type
const (
ErrorTypeUnknown = "unknown_error_type"
ErrorTypeInvalidArgument = "invalid_argument_error_type"
ErrorTypeInternal = "internal_error_type"
ErrorTypeUnavailable = "unavailable_error_type"
ErrorTypeCanceled = "canceled_error_type"
ErrorTypeTimedOut = "timed_out_error_type"
ErrorTypeNotFound = "not_found_error_type"
ErrorTypeNamespaceNotActive = "namespace_not_active_error_type"
ErrorTypeQueryFailed = "query_failed_error_type"
ErrorTypeClientVersionNotSupported = "client_version_not_supported_error_type"
ErrorTypeServerVersionNotSupported = "server_version_not_supported_error_type"
ErrorTypePermissionDenied = "permission_denied_error_type"
ErrorTypeResourceExhausted = "resource_exhausted_error_type"
)

// Common Metrics enum
const (
ServiceRequests = iota
Expand Down
32 changes: 32 additions & 0 deletions common/metrics/defs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"regexp"
"testing"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -109,3 +112,32 @@ func TestMetricDefs(t *testing.T) {
}
}
}

func TestGetServiceErrorType(t *testing.T) {
testData := []struct {
err error
expectedResult string
}{
{serviceerror.NewInvalidArgument(""), ErrorTypeInvalidArgument},
{serviceerror.NewCanceled(""), ErrorTypeCanceled},
{serviceerror.NewDataLoss(""), ErrorTypeInternal},
{serviceerror.NewInternal(""), ErrorTypeInternal},
{serviceerror.NewCancellationAlreadyRequested(""), ErrorTypeInvalidArgument},
{serviceerror.NewNamespaceAlreadyExists(""), ErrorTypeInvalidArgument},
{serviceerror.NewWorkflowExecutionAlreadyStarted("", "", ""), ErrorTypeInvalidArgument},
{serviceerror.NewClientVersionNotSupported("", "", ""), ErrorTypeClientVersionNotSupported},
{serviceerror.NewServerVersionNotSupported("", ""), ErrorTypeServerVersionNotSupported},
{serviceerror.NewDeadlineExceeded(""), ErrorTypeTimedOut},
{serviceerror.NewNamespaceNotActive("", "", ""), ErrorTypeNamespaceNotActive},
{serviceerror.NewNotFound(""), ErrorTypeNotFound},
{serviceerror.NewPermissionDenied("", ""), ErrorTypePermissionDenied},
{serviceerror.NewQueryFailed(""), ErrorTypeQueryFailed},
{serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_UNSPECIFIED, ""), ErrorTypeResourceExhausted},
{serviceerror.NewUnavailable(""), ErrorTypeUnavailable},
{serviceerror.NewUnimplemented(""), ErrorTypeUnknown},
}

for id, data := range testData {
assert.Equal(t, data.expectedResult, getErrorType(data.err), "Unexpected error type in index", id)
}
}
40 changes: 40 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package metrics
import (
"strconv"

"go.temporal.io/api/serviceerror"

enumspb "go.temporal.io/api/enums/v1"
)

Expand Down Expand Up @@ -199,6 +201,10 @@ func VisibilityTypeTag(value string) Tag {
return &tagImpl{key: visibilityTypeTagName, value: value}
}

func ServiceErrorTypeTag(err error) Tag {
return &tagImpl{key: ErrorTypeTagName, value: getErrorType(err)}
}

var standardVisibilityTypeTag = VisibilityTypeTag(standardVisibilityTagValue)
var advancedVisibilityTypeTag = VisibilityTypeTag(advancedVisibilityTagValue)

Expand All @@ -218,3 +224,37 @@ func HttpStatusTag(value int) Tag {
func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag {
return &tagImpl{key: resourceExhaustedTag, value: cause.String()}
}

func getErrorType(err error) string {
switch err.(type) {
case *serviceerror.InvalidArgument,
*serviceerror.CancellationAlreadyRequested,
*serviceerror.NamespaceAlreadyExists,
*serviceerror.WorkflowExecutionAlreadyStarted:
return ErrorTypeInvalidArgument
case *serviceerror.Internal, *serviceerror.DataLoss:
return ErrorTypeInternal
case *serviceerror.Unavailable:
return ErrorTypeUnavailable
case *serviceerror.NotFound:
return ErrorTypeNotFound
case *serviceerror.Canceled:
return ErrorTypeCanceled
case *serviceerror.DeadlineExceeded:
return ErrorTypeTimedOut
case *serviceerror.NamespaceNotActive:
return ErrorTypeNamespaceNotActive
case *serviceerror.QueryFailed:
return ErrorTypeQueryFailed
case *serviceerror.ClientVersionNotSupported:
return ErrorTypeClientVersionNotSupported
case *serviceerror.ServerVersionNotSupported:
return ErrorTypeServerVersionNotSupported
case *serviceerror.PermissionDenied:
return ErrorTypePermissionDenied
case *serviceerror.ResourceExhausted:
return ErrorTypeResourceExhausted
default:
return ErrorTypeUnknown
}
}
2 changes: 2 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func ClientFactoryProvider(
dynamicCollection *dynamicconfig.Collection,
persistenceConfig *config.Persistence,
logger SnTaggedLogger,
throttledLogger ThrottledLogger,
) client.Factory {
return factoryProvider.NewFactory(
rpcFactory,
Expand All @@ -216,6 +217,7 @@ func ClientFactoryProvider(
dynamicCollection,
persistenceConfig.NumHistoryShards,
logger,
throttledLogger,
)
}

Expand Down

0 comments on commit cdb4fa0

Please sign in to comment.