From ee85454b21dd7b9e01a6fcc9575b2eefeaf51983 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Tue, 28 Mar 2023 21:52:36 +0200 Subject: [PATCH] feat: add tracing to the commands client --- cmd/ipfs/main.go | 59 ++++++++++++++++++++++++++++----------- core/corehttp/commands.go | 12 ++++---- core/corehttp/gateway.go | 2 ++ tracing/tracing.go | 9 ++++-- 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 8b9822c99ac3..c3b3f3eaa0dd 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -10,8 +10,15 @@ import ( "net/http" "os" "runtime/pprof" + "strings" "time" + "github.com/google/uuid" + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/go-ipfs-cmds/cli" + cmdhttp "github.com/ipfs/go-ipfs-cmds/http" + u "github.com/ipfs/go-ipfs-util" + logging "github.com/ipfs/go-log" "github.com/ipfs/kubo/cmd/ipfs/util" oldcmds "github.com/ipfs/kubo/commands" "github.com/ipfs/kubo/core" @@ -21,22 +28,19 @@ import ( "github.com/ipfs/kubo/repo" "github.com/ipfs/kubo/repo/fsrepo" "github.com/ipfs/kubo/tracing" - - cmds "github.com/ipfs/go-ipfs-cmds" - "github.com/ipfs/go-ipfs-cmds/cli" - cmdhttp "github.com/ipfs/go-ipfs-cmds/http" - u "github.com/ipfs/go-ipfs-util" - logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" manet "github.com/multiformats/go-multiaddr/net" - - "github.com/google/uuid" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // log is the command logger var log = logging.Logger("cmd/ipfs") +var tracer trace.Tracer // declared as a var for testing purposes var dnsResolver = madns.DefaultResolver @@ -91,7 +95,6 @@ func newUUID(key string) logging.Metadata { func mainRet() (exitCode int) { rand.Seed(time.Now().UnixNano()) ctx := logging.ContextWithLoggable(context.Background(), newUUID("session")) - var err error tp, err := tracing.NewTracerProvider(ctx) if err != nil { @@ -103,6 +106,7 @@ func mainRet() (exitCode int) { } }() otel.SetTracerProvider(tp) + tracer = tp.Tracer("Kubo-cli") stopFunc, err := profileIfEnabled() if err != nil { @@ -219,7 +223,7 @@ func apiAddrOption(req *cmds.Request) (ma.Multiaddr, error) { } func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) { - exe := cmds.NewExecutor(req.Root) + exe := tracingWrappedExecutor{cmds.NewExecutor(req.Root)} cctx := env.(*oldcmds.Context) // Check if the command is disabled. @@ -294,23 +298,44 @@ func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) { opts = append(opts, cmdhttp.ClientWithFallback(exe)) } + var tpt http.RoundTripper switch network { case "tcp", "tcp4", "tcp6": + tpt = http.DefaultTransport case "unix": path := host host = "unix" - opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", path) - }, + tpt = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", path) }, - })) + } default: return nil, fmt.Errorf("unsupported API address: %s", apiAddr) } + opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{ + Transport: otelhttp.NewTransport(tpt, + otelhttp.WithPropagators(tracing.Propagator()), + ), + })) + + return tracingWrappedExecutor{cmdhttp.NewClient(host, opts...)}, nil +} - return cmdhttp.NewClient(host, opts...), nil +type tracingWrappedExecutor struct { + exec cmds.Executor +} + +func (twe tracingWrappedExecutor) Execute(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error { + ctx, span := tracer.Start(req.Context, "cmds."+strings.Join(req.Path, "."), trace.WithAttributes(attribute.StringSlice("Arguments", req.Arguments))) + defer span.End() + req.Context = ctx + + err := twe.exec.Execute(req, re, env) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + return err } func getRepoPath(req *cmds.Request) (string, error) { diff --git a/core/corehttp/commands.go b/core/corehttp/commands.go index c0ebf5506163..5d839939320e 100644 --- a/core/corehttp/commands.go +++ b/core/corehttp/commands.go @@ -9,15 +9,16 @@ import ( "strconv" "strings" - version "github.com/ipfs/kubo" - oldcmds "github.com/ipfs/kubo/commands" - "github.com/ipfs/kubo/core" - corecommands "github.com/ipfs/kubo/core/commands" - cmds "github.com/ipfs/go-ipfs-cmds" cmdsHttp "github.com/ipfs/go-ipfs-cmds/http" path "github.com/ipfs/go-path" + version "github.com/ipfs/kubo" + oldcmds "github.com/ipfs/kubo/commands" config "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/core" + corecommands "github.com/ipfs/kubo/core/commands" + "github.com/ipfs/kubo/tracing" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) var ( @@ -146,6 +147,7 @@ func commandsOption(cctx oldcmds.Context, command *cmds.Command, allowGet bool) patchCORSVars(cfg, l.Addr()) cmdHandler := cmdsHttp.NewHandler(&cctx, command, cfg) + cmdHandler = otelhttp.NewHandler(cmdHandler, "corehttp.cmdsHandler", otelhttp.WithPropagators(tracing.Propagator())) mux.Handle(APIPath+"/", cmdHandler) return mux, nil } diff --git a/core/corehttp/gateway.go b/core/corehttp/gateway.go index f77e941d9eed..b68aa7bda64e 100644 --- a/core/corehttp/gateway.go +++ b/core/corehttp/gateway.go @@ -48,6 +48,8 @@ func GatewayOption(paths ...string) ServeOption { } gw := gateway.NewHandler(gwConfig, gwAPI) + // TODO: Add otelhttp.WithPropagators(tracing.Propagator()) option to + // propagate traces through the gateway once we test this feature. gw = otelhttp.NewHandler(gw, "Gateway.Request") // By default, our HTTP handler is the gateway handler. diff --git a/tracing/tracing.go b/tracing/tracing.go index 6cc666a83769..1da3fbd53e39 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/exporters/zipkin" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" @@ -130,7 +131,7 @@ func NewTracerProvider(ctx context.Context) (shutdownTracerProvider, error) { r, err := resource.Merge( resource.Default(), resource.NewSchemaless( - semconv.ServiceNameKey.String("go-ipfs"), + semconv.ServiceNameKey.String("Kubo"), semconv.ServiceVersionKey.String(version.CurrentVersionNumber), ), ) @@ -144,5 +145,9 @@ func NewTracerProvider(ctx context.Context) (shutdownTracerProvider, error) { // Span starts a new span using the standard IPFS tracing conventions. func Span(ctx context.Context, componentName string, spanName string, opts ...traceapi.SpanStartOption) (context.Context, traceapi.Span) { - return otel.Tracer("go-ipfs").Start(ctx, fmt.Sprintf("%s.%s", componentName, spanName), opts...) + return otel.Tracer("Kubo").Start(ctx, fmt.Sprintf("%s.%s", componentName, spanName), opts...) +} + +func Propagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) }