Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve metrics #149

Merged
merged 1 commit into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
esStore "github.com/odpf/compass/internal/store/elasticsearch"
"github.com/odpf/compass/internal/store/postgres"
"github.com/odpf/compass/pkg/metrics"
"github.com/odpf/compass/pkg/statsd"
"github.com/odpf/salt/config"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -37,7 +38,7 @@ type Config struct {
LogLevel string `mapstructure:"log_level" default:"info"`

// StatsD
StatsD metrics.StatsDConfig `mapstructure:"statsd"`
StatsD statsd.Config `mapstructure:"statsd"`

// NewRelic
NewRelic metrics.NewRelicConfig `mapstructure:"newrelic"`
Expand Down
26 changes: 8 additions & 18 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
compassserver "github.com/odpf/compass/internal/server"
esStore "github.com/odpf/compass/internal/store/elasticsearch"
"github.com/odpf/compass/internal/store/postgres"
"github.com/odpf/compass/pkg/metrics"
"github.com/odpf/compass/pkg/statsd"
"github.com/odpf/salt/log"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -62,12 +62,16 @@ func runServer(config Config) error {
if err != nil {
return err
}
statsdMonitor := initStatsDMonitor(config, logger)
statsdReporter, err := statsd.Init(logger, config.StatsD)
if err != nil {
return err
}

esClient, err := initElasticsearch(logger, config.Elasticsearch)
if err != nil {
return err
}

pgClient, err := initPostgres(logger, config)
if err != nil {
return err
Expand All @@ -90,7 +94,7 @@ func runServer(config Config) error {
if err != nil {
return fmt.Errorf("failed to create new user repository: %w", err)
}
userService := user.NewService(logger, userRepository)
userService := user.NewService(logger, userRepository, user.ServiceWithStatsDReporter(statsdReporter))

assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0, config.Service.Identity.ProviderDefaultName)
if err != nil {
Expand Down Expand Up @@ -123,7 +127,7 @@ func runServer(config Config) error {
logger,
pgClient,
nrApp,
statsdMonitor,
statsdReporter,
assetService,
starService,
discussionService,
Expand Down Expand Up @@ -180,17 +184,3 @@ func initNewRelicMonitor(config Config, logger log.Logger) (*newrelic.Applicatio

return app, nil
}

func initStatsDMonitor(config Config, logger log.Logger) *metrics.StatsDMonitor {
var metricsMonitor *metrics.StatsDMonitor
if !config.StatsD.Enabled {
logger.Info("statsd metrics monitoring is disabled.")
return nil
}
metricsSeparator := "."
statsdClient := metrics.NewStatsDClient(config.StatsD.Address)
metricsMonitor = metrics.NewStatsDMonitor(statsdClient, config.StatsD.Prefix, metricsSeparator)
logger.Info("statsd metrics monitoring is enabled", "statsd address", config.StatsD.Address)

return metricsMonitor
}
8 changes: 8 additions & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ type InvalidError struct {
func (err InvalidError) Error() string {
return fmt.Sprintf("invalid asset id: %q", err.AssetID)
}

type DiscoveryError struct {
Err error
}

func (err DiscoveryError) Error() string {
return fmt.Sprintf("discovery error: %s", err.Err)
}
38 changes: 34 additions & 4 deletions core/user/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"

"github.com/odpf/compass/pkg/statsd"
"github.com/odpf/salt/log"
)

// Service is a type of service that manages business process
type Service struct {
repository Repository
logger log.Logger
statsdReporter *statsd.Reporter
repository Repository
logger log.Logger
}

// ValidateUser checks if user uuid is already in DB
Expand All @@ -23,8 +25,14 @@ func (s *Service) ValidateUser(ctx context.Context, uuid, email string) (string,
usr, err := s.repository.GetByUUID(ctx, uuid)
if err == nil {
if usr.ID != "" {
s.statsdReporter.Incr("user_stats").
Tag("info", "existing").
Publish()
return usr.ID, nil
}
s.statsdReporter.Incr("user_stats").
Tag("info", "error").
Publish()
err := errors.New("fetched user uuid from DB is empty")
s.logger.Error(err.Error())
return "", err
Expand All @@ -35,16 +43,38 @@ func (s *Service) ValidateUser(ctx context.Context, uuid, email string) (string,
Email: email,
})
if err != nil {
s.statsdReporter.Incr("user_stats").
Tag("info", "error").
Publish()
s.logger.Error("error when UpsertByEmail in ValidateUser service", "err", err.Error())
return "", err
}
s.statsdReporter.Incr("user_stats").
Tag("info", "new").
Publish()
return uid, nil
}

// NewService initializes user service
func NewService(logger log.Logger, repository Repository) *Service {
return &Service{
func NewService(logger log.Logger, repository Repository, opts ...func(*Service)) *Service {
s := &Service{
repository: repository,
logger: logger,
}

for _, opt := range opts {
opt(s)
}

if s.statsdReporter == nil {
s.statsdReporter = &statsd.Reporter{}
}

return s
}

func ServiceWithStatsDReporter(statsdReporter *statsd.Reporter) func(*Service) {
return func(s *Service) {
s.statsdReporter = statsdReporter
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/odpf/compass
go 1.16

require (
github.com/DataDog/datadog-go/v5 v5.1.1
github.com/MakeNowJust/heredoc v1.0.0
github.com/Masterminds/semver/v3 v3.1.1
github.com/Masterminds/squirrel v1.5.2
Expand All @@ -11,7 +12,6 @@ require (
github.com/elastic/go-elasticsearch v0.0.0
github.com/elastic/go-elasticsearch/v7 v7.16.0
github.com/envoyproxy/protoc-gen-validate v0.6.7
github.com/etsy/statsd v0.9.0
github.com/go-playground/locales v0.14.0
github.com/go-playground/universal-translator v0.18.0
github.com/go-playground/validator/v10 v10.10.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/DataDog/datadog-go v3.2.0+incompatible h1:qSG2N4FghB1He/r2mFrWKCaL7dXCilEuNEeAn20fdD4=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go/v5 v5.1.1 h1:JLZ6s2K1pG2h9GkvEvMdEGqMDyVLEAccdX5TltWcLMU=
github.com/DataDog/datadog-go/v5 v5.1.1/go.mod h1:KhiYb2Badlv9/rofz+OznKoEF5XKTonWyhx5K83AP8E=
github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ=
github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE=
github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc=
Expand All @@ -89,6 +92,7 @@ github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugX
github.com/Microsoft/go-winio v0.4.17-0.20210211115548-6eac466e5fa3/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17-0.20210324224401-5516f17a5958/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.1 h1:aPJp2QD7OOrhO5tQXqQoGSJc+DjDtWTGLOmNyAm6FgY=
github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg=
Expand Down Expand Up @@ -400,8 +404,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/envoyproxy/protoc-gen-validate v0.6.7 h1:qcZcULcd/abmQg6dwigimCNEyi4gg31M/xaciQlDml8=
github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo=
github.com/etsy/statsd v0.9.0 h1:GLP1pAzn1fGE7/kM2S5QXSU0ZTUV6QnZsyZVMx7IVF4=
github.com/etsy/statsd v0.9.0/go.mod h1:rmx2gVm1TEkQUIcU/KAM4prmC/AAUU8Wndeule9gvW4=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
Expand Down
6 changes: 3 additions & 3 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
handlersv1beta1 "github.com/odpf/compass/internal/server/v1beta1"
"github.com/odpf/compass/internal/store/postgres"
"github.com/odpf/compass/pkg/grpc_interceptor"
"github.com/odpf/compass/pkg/metrics"
"github.com/odpf/compass/pkg/statsd"
"github.com/odpf/salt/log"
"github.com/sirupsen/logrus"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -58,7 +58,7 @@ func Serve(
logger log.Logger,
pgClient *postgres.Client,
nrApp *newrelic.Application,
statsd *metrics.StatsDMonitor,
statsdReporter *statsd.Reporter,
assetService handlersv1beta1.AssetService,
starService handlersv1beta1.StarService,
discussionService handlersv1beta1.DiscussionService,
Expand Down Expand Up @@ -86,7 +86,7 @@ func Serve(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(logrus.New())), //TODO: expose *logrus.Logger in salt
nrgrpc.UnaryServerInterceptor(nrApp),
grpc_interceptor.StatsD(statsd),
grpc_interceptor.StatsD(statsdReporter),
grpc_interceptor.UserHeaderCtx(config.Identity.HeaderKeyUUID, config.Identity.HeaderKeyEmail),
)),
)
Expand Down
24 changes: 24 additions & 0 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import (
"github.com/odpf/compass/core/asset"
"github.com/odpf/compass/core/star"
"github.com/odpf/compass/core/user"
"github.com/odpf/compass/pkg/statsd"
"github.com/r3labs/diff/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

//go:generate mockery --name=StatsDClient -r --case underscore --with-expecter --structname StatsDClient --filename statsd_monitor.go --output=./mocks
type StatsDClient interface {
Incr(name string) *statsd.Metric
}

type AssetService interface {
GetAllAssets(context.Context, asset.Filter, bool) ([]asset.Asset, uint32, error)
GetAssetByID(ctx context.Context, id string) (asset.Asset, error)
Expand Down Expand Up @@ -249,9 +255,21 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
if errors.As(err, new(asset.InvalidError)) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "upsert",
})
}
return nil, internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric("asset_upsert",
map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
})

return &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}, nil
Expand All @@ -270,6 +288,12 @@ func (server *APIServer) DeleteAsset(ctx context.Context, req *compassv1beta1.De
if errors.As(err, new(asset.NotFoundError)) {
return nil, status.Error(codes.NotFound, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "delete",
})
}
return nil, internalServerError(server.logger, err.Error())
}

Expand Down
9 changes: 9 additions & 0 deletions internal/server/v1beta1/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package handlersv1beta1

type Option func(*APIServer)

func WithStatsD(st StatsDClient) Option {
return func(s *APIServer) {
s.statsDReporter = st
}
}
11 changes: 11 additions & 0 deletions internal/server/v1beta1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type APIServer struct {
tagTemplateService TagTemplateService
userService UserService
logger log.Logger
statsDReporter StatsDClient
}

var (
Expand Down Expand Up @@ -67,6 +68,16 @@ func (server *APIServer) validateUserInCtx(ctx context.Context) (string, error)
return userID, nil
}

func (server *APIServer) sendStatsDCounterMetric(metricName string, kvTags map[string]string) {
if server.statsDReporter != nil {
metric := server.statsDReporter.Incr(metricName)
for k, v := range kvTags {
metric.Tag(k, v)
}
metric.Publish()
}
}

func internalServerError(logger log.Logger, msg string) error {
ref := time.Now().Unix()

Expand Down
14 changes: 7 additions & 7 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,30 @@ func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) er

idxExists, err := repo.cli.indexExists(ctx, ast.Service)
if err != nil {
return elasticSearchError(err)
return asset.DiscoveryError{Err: err}
}

if !idxExists {
if err := repo.cli.CreateIdx(ctx, ast.Service); err != nil {
return err
return asset.DiscoveryError{Err: err}
}
}

body, err := repo.createUpsertBody(ast)
if err != nil {
return fmt.Errorf("error serialising payload: %w", err)
return asset.DiscoveryError{Err: fmt.Errorf("error serialising payload: %w", err)}
}
res, err := repo.cli.client.Bulk(
body,
repo.cli.client.Bulk.WithRefresh("true"),
repo.cli.client.Bulk.WithContext(ctx),
)
if err != nil {
return elasticSearchError(err)
return asset.DiscoveryError{Err: err}
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res))
return asset.DiscoveryError{Err: fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res))}
}
return nil
}
Expand All @@ -72,11 +72,11 @@ func (repo *DiscoveryRepository) Delete(ctx context.Context, assetID string) err
repo.cli.client.DeleteByQuery.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("error deleting asset: %w", err)
return asset.DiscoveryError{Err: fmt.Errorf("error deleting asset: %w", err)}
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res))
return asset.DiscoveryError{Err: fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res))}
}

return nil
Expand Down
Loading