Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Define an internal interface of storage v2 spanstore #5399

Merged
merged 18 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions storage_v2/proto/storage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) 2024 The Jaeger Authors.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

syntax="proto3";

package jaeger.storage_v2;

import "opentelemetry/proto/collector/trace/v1/trace_service.proto";
import "opentelemetry/proto/trace/v1/trace.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

import "model.proto";

option go_package = "storage_v2";
option java_package = "io.jaegertracing.storage_v2";

// Request object to get a trace.
message GetTraceRequest {
// Hex encoded 64 or 128 bit trace ID.
string trace_id = 1;
// Optional. The start time to search trace ID.
google.protobuf.Timestamp start_time = 2 [
(gogoproto.stdtime) = true
];
// Optional. The end time to search trace ID.
google.protobuf.Timestamp end_time = 3 [
(gogoproto.stdtime) = true
];
}

// Query parameters to find traces. Except for num_traces, all fields should be treated
// as forming a conjunction, e.g., "service_name='X' AND operation_name='Y' AND ...".
// All fields are matched against individual spans, not at the trace level.
// The returned results contain traces where at least one span matches the conditions.
// When num_traces results in fewer traces returned, there is no required ordering.
//
// Note: num_traces should restrict the number of traces returned, but not all backends
// interpret it this way. For instance, in Cassandra this limits the number of _spans_
// that match the conditions, and the resulting number of traces can be less.
//
// Note: some storage implementations do not guarantee the correct implementation of all parameters.
//
message TraceQueryParameters {
string service_name = 1;
string operation_name = 2;
// Attributes are matched against Span and Resource attributes.
// At least one span in a trace must match all specified attributes.
map<string, string> attributes = 3;
// Span min start time in. REST API uses RFC-3339ns format. Required.
google.protobuf.Timestamp start_time_min = 4;
// Span max start time. REST API uses RFC-3339ns format. Required.
google.protobuf.Timestamp start_time_max = 5;
// Span min duration. REST API uses Golang's time format e.g. 10s.
google.protobuf.Duration duration_min = 6;
// Span max duration. REST API uses Golang's time format e.g. 10s.
google.protobuf.Duration duration_max = 7;

// Maximum number of traces in the response.
int32 num_traces = 8;
}

// Request object to search traces.
message FindTracesRequest {
TraceQueryParameters query = 1;
}

// Response object to search trace IDs.
message FindTraceIDsResponse {
// Hex encoded 64 or 128 bit trace ID.
repeated string trace_ids = 1;
}

// Request object to get service names.
message GetServicesRequest {}

// Response object to get service names.
message GetServicesResponse {
repeated string services = 1;
}

// Request object to get operation names.
message GetOperationsRequest {
// Required service name.
string service = 1;
// Optional span kind.
string span_kind = 2;
}

// Operation encapsulates information about operation.
message Operation {
string name = 1;
string span_kind = 2;
}

// Response object to get operation names.
message GetOperationsResponse {
repeated Operation operations = 1;
}

// Request object to archive a trace.
message ArchiveTraceRequest {
// Hex encoded 64 or 128 bit trace ID.
string trace_id = 1;
}

// Response object to archive a trace.
message ArchiveTraceResponse {
}

// Request object to get dependencies.
message GetDependenciesRequest {
google.protobuf.Timestamp start_time = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end_time = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
}

// Response object to get dependencies.
message GetDependenciesResponse {
repeated jaeger.api_v2.DependencyLink dependencies = 1 [
(gogoproto.nullable) = false
];
}

// Request object get to capabilities.
message CapabilitiesRequest {
}

// Response object get to capabilities.
message CapabilitiesResponse {
bool archiveSpanWriter = 1;
}

// spanstore/Writer
service TraceWriter {
// Export is a batched version of WriteSpan that handles OTLP exporter.
rpc Export(opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest) returns (opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse) {}
}

