From c7fb429c3031cffc70eecd9816128e3e690a8e00 Mon Sep 17 00:00:00 2001 From: itaiad200 Date: Wed, 26 Jun 2024 14:21:02 +0300 Subject: [PATCH] Metric for open in-use connections (#7913) * Add metric for open connections * Add metric for open connections * Add metric for open connections * Add metric for open connections * fix comments * fix comments * fix comments * fix comments * fix comments --- pkg/auth/service.go | 43 ++++++++++- pkg/auth/service_test.go | 2 +- pkg/block/adapter.go | 3 + pkg/block/factory/build.go | 9 +++ pkg/block/metrics.go | 133 +++++++++++++++++++++++++++++++++++ pkg/config/config_test.go | 16 ++++- pkg/httputil/client_trace.go | 29 ++++++++ pkg/kv/metrics.go | 6 ++ 8 files changed, 236 insertions(+), 5 deletions(-) create mode 100644 pkg/block/metrics.go create mode 100644 pkg/httputil/client_trace.go diff --git a/pkg/auth/service.go b/pkg/auth/service.go index d03869eb747..ebfc0941f08 100644 --- a/pkg/auth/service.go +++ b/pkg/auth/service.go @@ -1217,6 +1217,7 @@ type APIAuthService struct { } func (a *APIAuthService) InviteUser(ctx context.Context, email string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.CreateUserWithResponse(ctx, CreateUserJSONRequestBody{ Email: swag.String(email), Invite: swag.Bool(true), @@ -1238,6 +1239,7 @@ func (a *APIAuthService) Cache() Cache { } func (a *APIAuthService) CreateUser(ctx context.Context, user *model.User) (string, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.CreateUserWithResponse(ctx, CreateUserJSONRequestBody{ Email: user.Email, FriendlyName: user.FriendlyName, @@ -1257,6 +1259,7 @@ func (a *APIAuthService) CreateUser(ctx context.Context, user *model.User) (stri } func (a *APIAuthService) DeleteUser(ctx context.Context, username string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DeleteUserWithResponse(ctx, username) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to delete user") @@ -1309,6 +1312,7 @@ func (a *APIAuthService) getFirstUser(ctx context.Context, userKey userKey, para } func (a *APIAuthService) GetUserByID(ctx context.Context, userID string) (*model.User, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") intID, err := userIDToInt(userID) if err != nil { return nil, fmt.Errorf("userID as int64: %w", err) @@ -1317,6 +1321,7 @@ func (a *APIAuthService) GetUserByID(ctx context.Context, userID string) (*model } func (a *APIAuthService) GetUser(ctx context.Context, username string) (*model.User, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") return a.cache.GetUser(userKey{username: username}, func() (*model.User, error) { resp, err := a.apiClient.GetUserWithResponse(ctx, username) if err != nil { @@ -1339,10 +1344,12 @@ func (a *APIAuthService) GetUser(ctx context.Context, username string) (*model.U } func (a *APIAuthService) GetUserByEmail(ctx context.Context, email string) (*model.User, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") return a.getFirstUser(ctx, userKey{email: email}, &ListUsersParams{Email: swag.String(email)}) } func (a *APIAuthService) GetUserByExternalID(ctx context.Context, externalID string) (*model.User, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") return a.getFirstUser(ctx, userKey{externalID: externalID}, &ListUsersParams{ExternalId: swag.String(externalID)}) } @@ -1354,6 +1361,7 @@ func toPagination(paginator Pagination) *model.Paginator { } func (a *APIAuthService) ListUsers(ctx context.Context, params *model.PaginationParams) ([]*model.User, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") paginationPrefix := PaginationPrefix(params.Prefix) paginationAfter := PaginationAfter(params.After) paginationAmount := PaginationAmount(params.Amount) @@ -1386,6 +1394,7 @@ func (a *APIAuthService) ListUsers(ctx context.Context, params *model.Pagination } func (a *APIAuthService) UpdateUserFriendlyName(ctx context.Context, userID string, friendlyName string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.UpdateUserFriendlyNameWithResponse(ctx, userID, UpdateUserFriendlyNameJSONRequestBody{ FriendlyName: friendlyName, }) @@ -1397,6 +1406,7 @@ func (a *APIAuthService) UpdateUserFriendlyName(ctx context.Context, userID stri } func (a *APIAuthService) CreateGroup(ctx context.Context, group *model.Group) (*model.Group, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.CreateGroupWithResponse(ctx, CreateGroupJSONRequestBody{ Id: group.DisplayName, }) @@ -1451,6 +1461,7 @@ func paginationAmount(amount int) *PaginationAmount { } func (a *APIAuthService) DeleteGroup(ctx context.Context, groupID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DeleteGroupWithResponse(ctx, groupID) if err != nil { a.logger.WithError(err).WithField("group", groupID).Error("failed to delete group") @@ -1460,6 +1471,7 @@ func (a *APIAuthService) DeleteGroup(ctx context.Context, groupID string) error } func (a *APIAuthService) GetGroup(ctx context.Context, groupID string) (*model.Group, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.GetGroupWithResponse(ctx, groupID) if err != nil { a.logger.WithError(err).WithField("group", groupID).Error("failed to get group") @@ -1477,6 +1489,7 @@ func (a *APIAuthService) GetGroup(ctx context.Context, groupID string) (*model.G } func (a *APIAuthService) ListGroups(ctx context.Context, params *model.PaginationParams) ([]*model.Group, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListGroupsWithResponse(ctx, &ListGroupsParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1502,6 +1515,7 @@ func (a *APIAuthService) ListGroups(ctx context.Context, params *model.Paginatio } func (a *APIAuthService) AddUserToGroup(ctx context.Context, username, groupID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.AddGroupMembershipWithResponse(ctx, groupID, username) if err != nil { a.logger.WithError(err).WithField("group", groupID).WithField("username", username).Error("failed to add user to group") @@ -1511,6 +1525,7 @@ func (a *APIAuthService) AddUserToGroup(ctx context.Context, username, groupID s } func (a *APIAuthService) RemoveUserFromGroup(ctx context.Context, username, groupID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DeleteGroupMembershipWithResponse(ctx, groupID, username) if err != nil { a.logger.WithError(err).WithField("group", groupID).WithField("username", username).Error("failed to remove user from group") @@ -1520,6 +1535,7 @@ func (a *APIAuthService) RemoveUserFromGroup(ctx context.Context, username, grou } func (a *APIAuthService) ListUserGroups(ctx context.Context, username string, params *model.PaginationParams) ([]*model.Group, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListUserGroupsWithResponse(ctx, username, &ListUserGroupsParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1544,6 +1560,7 @@ func (a *APIAuthService) ListUserGroups(ctx context.Context, username string, pa } func (a *APIAuthService) ListGroupUsers(ctx context.Context, groupID string, params *model.PaginationParams) ([]*model.User, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListGroupMembersWithResponse(ctx, groupID, &ListGroupMembersParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1570,6 +1587,7 @@ func (a *APIAuthService) ListGroupUsers(ctx context.Context, groupID string, par } func (a *APIAuthService) WritePolicy(ctx context.Context, policy *model.Policy, update bool) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") if err := model.ValidateAuthEntityID(policy.DisplayName); err != nil { return err } @@ -1630,6 +1648,7 @@ func serializePolicyToModalPolicy(p Policy) *model.Policy { } func (a *APIAuthService) GetPolicy(ctx context.Context, policyDisplayName string) (*model.Policy, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.GetPolicyWithResponse(ctx, policyDisplayName) if err != nil { a.logger.WithError(err).WithField("policy", policyDisplayName).Error("failed to get policy") @@ -1643,6 +1662,7 @@ func (a *APIAuthService) GetPolicy(ctx context.Context, policyDisplayName string } func (a *APIAuthService) DeletePolicy(ctx context.Context, policyDisplayName string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DeletePolicyWithResponse(ctx, policyDisplayName) if err != nil { a.logger.WithError(err).WithField("policy", policyDisplayName).Error("failed to delete policy") @@ -1652,6 +1672,7 @@ func (a *APIAuthService) DeletePolicy(ctx context.Context, policyDisplayName str } func (a *APIAuthService) ListPolicies(ctx context.Context, params *model.PaginationParams) ([]*model.Policy, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListPoliciesWithResponse(ctx, &ListPoliciesParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1673,6 +1694,7 @@ func (a *APIAuthService) ListPolicies(ctx context.Context, params *model.Paginat } func (a *APIAuthService) CreateCredentials(ctx context.Context, username string) (*model.Credential, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.CreateCredentialsWithResponse(ctx, username, &CreateCredentialsParams{}) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to create credentials") @@ -1693,6 +1715,7 @@ func (a *APIAuthService) CreateCredentials(ctx context.Context, username string) } func (a *APIAuthService) AddCredentials(ctx context.Context, username, accessKeyID, secretAccessKey string) (*model.Credential, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.CreateCredentialsWithResponse(ctx, username, &CreateCredentialsParams{ AccessKey: &accessKeyID, SecretKey: &secretAccessKey, @@ -1716,6 +1739,7 @@ func (a *APIAuthService) AddCredentials(ctx context.Context, username, accessKey } func (a *APIAuthService) DeleteCredentials(ctx context.Context, username, accessKeyID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DeleteCredentialsWithResponse(ctx, username, accessKeyID) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to delete credentials") @@ -1725,6 +1749,7 @@ func (a *APIAuthService) DeleteCredentials(ctx context.Context, username, access } func (a *APIAuthService) GetCredentialsForUser(ctx context.Context, username, accessKeyID string) (*model.Credential, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.GetCredentialsForUserWithResponse(ctx, username, accessKeyID) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to get credentials") @@ -1744,6 +1769,7 @@ func (a *APIAuthService) GetCredentialsForUser(ctx context.Context, username, ac } func (a *APIAuthService) GetCredentials(ctx context.Context, accessKeyID string) (*model.Credential, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") return a.cache.GetCredential(accessKeyID, func() (*model.Credential, error) { resp, err := a.apiClient.GetCredentialsWithResponse(ctx, accessKeyID) if err != nil { @@ -1778,6 +1804,7 @@ func (a *APIAuthService) GetCredentials(ctx context.Context, accessKeyID string) } func (a *APIAuthService) ListUserCredentials(ctx context.Context, username string, params *model.PaginationParams) ([]*model.Credential, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListUserCredentialsWithResponse(ctx, username, &ListUserCredentialsParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1806,6 +1833,7 @@ func (a *APIAuthService) ListUserCredentials(ctx context.Context, username strin } func (a *APIAuthService) AttachPolicyToUser(ctx context.Context, policyDisplayName, username string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.AttachPolicyToUserWithResponse(ctx, username, policyDisplayName) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to attach policy to user") @@ -1815,6 +1843,7 @@ func (a *APIAuthService) AttachPolicyToUser(ctx context.Context, policyDisplayNa } func (a *APIAuthService) DetachPolicyFromUser(ctx context.Context, policyDisplayName, username string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DetachPolicyFromUserWithResponse(ctx, username, policyDisplayName) if err != nil { a.logger.WithError(err).WithField("username", username).Error("failed to detach policy from user") @@ -1846,6 +1875,7 @@ func (a *APIAuthService) listUserPolicies(ctx context.Context, username string, } func (a *APIAuthService) ListUserPolicies(ctx context.Context, username string, params *model.PaginationParams) ([]*model.Policy, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") return a.listUserPolicies(ctx, username, params, false) } @@ -1870,6 +1900,7 @@ func (a *APIAuthService) listAllEffectivePolicies(ctx context.Context, username } func (a *APIAuthService) ListEffectivePolicies(ctx context.Context, username string, params *model.PaginationParams) ([]*model.Policy, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") if params.Amount == -1 { // read through the cache when requesting the full list policies, err := a.cache.GetUserPolicies(username, func() ([]*model.Policy, error) { @@ -1884,6 +1915,7 @@ func (a *APIAuthService) ListEffectivePolicies(ctx context.Context, username str } func (a *APIAuthService) AttachPolicyToGroup(ctx context.Context, policyDisplayName, groupID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.AttachPolicyToGroupWithResponse(ctx, groupID, policyDisplayName) if err != nil { a.logger.WithError(err). @@ -1895,6 +1927,7 @@ func (a *APIAuthService) AttachPolicyToGroup(ctx context.Context, policyDisplayN } func (a *APIAuthService) DetachPolicyFromGroup(ctx context.Context, policyDisplayName, groupID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.DetachPolicyFromGroupWithResponse(ctx, groupID, policyDisplayName) if err != nil { a.logger.WithError(err). @@ -1906,6 +1939,7 @@ func (a *APIAuthService) DetachPolicyFromGroup(ctx context.Context, policyDispla } func (a *APIAuthService) ListGroupPolicies(ctx context.Context, groupID string, params *model.PaginationParams) ([]*model.Policy, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") resp, err := a.apiClient.ListGroupPoliciesWithResponse(ctx, groupID, &ListGroupPoliciesParams{ Prefix: paginationPrefix(params.Prefix), After: paginationAfter(params.After), @@ -1927,6 +1961,7 @@ func (a *APIAuthService) ListGroupPolicies(ctx context.Context, groupID string, } func (a *APIAuthService) Authorize(ctx context.Context, req *AuthorizationRequest) (*AuthorizationResponse, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") policies, _, err := a.ListEffectivePolicies(ctx, req.Username, &model.PaginationParams{ After: "", // all Amount: -1, // all @@ -1949,6 +1984,7 @@ func (a *APIAuthService) Authorize(ctx context.Context, req *AuthorizationReques } func (a *APIAuthService) ClaimTokenIDOnce(ctx context.Context, tokenID string, expiresAt int64) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") res, err := a.apiClient.ClaimTokenIdWithResponse(ctx, ClaimTokenIdJSONRequestBody{ ExpiresAt: expiresAt, TokenId: tokenID, @@ -1964,7 +2000,8 @@ func (a *APIAuthService) ClaimTokenIDOnce(ctx context.Context, tokenID string, e } func (a *APIAuthService) CheckHealth(ctx context.Context, logger logging.Logger, timeout time.Duration) error { - logger.Info("perform health check, this can take up to ", timeout) + ctx = httputil.SetClientTrace(ctx, "api_auth") + logger.Info("Performing health check, this can take up to ", timeout) bo := backoff.NewExponentialBackOff() bo.MaxInterval = healthCheckMaxInterval bo.InitialInterval = healthCheckInitialInterval @@ -2000,6 +2037,7 @@ func (a *APIAuthService) IsExternalPrincipalsEnabled(ctx context.Context) bool { } func (a *APIAuthService) CreateUserExternalPrincipal(ctx context.Context, userID, principalID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") if !a.IsExternalPrincipalsEnabled(ctx) { return fmt.Errorf("external principals disabled: %w", ErrInvalidRequest) } @@ -2015,6 +2053,7 @@ func (a *APIAuthService) CreateUserExternalPrincipal(ctx context.Context, userID } func (a *APIAuthService) DeleteUserExternalPrincipal(ctx context.Context, userID, principalID string) error { + ctx = httputil.SetClientTrace(ctx, "api_auth") if !a.IsExternalPrincipalsEnabled(ctx) { return fmt.Errorf("external principals disabled: %w", ErrInvalidRequest) } @@ -2028,6 +2067,7 @@ func (a *APIAuthService) DeleteUserExternalPrincipal(ctx context.Context, userID } func (a *APIAuthService) GetExternalPrincipal(ctx context.Context, principalID string) (*model.ExternalPrincipal, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") if !a.IsExternalPrincipalsEnabled(ctx) { return nil, fmt.Errorf("external principals disabled: %w", ErrInvalidRequest) } @@ -2047,6 +2087,7 @@ func (a *APIAuthService) GetExternalPrincipal(ctx context.Context, principalID s } func (a *APIAuthService) ListUserExternalPrincipals(ctx context.Context, userID string, params *model.PaginationParams) ([]*model.ExternalPrincipal, *model.Paginator, error) { + ctx = httputil.SetClientTrace(ctx, "api_auth") if !a.IsExternalPrincipalsEnabled(ctx) { return nil, nil, fmt.Errorf("external principals disabled: %w", ErrInvalidRequest) } diff --git a/pkg/auth/service_test.go b/pkg/auth/service_test.go index ad879c6f536..20441144c88 100644 --- a/pkg/auth/service_test.go +++ b/pkg/auth/service_test.go @@ -1022,7 +1022,7 @@ func TestAPIAuthService_DeleteUser(t *testing.T) { }, } ctx := context.Background() - mockClient.EXPECT().DeleteUserWithResponse(ctx, tt.userName).Return(response, nil) + mockClient.EXPECT().DeleteUserWithResponse(gomock.Any(), tt.userName).Return(response, nil) err := s.DeleteUser(ctx, tt.userName) if !errors.Is(err, tt.expectedErr) { t.Fatalf("DeleteUser: expected err: %v got: %v", tt.expectedErr, err) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 8b09b5f01f1..81ee4898ee5 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -150,6 +150,9 @@ type BlockstoreMetadata struct { type Adapter interface { Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error) + + // GetWalker is never called on the server side. + // TODO(itaiad200): Remove it from this interface. GetWalker(uri *url.URL) (Walker, error) // GetPreSignedURL returns a pre-signed URL for accessing obj with mode, and the diff --git a/pkg/block/factory/build.go b/pkg/block/factory/build.go index 906fcfa0d8d..ffa6b03ced5 100644 --- a/pkg/block/factory/build.go +++ b/pkg/block/factory/build.go @@ -26,6 +26,15 @@ const ( ) func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) { + adapter, err := buildBlockAdapter(ctx, statsCollector, c) + if err != nil { + return nil, err + } + + return block.NewMetricsAdapter(adapter), nil +} + +func buildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) { blockstore := c.BlockstoreType() logging.FromContext(ctx). WithField("type", blockstore). diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go new file mode 100644 index 00000000000..2e2907cbe27 --- /dev/null +++ b/pkg/block/metrics.go @@ -0,0 +1,133 @@ +package block + +import ( + "context" + "io" + "net/http" + "net/url" + "time" + + "github.com/treeverse/lakefs/pkg/httputil" +) + +type MetricsAdapter struct { + adapter Adapter +} + +func NewMetricsAdapter(adapter Adapter) Adapter { + return &MetricsAdapter{adapter: adapter} +} + +func (m *MetricsAdapter) InnerAdapter() Adapter { + return m.adapter +} + +func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.Put(ctx, obj, sizeBytes, reader, opts) +} + +func (m *MetricsAdapter) Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.Get(ctx, obj) +} + +func (m *MetricsAdapter) GetWalker(uri *url.URL) (Walker, error) { + return m.adapter.GetWalker(uri) +} + +func (m *MetricsAdapter) GetPreSignedURL(ctx context.Context, obj ObjectPointer, mode PreSignMode) (string, time.Time, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.GetPreSignedURL(ctx, obj, mode) +} + +func (m *MetricsAdapter) GetPresignUploadPartURL(ctx context.Context, obj ObjectPointer, uploadID string, partNumber int) (string, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.GetPresignUploadPartURL(ctx, obj, uploadID, partNumber) +} + +func (m *MetricsAdapter) Exists(ctx context.Context, obj ObjectPointer) (bool, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.Exists(ctx, obj) +} + +func (m *MetricsAdapter) GetRange(ctx context.Context, obj ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.GetRange(ctx, obj, startPosition, endPosition) +} + +func (m *MetricsAdapter) GetProperties(ctx context.Context, obj ObjectPointer) (Properties, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.GetProperties(ctx, obj) +} + +func (m *MetricsAdapter) Remove(ctx context.Context, obj ObjectPointer) error { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.Remove(ctx, obj) +} + +func (m *MetricsAdapter) Copy(ctx context.Context, sourceObj, destinationObj ObjectPointer) error { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.Copy(ctx, sourceObj, destinationObj) +} + +func (m *MetricsAdapter) CreateMultiPartUpload(ctx context.Context, obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (*CreateMultiPartUploadResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.CreateMultiPartUpload(ctx, obj, r, opts) +} + +func (m *MetricsAdapter) UploadPart(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*UploadPartResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.UploadPart(ctx, obj, sizeBytes, reader, uploadID, partNumber) +} + +func (m *MetricsAdapter) ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.ListParts(ctx, obj, uploadID, opts) +} + +func (m *MetricsAdapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.UploadCopyPart(ctx, sourceObj, destinationObj, uploadID, partNumber) +} + +func (m *MetricsAdapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*UploadPartResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.UploadCopyPartRange(ctx, sourceObj, destinationObj, uploadID, partNumber, startPosition, endPosition) +} + +func (m *MetricsAdapter) AbortMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string) error { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.AbortMultiPartUpload(ctx, obj, uploadID) +} + +func (m *MetricsAdapter) CompleteMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string, multipartList *MultipartUploadCompletion) (*CompleteMultiPartUploadResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.CompleteMultiPartUpload(ctx, obj, uploadID, multipartList) +} + +func (m *MetricsAdapter) BlockstoreType() string { + return m.adapter.BlockstoreType() +} + +func (m *MetricsAdapter) BlockstoreMetadata(ctx context.Context) (*BlockstoreMetadata, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.BlockstoreMetadata(ctx) +} + +func (m *MetricsAdapter) GetStorageNamespaceInfo() StorageNamespaceInfo { + return m.adapter.GetStorageNamespaceInfo() +} + +func (m *MetricsAdapter) ResolveNamespace(storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error) { + return m.adapter.ResolveNamespace(storageNamespace, key, identifierType) +} + +func (m *MetricsAdapter) GetRegion(ctx context.Context, storageNamespace string) (string, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.GetRegion(ctx, storageNamespace) +} + +func (m *MetricsAdapter) RuntimeStats() map[string]string { + return m.adapter.RuntimeStats() +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7ef8e9870bd..209c3b224fd 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -12,6 +12,7 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/go-test/deep" "github.com/spf13/viper" + "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/block/factory" "github.com/treeverse/lakefs/pkg/block/gs" "github.com/treeverse/lakefs/pkg/block/local" @@ -113,8 +114,12 @@ func TestConfig_BuildBlockAdapter(t *testing.T) { testutil.Must(t, err) adapter, err := factory.BuildBlockAdapter(ctx, nil, c) testutil.Must(t, err) - if _, ok := adapter.(*local.Adapter); !ok { - t.Fatalf("expected a local block adapter, got something else instead") + metricsAdapter, ok := adapter.(*block.MetricsAdapter) + if !ok { + t.Fatalf("got a %T when expecting a MetricsAdapter", adapter) + } + if _, ok := metricsAdapter.InnerAdapter().(*local.Adapter); !ok { + t.Fatalf("got %T expected a local block adapter", metricsAdapter.InnerAdapter()) } }) @@ -134,7 +139,12 @@ func TestConfig_BuildBlockAdapter(t *testing.T) { testutil.Must(t, err) adapter, err := factory.BuildBlockAdapter(ctx, nil, c) testutil.Must(t, err) - if _, ok := adapter.(*gs.Adapter); !ok { + + metricsAdapter, ok := adapter.(*block.MetricsAdapter) + if !ok { + t.Fatalf("expected a metrics block adapter, got something else instead") + } + if _, ok := metricsAdapter.InnerAdapter().(*gs.Adapter); !ok { t.Fatalf("expected an gs block adapter, got something else instead") } }) diff --git a/pkg/httputil/client_trace.go b/pkg/httputil/client_trace.go new file mode 100644 index 00000000000..ba305ae1a1f --- /dev/null +++ b/pkg/httputil/client_trace.go @@ -0,0 +1,29 @@ +package httputil + +import ( + "context" + "net/http/httptrace" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Known limitation: for HTTP2 services, like cosmosDB, +// the gauge is never reduced. Hence, we'll treat it as a counter for new connections created. +var connectionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "out_in_use_conns", + Help: "A gauge of in-use TCP connections", +}, []string{"service"}) + +func SetClientTrace(ctx context.Context, service string) context.Context { + trace := &httptrace.ClientTrace{ + GotConn: func(info httptrace.GotConnInfo) { + connectionGauge.WithLabelValues(service).Inc() + }, + PutIdleConn: func(err error) { + connectionGauge.WithLabelValues(service).Dec() + }, + } + + return httptrace.WithClientTrace(ctx, trace) +} diff --git a/pkg/kv/metrics.go b/pkg/kv/metrics.go index ef70277151c..c620cc399d2 100644 --- a/pkg/kv/metrics.go +++ b/pkg/kv/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/treeverse/lakefs/pkg/httputil" ) var ( @@ -31,6 +32,7 @@ type StoreMetricsWrapper struct { func (s *StoreMetricsWrapper) Get(ctx context.Context, partitionKey, key []byte) (*ValueWithPredicate, error) { const operation = "Get" timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation)) + ctx = httputil.SetClientTrace(ctx, s.StoreType) defer timer.ObserveDuration() res, err := s.Store.Get(ctx, partitionKey, key) if err != nil { @@ -42,6 +44,7 @@ func (s *StoreMetricsWrapper) Get(ctx context.Context, partitionKey, key []byte) func (s *StoreMetricsWrapper) Set(ctx context.Context, partitionKey, key, value []byte) error { const operation = "Set" timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation)) + ctx = httputil.SetClientTrace(ctx, s.StoreType) defer timer.ObserveDuration() err := s.Store.Set(ctx, partitionKey, key, value) if err != nil { @@ -53,6 +56,7 @@ func (s *StoreMetricsWrapper) Set(ctx context.Context, partitionKey, key, value func (s *StoreMetricsWrapper) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate Predicate) error { const operation = "SetIf" timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation)) + ctx = httputil.SetClientTrace(ctx, s.StoreType) defer timer.ObserveDuration() err := s.Store.SetIf(ctx, partitionKey, key, value, valuePredicate) if err != nil { @@ -64,6 +68,7 @@ func (s *StoreMetricsWrapper) SetIf(ctx context.Context, partitionKey, key, valu func (s *StoreMetricsWrapper) Delete(ctx context.Context, partitionKey, key []byte) error { const operation = "Delete" timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation)) + ctx = httputil.SetClientTrace(ctx, s.StoreType) defer timer.ObserveDuration() err := s.Store.Delete(ctx, partitionKey, key) if err != nil { @@ -75,6 +80,7 @@ func (s *StoreMetricsWrapper) Delete(ctx context.Context, partitionKey, key []by func (s *StoreMetricsWrapper) Scan(ctx context.Context, partitionKey []byte, options ScanOptions) (EntriesIterator, error) { const operation = "Scan" timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation)) + ctx = httputil.SetClientTrace(ctx, s.StoreType) defer timer.ObserveDuration() res, err := s.Store.Scan(ctx, partitionKey, options) if err != nil {