Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid making delete requests if deletes are disabled for a user (#6583) #6687

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -241,7 +243,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier")
deleteStore, err := t.deleteRequestsClient("querier", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -762,7 +764,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsClient("ruler")
deleteStore, err := t.deleteRequestsClient("ruler", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -969,7 +971,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsClient, error) {
func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand All @@ -989,7 +991,12 @@ func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsC
return nil, err
}

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
if err != nil {
return nil, err
}

return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package deletion

import (
"context"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

type perTenantDeleteRequestsClient struct {
client DeleteRequestsClient
limits retention.Limits
}

func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient {
return &perTenantDeleteRequestsClient{
client: c,
limits: l,
}
}

func (c *perTenantDeleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
allLimits := c.limits.AllByUserID()
userLimits, ok := allLimits[userID]
if ok && userLimits.CompactorDeletionEnabled || c.limits.DefaultLimits().CompactorDeletionEnabled {
return c.client.GetAllDeleteRequestsForUser(ctx, userID)
}

return nil, nil
}

func (c *perTenantDeleteRequestsClient) Stop() {
c.client.Stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package deletion

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestTenantDeleteRequestsClient(t *testing.T) {
fakeClient := &fakeRequestsClient{
reqs: []DeleteRequest{{
RequestID: "test-request",
}},
}
perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, limits)

t.Run("tenant enabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "1")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("tenant disabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2")
require.Nil(t, err)
require.Empty(t, reqs)
})

t.Run("default is enabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = true
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("default is disabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = false
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Empty(t, reqs)
})
}

type fakeRequestsClient struct {
DeleteRequestsClient

reqs []DeleteRequest
}

func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]DeleteRequest, error) {
return c.reqs, nil
}

var (
limits = &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: true},
"2": {compactorDeletionEnabled: false},
},
}
)