Skip to content

Commit

Permalink
[v2] Add v1 factory converter to v2 storage factory (#5497)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #5334

## Description of the changes
- Current storageextension uses v1 storage factory and transitioning
from v1 to v2 requires v2 to implement the v1 storage factory interface.
This PR helps by creating a wrapper factory to implement v1 storage
factory converted from v2 spanstore interface.

## How was this change tested?
- unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <james.ryans2012@gmail.com>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
james-ryans and yurishkuro authored Jun 19, 2024
1 parent 1c1f22b commit 3f6f5fe
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 42 deletions.
31 changes: 8 additions & 23 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ package storageexporter

import (
"context"
"errors"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type storageExporter struct {
config *Config
logger *zap.Logger
spanWriter spanstore.Writer
config *Config
logger *zap.Logger
traceWriter spanstore.Writer
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -31,13 +29,13 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(exp.config.TraceStorage, host)
f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
if exp.traceWriter, err = f.CreateTraceWriter(); err != nil {
return fmt.Errorf("cannot create trace writer: %w", err)
}

return nil
Expand All @@ -49,18 +47,5 @@ func (*storageExporter) close(_ context.Context) error {
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)
}
var errs []error
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
span.Process = batch.Process
}
errs = append(errs, exp.spanWriter.WriteSpan(ctx, span))
}
}
return errors.Join(errs...)
return exp.traceWriter.WriteTraces(ctx, td)
}
64 changes: 45 additions & 19 deletions cmd/jaeger/internal/exporters/storageexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package storageexporter

import (
"context"
"errors"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand All @@ -32,29 +34,28 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/model"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
)

