Skip to content

Commit

Permalink
Add query service with OTLP
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <p.loffay@gmail.com>
  • Loading branch information
pavolloffay committed Jun 17, 2021
1 parent e15c981 commit d59f0b3
Show file tree
Hide file tree
Showing 25 changed files with 5,111 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ cmd/docs/*.1
cmd/docs/*.yaml
crossdock/crossdock-*
run-crossdock.log
proto-gen/.patched-otel-proto/
__pycache__

3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "jaeger-ui"]
path = jaeger-ui
url = https://github.com/jaegertracing/jaeger-ui.git
[submodule "opentelemetry-proto"]
path = opentelemetry-proto
url = https://github.com/open-telemetry/opentelemetry-proto.git
56 changes: 54 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,17 @@ generate-mocks: install-mockery
echo-version:
@echo $(GIT_CLOSEST_TAG)

PROTO_INTERMEDIATE_DIR = proto-gen/.patched-otel-proto
UNAME_PLATFORM := $(shell uname -s)
ifeq ($(UNAME_PLATFORM), Darwin)
SED_OPTS := ''
endif
PROTOC := docker run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${JAEGER_DOCKER_PROTOBUF} --proto_path=${PWD}
PROTO_INCLUDES := \
-Iidl/proto/api_v2 \
-Iidl/proto/api_v3 \
-Imodel/proto/metrics \
-I$(PROTO_INTERMEDIATE_DIR) \
-I/usr/include/github.com/gogo/protobuf
# Remapping of std types to gogo types (must not contain spaces)
PROTO_GOGO_MAPPINGS := $(shell echo \
Expand All @@ -473,9 +480,8 @@ PROTO_GOGO_MAPPINGS := $(shell echo \
Mmodel.proto=github.com/jaegertracing/jaeger/model \
| sed 's/ //g')


.PHONY: proto
proto:
proto: proto-prepare-otel
# Generate gogo, swagger, go-validators, gRPC-storage-plugin output.
#
# -I declares import folders, in order of importance
Expand Down Expand Up @@ -552,6 +558,52 @@ proto:
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/zipkin \
idl/proto/zipkin.proto

$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/common/v1/common.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/resource/v1/resource.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/trace/v1/trace.proto

# Revert changes in OTEL proto and modify only package
# The goal here is to import opentelemetry.proto.trace.v1.ResourceSpans in the query service
rm -rf $(PROTO_INTERMEDIATE_DIR)/*
cp -R opentelemetry-proto/* $(PROTO_INTERMEDIATE_DIR)
find $(PROTO_INTERMEDIATE_DIR) -name "*.proto" | xargs -L 1 sed -i $(SED_OPTS) 's+github.com/open-telemetry/opentelemetry-proto/gen/go+github.com/jaegertracing/jaeger/proto-gen/otel+g'
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--grpc-gateway_out=logtostderr=true,grpc_api_configuration=idl/proto/api_v3/query_service_http.yaml,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--swagger_out=disable_default_errors=true,logtostderr=true,grpc_api_configuration=idl/proto/api_v3/query_service_http.yaml:$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
rm -rf $(PROTO_INTERMEDIATE_DIR)

.PHONY: poroto-prepare-otel
proto-prepare-otel:
@echo --
@echo -- Copying to $(PROTO_INTERMEDIATE_DIR)
@echo --
mkdir -p $(PROTO_INTERMEDIATE_DIR)
cp -R opentelemetry-proto/opentelemetry/proto/* $(PROTO_INTERMEDIATE_DIR)

@echo --
@echo -- Editing proto
@echo --
@# Update go_package
find $(PROTO_INTERMEDIATE_DIR) -name "*.proto" | xargs -L 1 sed -i $(SED_OPTS) -f proto_patch.sed

.PHONY: proto-hotrod
proto-hotrod:
$(PROTOC) \
Expand Down
13 changes: 13 additions & 0 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ type GRPCHandler struct {
nowFn func() time.Time
}

var _ api_v2.QueryServiceServer = (*GRPCHandler)(nil)

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
tracer: tracer,
}

return gH
}

// GetTrace is the gRPC handler to fetch traces based on trace-id.
func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QueryService_GetTraceServer) error {
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
Expand Down
44 changes: 44 additions & 0 deletions cmd/query/app/otel/grpc_gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otel

import (
"context"

"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/proto-gen/api_v3"
)

func RegisterGRPCGateway(r *mux.Router, grpcEndpoint string) error {
jsonpb := &JSONPb{
EmitDefaults: true,
Indent: " ",
OrigName: true,
}
grpcGatewayMux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb),
)
opts := []grpc.DialOption{grpc.WithInsecure()}
err := api_v3.RegisterQueryServiceHandlerFromEndpoint(context.Background(), grpcGatewayMux, grpcEndpoint, opts)
if err != nil {
return err
}
// TODO (pavolloffay) matching does not work when query API base path is configured
r.PathPrefix("/v3/").Handler(grpcGatewayMux)
return nil
}
97 changes: 97 additions & 0 deletions cmd/query/app/otel/grpc_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otel

import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" //force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestGRPCGateway(t *testing.T) {
r := &spanstoremocks.Reader{}
traceID := model.NewTraceID(150, 160)
r.On("GetTrace", mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("model.TraceID")).Return(
&model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
},
},
}, nil).Once()

q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{})
server := grpc.NewServer()
h := &Handler{
QueryService: q,
}
api_v3.RegisterQueryServiceServer(server, h)

lis, _ := net.Listen("tcp", ":0")
go func() {
err := server.Serve(lis)
require.NoError(t, err)
}()

router := &mux.Router{}
err := RegisterGRPCGateway(router, lis.Addr().String())
require.NoError(t, err)

httpLis, _ := net.Listen("tcp", ":0")
go func() {
err = http.Serve(httpLis, router)
require.NoError(t, err)
}()
req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1)), nil)
req.Header.Set("Content-Type", "application/json")
response, err := http.DefaultClient.Do(req)
buf := bytes.Buffer{}
_, err = buf.ReadFrom(response.Body)
require.NoError(t, err)

jsonpb := &JSONPb{}
var w resultWrapper
err = json.Unmarshal(buf.Bytes(), &w)
require.NoError(t, err)
var spansResponse api_v3.SpansResponseChunk
err = jsonpb.Unmarshal(w.Result, spansResponse)
require.NoError(t, err)
assert.Equal(t, 1, len(spansResponse.GetResourceSpans()))
assert.Equal(t, uint64ToTraceID(traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetInstrumentationLibrarySpans()[0].GetSpans()[0].TraceId.Bytes())
}

// see https://github.com/grpc-ecosystem/grpc-gateway/issues/2189
type resultWrapper struct {
Result json.RawMessage `json:"result"`
}
128 changes: 128 additions & 0 deletions cmd/query/app/otel/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otel

import (
"context"

"github.com/gogo/protobuf/types"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type Handler struct {
QueryService *querysvc.QueryService
}

var _ api_v3.QueryServiceServer = (*Handler)(nil)

func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryService_GetTraceServer) error {
traceID, err := model.TraceIDFromString(request.GetTraceId())
if err != nil {
return err
}

trace, err := h.QueryService.GetTrace(context.Background(), traceID)
if err != nil {
return err
}
resourceSpans := jaegerSpansToOTLP(trace.GetSpans())
return stream.Send(&api_v3.SpansResponseChunk{
ResourceSpans: resourceSpans,
})
}

func (h *Handler) GetTraces(request *api_v3.FindTracesRequest, stream api_v3.QueryService_GetTracesServer) error {
query := request.GetQuery()
queryParams := &spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetNumTraces()),
}
if query.GetStartTimeMin() != nil {
startTimeMin, err := types.TimestampFromProto(query.GetStartTimeMin())
if err != nil {
return err
}
queryParams.StartTimeMin = startTimeMin
}
if query.GetStartTimeMax() != nil {
startTimeMax, err := types.TimestampFromProto(query.GetStartTimeMax())
if err != nil {
return err
}
queryParams.StartTimeMax = startTimeMax
}
if query.GetDurationMin() != nil {
durationMin, err := types.DurationFromProto(query.GetDurationMin())
if err != nil {
return err
}
queryParams.DurationMin = durationMin
}
if query.GetStartTimeMax() != nil {
durationMax, err := types.DurationFromProto(query.GetDurationMax())
if err != nil {
return err
}
queryParams.DurationMax = durationMax
}

traces, err := h.QueryService.FindTraces(context.Background(), queryParams)
if err != nil {
return err
}
for _, t := range traces {
resourceSpans := jaegerSpansToOTLP(t.GetSpans())
stream.Send(&api_v3.SpansResponseChunk{
ResourceSpans: resourceSpans,
})
}
return nil
}

func (h *Handler) GetServices(ctx context.Context, _ *api_v3.GetServicesRequest) (*api_v3.GetServicesResponse, error) {
services, err := h.QueryService.GetServices(ctx)
if err != nil {
return nil, err
}
return &api_v3.GetServicesResponse{
Services: services,
}, nil
}

func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperationsRequest) (*api_v3.GetOperationsResponse, error) {
operations, err := h.QueryService.GetOperations(ctx, spanstore.OperationQueryParameters{
ServiceName: request.GetService(),
SpanKind: request.GetSpanKind(),
})
if err != nil {
return nil, err
}
apiOperations := make([]*api_v3.Operation, len(operations))
for i := range operations {
apiOperations[i] = &api_v3.Operation{
Name: operations[i].Name,
SpanKind: operations[i].SpanKind,
}
}
return &api_v3.GetOperationsResponse{
Operations: apiOperations,
}, nil
}
Loading

0 comments on commit d59f0b3

Please sign in to comment.