// spanstore/Reader
service TraceReader {
// GetTrace returns a single trace.
// Note that the JSON response over HTTP is wrapped into result envelope "{"result": ...}"
// It means that the JSON response cannot be directly unmarshalled using JSONPb.
// This can be fixed by first parsing into user-defined envelope with standard JSON library
// or string manipulation to remove the envelope. Alternatively generate objects using OpenAPI.
rpc GetTrace(GetTraceRequest) returns (stream opentelemetry.proto.trace.v1.TracesData) {}

// FindTraces searches for traces.
// See GetTrace for JSON unmarshalling.
rpc FindTraces(FindTracesRequest) returns (stream opentelemetry.proto.trace.v1.TracesData) {}

// FindTraceIDs searches for traces and returns the traceID.
rpc FindTraceIDs(FindTracesRequest) returns (FindTraceIDsResponse) {}

// GetServices returns service names.
rpc GetServices(GetServicesRequest) returns (GetServicesResponse) {}

// GetOperations returns operation names.
rpc GetOperations(GetOperationsRequest) returns (GetOperationsResponse) {}
}

// querysvc/QueryService
service TraceArchiver {
// ArchiveTrace finds a trace from primary storage and write it to archive storage.
rpc ArchiveTrace(ArchiveTraceRequest) returns (ArchiveTraceResponse) {}
}

// dependencystore/Reader
service DependenciesReader {
// GetDependencies
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
}

service PluginCapabilities {
// Capabilities returns the supported features of the server.
// Currently, it only has an archiveSpanWriter capability.
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
}

// Below are some helper types when using APIv3 via HTTP endpoints.

// GRPCGatewayError is the type returned when GRPC server returns an error.
// Example: {"error":{"grpcCode":2,"httpCode":500,"message":"...","httpStatus":"text..."}}.
message GRPCGatewayError {
message GRPCGatewayErrorDetails {
int32 grpcCode = 1;
int32 httpCode = 2;
string message = 3;
string httpStatus = 4;
}

GRPCGatewayErrorDetails error = 1;
}

// GRPCGatewayWrapper wraps streaming responses from GetTrace/FindTraces for HTTP.
// Today there is always only one response because internally the HTTP server gets
// data from QueryService that does not support multiple responses. But in the
// future the server may return multiple responeses using Transfer-Encoding: chunked.
// In case of errors, GRPCGatewayError above is used.
//
// Example:
// {"result": {"resourceSpans": ...}}
//
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/2189
//
message GRPCGatewayWrapper {
opentelemetry.proto.trace.v1.TracesData result = 1;
}
14 changes: 14 additions & 0 deletions storage_v2/spanstore/empty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
18 changes: 18 additions & 0 deletions storage_v2/spanstore/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

// Factory defines an interface for a factory that can create implementations of different storage components.
// Implementations are also encouraged to implement plugin.Configurable interface.
type Factory interface {
// Initialize performs internal initialization of the factory, such as opening connections to the backend store.
// It is called after all configuration of the factory itself has been done.
Initialize() error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend comparing this with OTEL collector component lifecycle methods. Eg would you need to accept telemetry objects here, or would it be passed to the constructor function? Do we need Close if we're adding lifecycle methods?

Also, different storage factories (eg dependencies storage) would need the same lifestyle interface so it should be pulled to the higher pkg.

Copy link
Contributor Author

@james-ryans james-ryans May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will pass the telemetry objects to the constructor and only receive the logger and config, which is similar to the existing factory, e.g. grpc.NewFactory(*zap.Logger, grpc.Config). I've looked into other OTEL collector extension implementations and they (mostly) only accept logger from the telemetry object.

I've added the flowchart of OTEL collector component's lifecycle below. Inside the Component.Start(context.Context, component.Host), we will call spanstore.NewFactory(*zap.Logger, component.Config) and then Factory.Initialize(context.Context). Once the component is shutting down through Component.Shutdown(context.Context), we will call Factory.Close(context.Context).

flowchart TD
    extension.Factory --> Factory.CreateExtension
    Factory.CreateExtension -->|returns| component.Component
    component.Component --> Component.Start
    Component.Start --> spanstore.NewFactory
    spanstore.NewFactory -->|returns| spanstore.Factory
    spanstore.Factory --> Factory.Initialize
    Factory.Initialize --> Running
    Running --> Factory.Close
    Factory.Close -->|called by| Component.Shutdown

    subgraph "OTEL Collector"
        extension.Factory
        Factory.CreateExtension
        component.Component
        Component.Start
        Component.Shutdown

        subgraph "Jaeger Storage Extension"
            spanstore.NewFactory
            spanstore.Factory
            Factory.Initialize
            Running
            Factory.Close
        end
    end
Loading


// CreateSpanReader creates a spanstore.Reader.
CreateTraceReader() (Reader, error)

// CreateSpanWriter creates a spanstore.Writer.
CreateTraceWriter() (Writer, error)
}
65 changes: 65 additions & 0 deletions storage_v2/spanstore/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
)