type storageHost struct {
t *testing.T
storageExtension component.Component
type mockStorageExt struct {
name string
factory *factoryMocks.Factory
}

func (host storageHost) GetExtensions() map[component.ID]component.Component {
myMap := make(map[component.ID]component.Component)
myMap[jaegerstorage.ID] = host.storageExtension
return myMap
func (*mockStorageExt) Start(context.Context, component.Host) error {
panic("not implemented")
}

func (host storageHost) ReportFatalError(err error) {
host.t.Fatal(err)
func (*mockStorageExt) Shutdown(context.Context) error {
panic("not implemented")
}

func (storageHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
}

func (storageHost) GetExporters() map[component.DataType]map[component.ID]component.Component {
return nil
func (m *mockStorageExt) Factory(name string) (storage.Factory, bool) {
if m.name == name {
return m.factory, true
}
return nil, false
}

func TestExporterConfigError(t *testing.T) {
Expand All @@ -63,8 +64,10 @@ func TestExporterConfigError(t *testing.T) {
require.EqualError(t, err, "TraceStorage: non zero value required")
}

func TestExporterStartError(t *testing.T) {
host := makeStorageExtension(t, "foo")
func TestExporterStartBadNameError(t *testing.T) {
host := storagetest.NewStorageHost()
host.WithExtension(jaegerstorage.ID, &mockStorageExt{name: "foo"})

exporter := &storageExporter{
config: &Config{
TraceStorage: "bar",
Expand All @@ -75,6 +78,26 @@ func TestExporterStartError(t *testing.T) {
require.ErrorContains(t, err, "cannot find storage factory")
}

func TestExporterStartBadSpanstoreError(t *testing.T) {
factory := new(factoryMocks.Factory)
factory.On("CreateSpanWriter").Return(nil, errors.New("mocked error"))

host := storagetest.NewStorageHost()
host.WithExtension(jaegerstorage.ID, &mockStorageExt{
name: "foo",
factory: factory,
})

exporter := &storageExporter{
config: &Config{
TraceStorage: "foo",
},
}
err := exporter.start(context.Background(), host)
require.Error(t, err)
require.ErrorContains(t, err, "mocked error")
}

func TestExporter(t *testing.T) {
exporterFactory := NewFactory()

Expand Down Expand Up @@ -133,7 +156,7 @@ func TestExporter(t *testing.T) {
assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String())
}

func makeStorageExtension(t *testing.T, memstoreName string) storageHost {
func makeStorageExtension(t *testing.T, memstoreName string) component.Host {
extensionFactory := jaegerstorage.NewFactory()
storageExtension, err := extensionFactory.CreateExtension(
context.Background(),
Expand All @@ -147,10 +170,13 @@ func makeStorageExtension(t *testing.T, memstoreName string) storageHost {
memstoreName: {MaxTraces: 10000},
}})
require.NoError(t, err)
host := storageHost{t: t, storageExtension: storageExtension}

host := storagetest.NewStorageHost()
host.WithExtension(jaegerstorage.ID, storageExtension)

err = storageExtension.Start(context.Background(), host)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) })

return host
}
11 changes: 11 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage/factoryadapter"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

var _ Extension = (*storageExt)(nil)
Expand Down Expand Up @@ -62,6 +64,15 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error
return f, nil
}

func GetStorageFactoryV2(name string, host component.Host) (spanstore.Factory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
return nil, err
}

return factoryadapter.NewFactory(f), nil
}

func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt {
return &storageExt{
config: config,
Expand Down
10 changes: 10 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,22 @@ func TestStorageFactoryBadShutdownError(t *testing.T) {
require.ErrorIs(t, err, shutdownError)
}

func TestStorageFactoryV2Error(t *testing.T) {
host := componenttest.NewNopHost()
_, err := GetStorageFactoryV2("something", host)
require.ErrorContains(t, err, "cannot find extension")
}

func TestStorageExtension(t *testing.T) {
const name = "foo"
host := storageHost{t: t, storageExtension: startStorageExtension(t, name)}
f, err := GetStorageFactory(name, host)
require.NoError(t, err)
require.NotNil(t, f)

f2, err := GetStorageFactoryV2(name, host)
require.NoError(t, err)
require.NotNil(t, f2)
}

func TestBadgerStorageExtension(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Storage Factory Converter

A temporary v1 storage factory wrapper to implement v2 storage APIs.
This way, the existing v1 storage factories declared in `jaegerstorageextension`
can act as v2 storage while we migrate to v2 storage APIs.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package factoryadapter

import (
"context"
"io"

storage_v1 "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type Factory struct {
ss storage_v1.Factory
}

func NewFactory(ss storage_v1.Factory) spanstore.Factory {
return &Factory{
ss: ss,
}
}

// Initialize implements spanstore.Factory.
func (*Factory) Initialize(_ context.Context) error {
panic("not implemented")
}

// Close implements spanstore.Factory.
func (f *Factory) Close(_ context.Context) error {
if closer, ok := f.ss.(io.Closer); ok {
return closer.Close()
}
return nil
}

// CreateTraceReader implements spanstore.Factory.
func (*Factory) CreateTraceReader() (spanstore.Reader, error) {
panic("not implemented")
}

// CreateTraceWriter implements spanstore.Factory.
func (f *Factory) CreateTraceWriter() (spanstore.Writer, error) {
spanWriter, err := f.ss.CreateSpanWriter()
if err != nil {
return nil, err
}
return NewTraceWriter(spanWriter), nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package factoryadapter

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
spanstoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestAdapterInitialize(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("initialize did not panic")
}
}()

f := &Factory{}
_ = f.Initialize(context.Background())
}

func TestAdapterCloseNotOk(t *testing.T) {
f := NewFactory(&factoryMocks.Factory{})
require.NoError(t, f.Close(context.Background()))
}

func TestAdapterClose(t *testing.T) {
f := NewFactory(grpc.NewFactory())
require.NoError(t, f.Close(context.Background()))
}

func TestAdapterCreateTraceReader(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("create trace reader did not panic")
}
}()

f := &Factory{}
f.CreateTraceReader()
}

func TestAdapterCreateTraceWriterError(t *testing.T) {
f1 := new(factoryMocks.Factory)
f1.On("CreateSpanWriter").Return(nil, errors.New("mock error"))

f := NewFactory(f1)
_, err := f.CreateTraceWriter()
require.ErrorContains(t, err, "mock error")
}

func TestAdapterCreateTraceWriter(t *testing.T) {
f1 := new(factoryMocks.Factory)
f1.On("CreateSpanWriter").Return(new(spanstoreMocks.Writer), nil)

f := NewFactory(f1)
_, err := f.CreateTraceWriter()
require.NoError(t, err)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package factoryadapter

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
Loading

0 comments on commit 3f6f5fe

Please sign in to comment.