Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use grpc for communicating with compactor for query time filtering of data requested for deletion #7804

Merged
merged 5 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [7731](https://github.com/grafana/loki/pull/7731) **bitkill**: Add healthchecks to the docker-compose example.
* [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables.
* [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules.
* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.

##### Fixes

Expand Down
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places.
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]

# Address and port number where the compactor grpc requests are being served.
# CLI flag: -common.compactor-grpc-address
[compactor_grpc_address: <string> | default = ""]

## analytics

The `analytics` block configures the reporting of Loki analytics to grafana.com.
Expand Down
4 changes: 4 additions & 0 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Config struct {

// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`

// CompactorAddress is the grpc address of the compactor in the form host:port
CompactorGRPCAddress string `yaml:"compactor_grpc_address"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port")
}

type Storage struct {
Expand Down
53 changes: 28 additions & 25 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
Expand All @@ -70,30 +71,31 @@ type Config struct {
UseBufferedLogger bool `yaml:"use_buffered_logger"`
UseSyncLogger bool `yaml:"use_sync_logger"`

Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"`
CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
}

// RegisterFlags registers flag.
Expand All @@ -116,7 +118,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.CompactorClient.RegisterFlags(f)
c.CompactorHTTPClient.RegisterFlags(f)
c.CompactorGRPCClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
c.StorageConfig.RegisterFlags(f)
Expand Down
71 changes: 45 additions & 26 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"strings"
"time"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -43,6 +41,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/ruler"
base_ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/runtime"
Expand All @@ -54,6 +53,8 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/series/index"
Expand Down Expand Up @@ -677,45 +678,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
var client generationnumber.CacheGenClient
if t.supportIndexDeleteRequest() {
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
compactorAddress, isGRPCAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
if err != nil {
return nil, err
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer)
if isGRPCAddress {
client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
}
}

t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer)
return services.NewIdleService(nil, nil), nil
return services.NewIdleService(nil, func(failureCase error) error {
t.cacheGenerationLoader.Stop()
return nil
}), nil
}

func (t *Loki) supportIndexDeleteRequest() bool {
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
}

func (t *Loki) compactorAddress() (string, error) {
// compactorAddress returns the configured address of the compactor.
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
func (t *Loki) compactorAddress() (string, bool, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
proto := "http"
if len(t.Cfg.Server.HTTPTLSConfig.TLSCertPath) > 0 && len(t.Cfg.Server.HTTPTLSConfig.TLSKeyPath) > 0 {
proto = "https"
}
return fmt.Sprintf("%s://%s:%d", proto, t.Cfg.Server.HTTPListenAddress, t.Cfg.Server.HTTPListenPort), nil
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
}

if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured")
}

return t.Cfg.Common.CompactorAddress, nil
if t.Cfg.Common.CompactorGRPCAddress != "" {
return t.Cfg.Common.CompactorGRPCAddress, true, nil
}

return t.Cfg.Common.CompactorAddress, false, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
Expand Down Expand Up @@ -1012,6 +1021,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))
grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler)
}

return t.compactor, nil
Expand Down Expand Up @@ -1128,17 +1138,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress, err := t.compactorAddress()
compactorAddress, isGRPCAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer)
var compactorClient deletion.CompactorClient
if isGRPCAddress {
compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
}

client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/queryrangebase/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri

type CacheGenNumberLoader interface {
GetResultsCacheGenNumber(tenantIDs []string) string
Stop()
}

// ResultsCacheConfig is the config for the results cache.
Expand Down
100 changes: 100 additions & 0 deletions pkg/storage/stores/indexshipper/compactor/client/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package client

import (
"context"
"flag"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"

"github.com/grafana/dskit/grpcclient"
deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
"google.golang.org/grpc"
)

type GRPCConfig struct {
GRPCClientConfig grpcclient.Config `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f)
}

type compactorGRPCClient struct {
cfg GRPCConfig

GRPCClientRequestDuration *prometheus.HistogramVec
conn *grpc.ClientConn
grpcClient deletion_grpc.CompactorClient
}

// NewGRPCClient supports only methods which are used for internal communication of Loki like
// loading delete requests and cache gen numbers for query time filtering.
func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) {
client := &compactorGRPCClient{
cfg: cfg,
GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki_compactor",
Name: "grpc_request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using compactor GRPC client",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"}),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(client.GRPCClientRequestDuration))
if err != nil {
return nil, err
}

client.conn, err = grpc.Dial(addr, dialOpts...)
if err != nil {
return nil, err
}

client.grpcClient = deletion_grpc.NewCompactorClient(client.conn)
return client, nil
}

func (s *compactorGRPCClient) Stop() {
s.conn.Close()
}

func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) {
ctx = user.InjectOrgID(ctx, userID)
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{})
if err != nil {
return nil, err
}

deleteRequests := make([]deletion.DeleteRequest, len(grpcResp.DeleteRequests))
for i, dr := range grpcResp.DeleteRequests {
deleteRequests[i] = deletion.DeleteRequest{
RequestID: dr.RequestID,
StartTime: model.Time(dr.StartTime),
EndTime: model.Time(dr.EndTime),
Query: dr.Query,
Status: deletion.DeleteRequestStatus(dr.Status),
CreatedAt: model.Time(dr.CreatedAt),
}
}

return deleteRequests, nil
}

func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) {
ctx = user.InjectOrgID(ctx, userID)
grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{})
if err != nil {
return "", err
}

return grpcResp.ResultsCacheGen, nil
}

func (s *compactorGRPCClient) Name() string {
return "grpc_client"
}
Loading