From 12ac85aaa42a3ea1b20dd6d49acf87715c4ef404 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Wed, 23 Aug 2023 11:20:14 +0200 Subject: [PATCH 1/2] Add trace datastore --- go.mod | 5 + go.sum | 15 ++- trace/trace.go | 228 ++++++++++++++++++++++++++++++++++++++++++++ trace/trace_test.go | 14 +++ 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 trace/trace.go create mode 100644 trace/trace_test.go diff --git a/go.mod b/go.mod index 2fdc6e0..9b6baf4 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,19 @@ require ( github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8 github.com/jbenet/goprocess v0.1.4 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/multierr v1.5.0 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 ) require ( + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/kr/pretty v0.2.0 // indirect github.com/kr/text v0.1.0 // indirect + go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/atomic v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index a25a102..7db547d 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -24,8 +30,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -55,5 +67,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 0000000..d0ce6ad --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,228 @@ +// Package trace wraps a datastore where all datastore interactions are traced +// with open telemetry. +package trace + +import ( + "context" + "fmt" + "io" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + "go.opentelemetry.io/otel/attribute" + otel "go.opentelemetry.io/otel/trace" +) + +// New returns a new traced datastore. All datastore interactions are traced. +func New(ds ds.Datastore, tracer otel.Tracer) *Datastore { + return &Datastore{ds: ds, tracer: tracer} +} + +// Datastore is an adapter that traces inner datastore interactions. +type Datastore struct { + ds ds.Datastore + tracer otel.Tracer +} + +var ( + _ ds.Datastore = (*Datastore)(nil) + _ ds.Batching = (*Datastore)(nil) + _ ds.PersistentDatastore = (*Datastore)(nil) + _ ds.TxnDatastore = (*Datastore)(nil) + _ ds.CheckedDatastore = (*Datastore)(nil) + _ ds.ScrubbedDatastore = (*Datastore)(nil) + _ ds.GCDatastore = (*Datastore)(nil) + _ io.Closer = (*Datastore)(nil) +) + +// Put implements the ds.Datastore interface. +func (t *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { + ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.Put(ctx, key, value) +} + +// Sync implements Datastore.Sync +func (t *Datastore) Sync(ctx context.Context, key ds.Key) error { + ctx, span := t.tracer.Start(ctx, "Sync", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.Sync(ctx, key) +} + +// Get implements the ds.Datastore interface. +func (t *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.Get(ctx, key) +} + +// Has implements the ds.Datastore interface. +func (t *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.Has(ctx, key) +} + +// GetSize implements the ds.Datastore interface. +func (t *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.GetSize(ctx, key) +} + +// Delete implements the ds.Datastore interface. +func (t *Datastore) Delete(ctx context.Context, key ds.Key) (err error) { + ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.ds.Delete(ctx, key) +} + +// Query implements the ds.Datastore interface. +func (t *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) + defer span.End() + return t.ds.Query(ctx, q) +} + +// Batch implements the ds.Batching interface. +func (t *Datastore) Batch(ctx context.Context) (ds.Batch, error) { + ctx, span := t.tracer.Start(ctx, "Batch") + defer span.End() + + if batch, ok := t.ds.(ds.Batching); ok { + return batch.Batch(ctx) + } + + return ds.NewBasicBatch(t), nil +} + +// DiskUsage implements the ds.PersistentDatastore interface. +func (t *Datastore) DiskUsage(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DiskUsage") + defer span.End() + return ds.DiskUsage(ctx, t.ds) +} + +// Scrub implements the ds.ScrubbedDatastore interface. +func (t *Datastore) Scrub(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Scrub") + defer span.End() + + if dstore, ok := t.tracer.(ds.ScrubbedDatastore); ok { + return dstore.Scrub(ctx) + } + + return nil +} + +// CollectGarbage implements the ds.GCDatastore interface. +func (t *Datastore) CollectGarbage(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "CollectGarbage") + defer span.End() + + if dstore, ok := t.tracer.(ds.GCDatastore); ok { + return dstore.CollectGarbage(ctx) + } + + return nil +} + +// Check implements the ds.CheckedDatastore interface. +func (t *Datastore) Check(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Check") + defer span.End() + + if dstore, ok := t.tracer.(ds.CheckedDatastore); ok { + return dstore.Check(ctx) + } + + return nil +} + +// NewTransaction implements the ds.TxnDatastore interface. +func (t *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) { + ctx, span := t.tracer.Start(ctx, "NewTransaction", otel.WithAttributes(attribute.Bool("readOnly", readOnly))) + defer span.End() + + if txnDs, ok := t.ds.(ds.TxnDatastore); ok { + txn, err := txnDs.NewTransaction(ctx, readOnly) + if err != nil { + return nil, err + } + return &Txn{txn: txn, tracer: t.tracer}, nil + } + + return nil, fmt.Errorf("transactions are unsupported by traced datastore") +} + +// Close closes the inner datastore (if it implements the io.Closer interface). +func (t *Datastore) Close() error { + if closer, ok := t.ds.(io.Closer); ok { + return closer.Close() + } + return nil +} + +// Txn is an adapter that traces datastore transactions +type Txn struct { + txn ds.Txn + tracer otel.Tracer +} + +var _ ds.Txn = (*Txn)(nil) + +// Put implements the ds.Txn interface. +func (t *Txn) Put(ctx context.Context, key ds.Key, value []byte) (err error) { + ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.txn.Put(ctx, key, value) +} + +// Get implements the ds.Txn interface. +func (t *Txn) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.txn.Get(ctx, key) +} + +// Has implements the ds.Txn interface. +func (t *Txn) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.txn.Has(ctx, key) +} + +// GetSize implements the ds.Txn interface. +func (t *Txn) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.txn.GetSize(ctx, key) +} + +// Delete implements the ds.Txn interface. +func (t *Txn) Delete(ctx context.Context, key ds.Key) (err error) { + ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + return t.txn.Delete(ctx, key) +} + +// Query implements the ds.Txn interface. +func (t *Txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) + defer span.End() + return t.txn.Query(ctx, q) +} + +// Commit implements the ds.Txn interface. +func (t *Txn) Commit(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Commit") + defer span.End() + return t.txn.Commit(ctx) +} + +// Discard implements the ds.Txn interface. +func (t *Txn) Discard(ctx context.Context) { + ctx, span := t.tracer.Start(ctx, "Discard") + defer span.End() + t.txn.Discard(ctx) +} diff --git a/trace/trace_test.go b/trace/trace_test.go new file mode 100644 index 0000000..3ff9619 --- /dev/null +++ b/trace/trace_test.go @@ -0,0 +1,14 @@ +package trace + +import ( + "testing" + + "github.com/ipfs/go-datastore" + dstest "github.com/ipfs/go-datastore/test" + "go.opentelemetry.io/otel" +) + +func TestTraceAll(t *testing.T) { + tracer := otel.Tracer("tracer") + dstest.SubtestAll(t, New(datastore.NewMapDatastore(), tracer)) +} From 57fb4d9f2abec62dd043403130222d852ddb2a29 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 1 Sep 2023 10:37:22 +0200 Subject: [PATCH 2/2] Add error recording and status code setting --- trace/trace.go | 184 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 156 insertions(+), 28 deletions(-) diff --git a/trace/trace.go b/trace/trace.go index d0ce6ad..b892f7e 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -10,6 +10,7 @@ import ( ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" otel "go.opentelemetry.io/otel/trace" ) @@ -36,52 +37,101 @@ var ( ) // Put implements the ds.Datastore interface. -func (t *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { +func (t *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error { ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.Put(ctx, key, value) + + err := t.ds.Put(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Sync implements Datastore.Sync func (t *Datastore) Sync(ctx context.Context, key ds.Key) error { ctx, span := t.tracer.Start(ctx, "Sync", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.Sync(ctx, key) + + err := t.ds.Sync(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Get implements the ds.Datastore interface. func (t *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.Get(ctx, key) + + val, err := t.ds.Get(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return val, err } // Has implements the ds.Datastore interface. -func (t *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { +func (t *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) { ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.Has(ctx, key) + + exists, err := t.ds.Has(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return exists, err } // GetSize implements the ds.Datastore interface. -func (t *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { +func (t *Datastore) GetSize(ctx context.Context, key ds.Key) (int, error) { ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.GetSize(ctx, key) + + size, err := t.ds.GetSize(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return size, err } // Delete implements the ds.Datastore interface. -func (t *Datastore) Delete(ctx context.Context, key ds.Key) (err error) { +func (t *Datastore) Delete(ctx context.Context, key ds.Key) error { ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.ds.Delete(ctx, key) + + err := t.ds.Delete(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Query implements the ds.Datastore interface. func (t *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) defer span.End() - return t.ds.Query(ctx, q) + + res, err := t.ds.Query(ctx, q) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return res, err } // Batch implements the ds.Batching interface. @@ -89,8 +139,13 @@ func (t *Datastore) Batch(ctx context.Context) (ds.Batch, error) { ctx, span := t.tracer.Start(ctx, "Batch") defer span.End() - if batch, ok := t.ds.(ds.Batching); ok { - return batch.Batch(ctx) + if dstore, ok := t.ds.(ds.Batching); ok { + batch, err := dstore.Batch(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return batch, err } return ds.NewBasicBatch(t), nil @@ -100,7 +155,14 @@ func (t *Datastore) Batch(ctx context.Context) (ds.Batch, error) { func (t *Datastore) DiskUsage(ctx context.Context) (uint64, error) { ctx, span := t.tracer.Start(ctx, "DiskUsage") defer span.End() - return ds.DiskUsage(ctx, t.ds) + + usage, err := ds.DiskUsage(ctx, t.ds) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return usage, err } // Scrub implements the ds.ScrubbedDatastore interface. @@ -109,7 +171,12 @@ func (t *Datastore) Scrub(ctx context.Context) error { defer span.End() if dstore, ok := t.tracer.(ds.ScrubbedDatastore); ok { - return dstore.Scrub(ctx) + err := dstore.Scrub(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err } return nil @@ -121,7 +188,12 @@ func (t *Datastore) CollectGarbage(ctx context.Context) error { defer span.End() if dstore, ok := t.tracer.(ds.GCDatastore); ok { - return dstore.CollectGarbage(ctx) + err := dstore.CollectGarbage(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err } return nil @@ -133,7 +205,12 @@ func (t *Datastore) Check(ctx context.Context) error { defer span.End() if dstore, ok := t.tracer.(ds.CheckedDatastore); ok { - return dstore.Check(ctx) + err := dstore.Check(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err } return nil @@ -147,6 +224,8 @@ func (t *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, if txnDs, ok := t.ds.(ds.TxnDatastore); ok { txn, err := txnDs.NewTransaction(ctx, readOnly) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } return &Txn{txn: txn, tracer: t.tracer}, nil @@ -172,52 +251,101 @@ type Txn struct { var _ ds.Txn = (*Txn)(nil) // Put implements the ds.Txn interface. -func (t *Txn) Put(ctx context.Context, key ds.Key, value []byte) (err error) { +func (t *Txn) Put(ctx context.Context, key ds.Key, value []byte) error { ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.txn.Put(ctx, key, value) + + err := t.txn.Put(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Get implements the ds.Txn interface. func (t *Txn) Get(ctx context.Context, key ds.Key) (value []byte, err error) { ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.txn.Get(ctx, key) + + val, err := t.txn.Get(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return val, err } // Has implements the ds.Txn interface. -func (t *Txn) Has(ctx context.Context, key ds.Key) (exists bool, err error) { +func (t *Txn) Has(ctx context.Context, key ds.Key) (bool, error) { ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.txn.Has(ctx, key) + + exists, err := t.txn.Has(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return exists, err } // GetSize implements the ds.Txn interface. -func (t *Txn) GetSize(ctx context.Context, key ds.Key) (size int, err error) { +func (t *Txn) GetSize(ctx context.Context, key ds.Key) (int, error) { ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.txn.GetSize(ctx, key) + + size, err := t.txn.GetSize(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return size, err } // Delete implements the ds.Txn interface. -func (t *Txn) Delete(ctx context.Context, key ds.Key) (err error) { +func (t *Txn) Delete(ctx context.Context, key ds.Key) error { ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) defer span.End() - return t.txn.Delete(ctx, key) + + err := t.txn.Delete(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Query implements the ds.Txn interface. func (t *Txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) defer span.End() - return t.txn.Query(ctx, q) + + res, err := t.txn.Query(ctx, q) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return res, err } // Commit implements the ds.Txn interface. func (t *Txn) Commit(ctx context.Context) error { ctx, span := t.tracer.Start(ctx, "Commit") defer span.End() - return t.txn.Commit(ctx) + + err := t.txn.Commit(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err } // Discard implements the ds.Txn interface.