// Reader finds and loads traces and other data from storage.
type Reader interface {
// GetTrace retrieves the trace with a given id.
//
// If no spans are stored for this trace, it returns ErrTraceNotFound.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the return value should be in ptrace model, similar to Writer

GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error)

Isn't there a TraceID type in ptrace? I would use that - we want to do a clean break from legacy /model/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a TraceID type in ptrace?

Yes, they have TraceID type defined in the pcommon pkg.

we want to do a clean break from legacy /model/

I see, I'll keep that in mind!


// GetServices returns all service names known to the backend from spans
// within its retention period.
GetServices(ctx context.Context) ([]string, error)

// GetOperations returns all operation names for a given service
// known to the backend from spans within its retention period.
GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error)

// FindTraces returns all traces matching query parameters. There's currently
// an implementation-dependent abiguity whether all query filters (such as
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// multiple tags) must apply to the same span within a trace, or can be satisfied
// by different spans.
//
// If no matching traces are found, the function returns (nil, nil).
FindTraces(ctx context.Context, query *TraceQueryParameters) ([]*model.Trace, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, this one is tricky. We want the result to be ptrace model, but in ptrace there is not clear separation of spans from different traces, instead the separation is by resource/scope, which are not very relevant on retrieval (usually by then they are de-normalized onto individual spans in storage, although some storage backends can theoretically store fully normalized model). Plus, in order to reproduce the grouping of spans into resource/scope hierarchy we need to do relatively expensive comparison (fwiw the query service does this today for Process deduping, even though the UI does not make use of such normalization).

I believe in my RFC I proposed returning []*ptrace.TraceData from these. The deduping could be an optional step/enrichment in the query-service, rather than forcing each storage backend do that.


// FindTraceIDs does the same search as FindTraces, but returns only the list
// of matching trace IDs.
//
// If no matching traces are found, the function returns (nil, nil).
FindTraceIDs(ctx context.Context, query *TraceQueryParameters) ([]model.TraceID, error)
}

// TraceQueryParameters contains parameters of a trace query.
type TraceQueryParameters struct {
ServiceName string
OperationName string
Tags map[string]string
StartTimeMin time.Time
StartTimeMax time.Time
DurationMin time.Duration
DurationMax time.Duration
NumTraces int
}

// OperationQueryParameters contains parameters of query operations, empty spanKind means get operations for all kinds of span.
type OperationQueryParameters struct {
ServiceName string
SpanKind string
}

// Operation contains operation name and span kind
type Operation struct {
Name string
SpanKind string
}
17 changes: 17 additions & 0 deletions storage_v2/spanstore/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

import (
"context"

"go.opentelemetry.io/collector/pdata/ptrace"
)

// Writer writes spans to storage.
type Writer interface {
// WriteTrace writes batches of spans at once and
// compatible with OTLP Exporter API.
WriteTraces(ctx context.Context, td ptrace.Traces) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// WriteTrace writes batches of spans at once and
// compatible with OTLP Exporter API.
WriteTraces(ctx context.Context, td ptrace.Traces) error
// WriteTrace writes a batch of spans to storage. Idempotent.
// Implementations are not required to support atomic transactions,
// so if any of the spans fail to be written an error is returned.
// Compatible with OTLP Exporter API.
WriteTraces(ctx context.Context, td ptrace.Traces) error

yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}
Loading