Skip to content

Commit

Permalink
feat(statsd): add more business metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Jun 10, 2022
1 parent 8f43ee0 commit 3a6f1e2
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func runServer(config Config) error {
if err != nil {
return fmt.Errorf("failed to create new user repository: %w", err)
}
userService := user.NewService(logger, userRepository)
userService := user.NewService(logger, userRepository, user.ServiceWithStatsDReporter(statsdReporter))

assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0, config.Service.Identity.ProviderDefaultName)
if err != nil {
Expand Down
38 changes: 34 additions & 4 deletions core/user/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"

"github.com/odpf/compass/pkg/statsd"
"github.com/odpf/salt/log"
)

// Service is a type of service that manages business process
type Service struct {
repository Repository
logger log.Logger
statsdReporter *statsd.Reporter
repository Repository
logger log.Logger
}

// ValidateUser checks if user uuid is already in DB
Expand All @@ -23,8 +25,14 @@ func (s *Service) ValidateUser(ctx context.Context, uuid, email string) (string,
usr, err := s.repository.GetByUUID(ctx, uuid)
if err == nil {
if usr.ID != "" {
s.statsdReporter.Incr("user_stats").
Tag("info", "existing").
Publish()
return usr.ID, nil
}
s.statsdReporter.Incr("user_stats").
Tag("info", "error").
Publish()
err := errors.New("fetched user uuid from DB is empty")
s.logger.Error(err.Error())
return "", err
Expand All @@ -35,16 +43,38 @@ func (s *Service) ValidateUser(ctx context.Context, uuid, email string) (string,
Email: email,
})
if err != nil {
s.statsdReporter.Incr("user_stats").
Tag("info", "error").
Publish()
s.logger.Error("error when UpsertByEmail in ValidateUser service", "err", err.Error())
return "", err
}
s.statsdReporter.Incr("user_stats").
Tag("info", "new").
Publish()
return uid, nil
}

// NewService initializes user service
func NewService(logger log.Logger, repository Repository) *Service {
return &Service{
func NewService(logger log.Logger, repository Repository, opts ...func(*Service)) *Service {
s := &Service{
repository: repository,
logger: logger,
}

for _, opt := range opts {
opt(s)
}

if s.statsdReporter == nil {
s.statsdReporter = &statsd.Reporter{}
}

return s
}

func ServiceWithStatsDReporter(statsdReporter *statsd.Reporter) func(*Service) {
return func(s *Service) {
s.statsdReporter = statsdReporter
}
}
28 changes: 14 additions & 14 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,20 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
defer func() {
if server.statsDReporter != nil {
server.statsDReporter.Incr("discovery_error").
Tag("method", "upsert").
Publish()
}
}()
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "upsert",
})
}
return nil, internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric("asset_upsert",
map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
})

return &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}, nil
Expand All @@ -309,13 +312,10 @@ func (server *APIServer) DeleteAsset(ctx context.Context, req *compassv1beta1.De
return nil, status.Error(codes.NotFound, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
defer func() {
if server.statsDReporter != nil {
server.statsDReporter.Incr("discovery_error").
Tag("method", "delete").
Publish()
}
}()
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "delete",
})
}
return nil, internalServerError(server.logger, err.Error())
}
Expand Down
10 changes: 10 additions & 0 deletions internal/server/v1beta1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ func (server *APIServer) validateUserInCtx(ctx context.Context) (string, error)
return userID, nil
}

func (server *APIServer) sendStatsDCounterMetric(metricName string, kvTags map[string]string) {
if server.statsDReporter != nil {
metric := server.statsDReporter.Incr(metricName)
for k, v := range kvTags {
metric.Tag(k, v)
}
metric.Publish()
}
}

func internalServerError(logger log.Logger, msg string) error {
ref := time.Now().Unix()

Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc_interceptor/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func TestStatsDSuite(t *testing.T) {
}

func (s *StatsDTestSuite) TestUnary_StatsDMetrics() {
s.statsdClient.EXPECT().Histogram("/mwitkow.testproto.TestService/Ping", "OK").Once()
s.statsdClient.EXPECT().Histogram("responseTime", float64(0)).Return(nil).Once()
_, err := s.Client.Ping(context.Background(), &pb_testproto.PingRequest{Value: "something", SleepTimeMs: 9999})
code := status.Code(err)
require.Equal(s.T(), codes.OK, code)
s.statsdClient.AssertExpectations(s.T())
s.statsdClient.AssertCalled(s.T(), "Histogram", "responseTime", float64(0))
}
12 changes: 6 additions & 6 deletions pkg/statsd/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ type Metric struct {

// Success tags the metric as successful.
func (m *Metric) Success() *Metric {
if m == nil {
return m
}
m.Tag("success", "true")
return m
}

// Failure tags the metric as failure.
func (m *Metric) Failure(err error) *Metric {
if m == nil {
return m
}
m.Tag("success", "false")
return m
}

// Success tags the metric as invalid.
func (m *Metric) Invalid() *Metric {
m.Tag("invalid", "true")
return m
}

// Tag adds a tag to the metric.
func (m *Metric) Tag(key string, val string) *Metric {
if m == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Init(logger log.Logger, cfg Config) (*Reporter, error) {
if err != nil {
return nil, err
}

reporter.client = client
reporter.logger = logger
reporter.config = cfg
Expand Down

0 comments on commit 3a6f1e2

Please sign in to comment.