Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid making delete requests when deletes are disabled for a user #6583

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -242,7 +244,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier")
deleteStore, err := t.deleteRequestsClient("querier", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -775,7 +777,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsClient("ruler")
deleteStore, err := t.deleteRequestsClient("ruler", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -995,7 +997,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsClient, error) {
func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand All @@ -1015,7 +1017,12 @@ func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsC
return nil, err
}

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package deletion

import (
"context"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

type perTenantDeleteRequestsClient struct {
client DeleteRequestsClient
limits retention.Limits
}

func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient {
return &perTenantDeleteRequestsClient{
client: c,
limits: l,
}
}

func (c *perTenantDeleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
allLimits := c.limits.AllByUserID()
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
userLimits, ok := allLimits[userID]
if ok && userLimits.CompactorDeletionEnabled || c.limits.DefaultLimits().CompactorDeletionEnabled {
Copy link
Contributor

@vlad-diachenko vlad-diachenko Jul 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MasslessParticle it looks like if we have CompactorDeletionEnabled=true in defaultLimits then even if we disable deletion for this particular tenant, it will be ignored and deletion will be processed.

return c.client.GetAllDeleteRequestsForUser(ctx, userID)
}

return nil, nil
}

func (c *perTenantDeleteRequestsClient) Stop() {
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
c.client.Stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package deletion

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestTenantDeleteRequestsClient(t *testing.T) {
fakeClient := &fakeRequestsClient{
reqs: []DeleteRequest{{
RequestID: "test-request",
}},
}
perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, limits)

t.Run("tenant enabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "1")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("tenant disabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2")
require.Nil(t, err)
require.Empty(t, reqs)
})

t.Run("default is enabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = true
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("default is disabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = false
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Empty(t, reqs)
})
}

type fakeRequestsClient struct {
DeleteRequestsClient

reqs []DeleteRequest
}

func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]DeleteRequest, error) {
return c.reqs, nil
}

var (
limits = &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: true},
"2": {compactorDeletionEnabled: false},
},
}
)