Skip to content

Commit

Permalink
update some of the forked code to use dskit grpcclient and grpcutil s…
Browse files Browse the repository at this point in the history
…ince #4312 was merged after the original fork was created.
  • Loading branch information
slim-bean committed Oct 14, 2021
1 parent 8c14e31 commit d6d3387
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 12 deletions.
5 changes: 3 additions & 2 deletions pkg/lokifrontend/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -22,6 +21,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"

lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)

var (
Expand Down Expand Up @@ -153,7 +154,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := (*grpcutil.HttpgrpcHeadersCarrier)(req)
carrier := (*lokigrpc.HttpgrpcHeadersCarrier)(req)
err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
if err != nil {
return nil, err
Expand Down
7 changes: 4 additions & 3 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"

lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)

// Config for a Frontend.
Expand Down Expand Up @@ -162,7 +163,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := (*grpcutil.HttpgrpcHeadersCarrier)(req)
carrier := (*lokigrpc.HttpgrpcHeadersCarrier)(req)
if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
util_log "github.com/cortexproject/cortex/pkg/util/log"
cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
Expand All @@ -28,6 +27,8 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)

func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
Expand Down Expand Up @@ -137,7 +138,7 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer

tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
parentSpanContext, _ := grpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
parentSpanContext, _ := lokigrpc.GetParentSpanForRequest(tracer, request.HttpRequest)
if parentSpanContext != nil {
queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
defer queueSpan.Finish()
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
Expand All @@ -30,6 +29,8 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)

var (
Expand Down Expand Up @@ -340,7 +341,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
// Extract tracing information from headers in HTTP request. FrontendContext doesn't have the correct tracing
// information, since that is a long-running request.
tracer := opentracing.GlobalTracer()
parentSpanContext, err := grpcutil.GetParentSpanForRequest(tracer, msg.HttpRequest)
parentSpanContext, err := lokigrpc.GetParentSpanForRequest(tracer, msg.HttpRequest)
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/util/httpgrpc/carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package httpgrpc

import (
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/httpgrpc"
)

// Used to transfer trace information from/to HTTP request.
type HttpgrpcHeadersCarrier httpgrpc.HTTPRequest

func (c *HttpgrpcHeadersCarrier) Set(key, val string) {
c.Headers = append(c.Headers, &httpgrpc.Header{
Key: key,
Values: []string{val},
})
}

func (c *HttpgrpcHeadersCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.Headers {
for _, v := range h.Values {
if err := handler(h.Key, v); err != nil {
return err
}
}
}
return nil
}

func GetParentSpanForRequest(tracer opentracing.Tracer, req *httpgrpc.HTTPRequest) (opentracing.SpanContext, error) {
if tracer == nil {
return nil, nil
}

carrier := (*HttpgrpcHeadersCarrier)(req)
extracted, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
if err == opentracing.ErrSpanContextNotFound {
err = nil
}
return extracted, err
}

0 comments on commit d6d3387

Please sign in to comment.