Skip to content

Commit

Permalink
avoid making delete requests if deletes are disabled for a user (#6583)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored Jul 6, 2022
1 parent 1c8ebaf commit a92e048
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 4 deletions.
15 changes: 11 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -242,7 +244,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier")
deleteStore, err := t.deleteRequestsClient("querier", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -775,7 +777,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsClient("ruler")
deleteStore, err := t.deleteRequestsClient("ruler", t.overrides)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -995,7 +997,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsClient, error) {
func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand All @@ -1015,7 +1017,12 @@ func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsC
return nil, err
}

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType)
if err != nil {
return nil, err
}

return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package deletion

import (
"context"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

type perTenantDeleteRequestsClient struct {
client DeleteRequestsClient
limits retention.Limits
}

func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient {
return &perTenantDeleteRequestsClient{
client: c,
limits: l,
}
}

func (c *perTenantDeleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
allLimits := c.limits.AllByUserID()
userLimits, ok := allLimits[userID]
if ok && userLimits.CompactorDeletionEnabled || c.limits.DefaultLimits().CompactorDeletionEnabled {
return c.client.GetAllDeleteRequestsForUser(ctx, userID)
}

return nil, nil
}

func (c *perTenantDeleteRequestsClient) Stop() {
c.client.Stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package deletion

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestTenantDeleteRequestsClient(t *testing.T) {
fakeClient := &fakeRequestsClient{
reqs: []DeleteRequest{{
RequestID: "test-request",
}},
}
perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, limits)

t.Run("tenant enabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "1")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("tenant disabled", func(t *testing.T) {
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2")
require.Nil(t, err)
require.Empty(t, reqs)
})

t.Run("default is enabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = true
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})

t.Run("default is disabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = false
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Empty(t, reqs)
})
}

type fakeRequestsClient struct {
DeleteRequestsClient

reqs []DeleteRequest
}

func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]DeleteRequest, error) {
return c.reqs, nil
}

var (
limits = &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: true},
"2": {compactorDeletionEnabled: false},
},
}
)

0 comments on commit a92e048

Please sign in to comment.