From 03848cd0aec6dfac2d6759efecc742a5645f3013 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 15:27:28 -0400 Subject: [PATCH 01/13] basic integration --- component/config.go | 54 +++++++++++++++----- component/config_test.go | 2 +- component/identifiable.go | 12 +++-- exporter/exporterhelper/batch_sender_test.go | 16 +++--- exporter/exporterhelper/common_test.go | 13 ++--- exporter/exporterhelper/queue_sender_test.go | 18 +++---- exporter/exporterhelper/retry_sender_test.go | 12 ++--- otelcol/otelcoltest/config_test.go | 2 +- service/internal/graph/graph.go | 10 ++-- service/pipelines/config.go | 9 +++- 10 files changed, 93 insertions(+), 55 deletions(-) diff --git a/component/config.go b/component/config.go index 49cc7f5219f..0b05f5a504b 100644 --- a/component/config.go +++ b/component/config.go @@ -109,18 +109,23 @@ func callValidateIfPossible(v reflect.Value) error { return nil } +type Type interface { + String() string + MarshalText() ([]byte, error) +} + // Type is the component type as it is used in the config. -type Type struct { +type ComponentType struct { name string } // String returns the string representation of the type. -func (t Type) String() string { +func (t ComponentType) String() string { return t.name } // MarshalText marshals returns the Type name. -func (t Type) MarshalText() ([]byte, error) { +func (t ComponentType) MarshalText() ([]byte, error) { return []byte(t.name), nil } @@ -137,12 +142,12 @@ var typeRegexp = regexp.MustCompile(`^[a-zA-Z][0-9a-zA-Z_]{0,62}$`) // - can only contain ASCII alphanumeric characters and '_'. func NewType(ty string) (Type, error) { if len(ty) == 0 { - return Type{}, fmt.Errorf("id must not be empty") + return ComponentType{}, fmt.Errorf("id must not be empty") } if !typeRegexp.MatchString(ty) { - return Type{}, fmt.Errorf("invalid character(s) in type %q", ty) + return ComponentType{}, fmt.Errorf("invalid character(s) in type %q", ty) } - return Type{name: ty}, nil + return ComponentType{name: ty}, nil } // MustNewType creates a type. It panics if the type is invalid. @@ -151,6 +156,7 @@ func NewType(ty string) (Type, error) { // - start with an ASCII alphabetic character and // - can only contain ASCII alphanumeric characters and '_'. func MustNewType(strType string) Type { + // todo ban the signal types from here OR return the data type ty, err := NewType(strType) if err != nil { panic(err) @@ -160,20 +166,40 @@ func MustNewType(strType string) Type { // DataType is a special Type that represents the data types supported by the collector. We currently support // collecting metrics, traces and logs, this can expand in the future. -type DataType = Type - -func mustNewDataType(strType string) DataType { - return MustNewType(strType) -} +type DataType string // Currently supported data types. Add new data types here when new types are supported in the future. var ( // DataTypeTraces is the data type tag for traces. - DataTypeTraces = mustNewDataType("traces") + DataTypeTraces DataType = "traces" // DataTypeMetrics is the data type tag for metrics. - DataTypeMetrics = mustNewDataType("metrics") + DataTypeMetrics DataType = "metrics" // DataTypeLogs is the data type tag for logs. - DataTypeLogs = mustNewDataType("logs") + DataTypeLogs DataType = "logs" + + signalNameToDataType = map[string]DataType{ + "logs": DataTypeLogs, + "metrics": DataTypeMetrics, + "traces": DataTypeTraces, + } ) + +func (dt DataType) String() string { + return string(dt) +} +func (dt DataType) MarshalText() (text []byte, err error) { + return []byte(dt), nil +} + +// DataTypeFromSignal takes in a string of a datatype, and returns a DataType and a bool. +// The bool is set to true if the string corresponds to a supported DataType, else it is False. +// if there is a matching DataType, it returns the matching DataType enum. If not, it returns an empty string. +func DataTypeFromSignal(s string) (DataType, bool) { + val, ok := signalNameToDataType[s] + if !ok { + return "", ok + } + return val, ok +} diff --git a/component/config_test.go b/component/config_test.go index caed4dc20ab..1b6ff8bcff0 100644 --- a/component/config_test.go +++ b/component/config_test.go @@ -16,7 +16,7 @@ import ( "go.opentelemetry.io/collector/confmap" ) -var _ fmt.Stringer = Type{} +var _ fmt.Stringer = ComponentType{} type configChildStruct struct { Child errConfig diff --git a/component/identifiable.go b/component/identifiable.go index d2d65a5e24f..8b12a307970 100644 --- a/component/identifiable.go +++ b/component/identifiable.go @@ -83,11 +83,15 @@ func (id *ID) UnmarshalText(text []byte) error { return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator) } } - - var err error - if id.typeVal, err = NewType(typeStr); err != nil { - return fmt.Errorf("in %q id: %w", idStr, err) + if dataType, ok := DataTypeFromSignal(typeStr); ok { + id.typeVal = dataType + } else { + var err error + if id.typeVal, err = NewType(typeStr); err != nil { + return fmt.Errorf("in %q id: %w", idStr, err) + } } + id.nameVal = nameStr return nil diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index 15d05b066f4..bc5720a99f8 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -209,7 +209,7 @@ func TestBatchSender_Disabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = false cfg.MaxSizeItems = 5 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -257,7 +257,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { } func TestBatchSender_PostShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) @@ -275,7 +275,7 @@ func TestBatchSender_PostShutdown(t *testing.T) { func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) require.NotNil(t, be) @@ -299,7 +299,7 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -329,7 +329,7 @@ func TestBatchSender_BatchBlocking(t *testing.T) { func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestBatchSender_BatchCancelled(t *testing.T) { func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -427,7 +427,7 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, tt.opts...) + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) if tt.expectedErr { assert.Nil(t, be) assert.Error(t, err) @@ -440,7 +440,7 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, batchOption, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) require.NotNil(t, be) require.NoError(t, err) diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index e134affeaaa..d79b7c07918 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -24,6 +24,7 @@ import ( var ( defaultType = component.MustNewType("test") + defaultDataType = component.DataTypeMetrics defaultID = component.NewID(defaultType) defaultSettings = func() exporter.CreateSettings { set := exportertest.NewNopCreateSettings() @@ -37,7 +38,7 @@ func newNoopObsrepSender(*ObsReport) requestSender { } func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender) + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -46,7 +47,7 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, defaultType, newNoopObsrepSender, + defaultSettings, defaultDataType, newNoopObsrepSender, WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings()), @@ -66,16 +67,16 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { } func TestQueueOptionsWithRequestExporter(t *testing.T) { - bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig())) require.Nil(t, err) require.Nil(t, bs.marshaler) require.Nil(t, bs.unmarshaler) - _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings())) require.Error(t, err) - _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(configretry.NewDefaultBackOffConfig()), WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) @@ -88,7 +89,7 @@ func TestBaseExporterLogging(t *testing.T) { set.Logger = zap.New(logger) rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg)) + bs, err := newBaseExporter(set, defaultDataType, newNoopObsrepSender, WithRetry(rCfg)) require.Nil(t, err) sendErr := bs.send(context.Background(), newErrorRequest()) require.Error(t, sendErr) diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index a2616152ea6..4775938eb55 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -27,7 +27,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -61,7 +61,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -93,7 +93,7 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(set, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithQueue(qCfg)) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, tt.queueOptions...) + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, tt.queueOptions...) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -209,7 +209,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -338,7 +338,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -388,7 +388,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered mockReq := newErrorRequest() - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -412,7 +412,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. replacedReq := newMockRequest(1, nil) - be, err = newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + be, err = newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), host)) diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index d3863097326..ef47597c62e 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -39,7 +39,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -63,7 +63,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), WithQueue(qCfg), WithRetry(rCfg)) require.NoError(t, err) @@ -90,7 +90,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -120,7 +120,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -202,7 +202,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) diff --git a/otelcol/otelcoltest/config_test.go b/otelcol/otelcoltest/config_test.go index 71502de536e..69e1cc40e96 100644 --- a/otelcol/otelcoltest/config_test.go +++ b/otelcol/otelcoltest/config_test.go @@ -55,7 +55,7 @@ func TestLoadConfig(t *testing.T) { Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("nop")}, }, - cfg.Service.Pipelines[component.MustNewID("traces")], + cfg.Service.Pipelines[component.NewIDWithName(component.DataTypeTraces, "")], "Did not load pipeline config correctly") } diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 525c269a6de..d7b9e27e61e 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -132,14 +132,14 @@ func (g *Graph) createNodes(set Settings) error { // The presence of each key indicates how the connector is used as an exporter. // The value is initially set to false. Later we will set the value to true *if* we // confirm that there is a supported corresponding use as a receiver. - expTypes[pipelineID.Type()] = false + expTypes[pipelineID.Type().(component.DataType)] = false } recTypes := make(map[component.DataType]bool) for _, pipelineID := range connectorsAsReceiver[connID] { // The presence of each key indicates how the connector is used as a receiver. // The value is initially set to false. Later we will set the value to true *if* we // confirm that there is a supported corresponding use as an exporter. - recTypes[pipelineID.Type()] = false + recTypes[pipelineID.Type().(component.DataType)] = false } for expType := range expTypes { @@ -182,7 +182,7 @@ func (g *Graph) createNodes(set Settings) error { } func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode { - rcvrNode := newReceiverNode(pipelineID.Type(), recvID) + rcvrNode := newReceiverNode(pipelineID.Type().(component.DataType), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*receiverNode) @@ -212,7 +212,7 @@ func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode } func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { - expNode := newExporterNode(pipelineID.Type(), exprID) + expNode := newExporterNode(pipelineID.Type().(component.DataType), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*exporterNode) @@ -229,7 +229,7 @@ func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { } func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode { - connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID) + connNode := newConnectorNode(exprPipelineID.Type().(component.DataType), rcvrPipelineID.Type().(component.DataType), connID) if node := g.componentGraph.Node(connNode.ID()); node != nil { instanceID := g.instanceIDs[connNode.ID()] instanceID.PipelineIDs[exprPipelineID] = struct{}{} diff --git a/service/pipelines/config.go b/service/pipelines/config.go index ed8c77a31c0..e40f263e2d1 100644 --- a/service/pipelines/config.go +++ b/service/pipelines/config.go @@ -28,7 +28,14 @@ func (cfg Config) Validate() error { // Check that all pipelines have at least one receiver and one exporter, and they reference // only configured components. for pipelineID, pipeline := range cfg { - if pipelineID.Type() != component.DataTypeTraces && pipelineID.Type() != component.DataTypeMetrics && pipelineID.Type() != component.DataTypeLogs { + switch pipelineID.Type().(type) { + case component.DataType: + t := pipelineID.Type().(component.DataType) + // This check should never really be necessary since the parsing logic should never create an unknown DataType - but there are no enums in go so this is probably good practice. + if t != component.DataTypeTraces && t != component.DataTypeMetrics && t != component.DataTypeLogs { + return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Type()) + } + default: return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Type()) } From dcece16e36f0225caec0f96d93a3111ea0e26306 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 19:05:18 -0400 Subject: [PATCH 02/13] test fixes and change Type constructor --- component/config.go | 5 ++++- processor/memorylimiterprocessor/memorylimiter_test.go | 4 +++- service/internal/graph/graph_test.go | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/component/config.go b/component/config.go index 0b05f5a504b..836c499d260 100644 --- a/component/config.go +++ b/component/config.go @@ -114,7 +114,7 @@ type Type interface { MarshalText() ([]byte, error) } -// Type is the component type as it is used in the config. +// ComponentType is the component type as it is used in the config. type ComponentType struct { name string } @@ -147,6 +147,9 @@ func NewType(ty string) (Type, error) { if !typeRegexp.MatchString(ty) { return ComponentType{}, fmt.Errorf("invalid character(s) in type %q", ty) } + if dataType, ok := DataTypeFromSignal(ty); ok { + return dataType, nil + } return ComponentType{name: ty}, nil } diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 10bc8d7911c..0c103157346 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -56,7 +56,9 @@ func TestNoDataLoss(t *testing.T) { limiter, err := newMemoryLimiterProcessor(set, cfg) require.NoError(t, err) - processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.CreateSettings{}, cfg, exporter, + processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.CreateSettings{ + ID: component.MustNewID("nop"), + }, cfg, exporter, limiter.processLogs, processorhelper.WithStart(limiter.start), processorhelper.WithShutdown(limiter.shutdown)) diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index eddd3098153..317ab0fd060 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2399,7 +2399,7 @@ func expectedInstances(m pipelines.Config, pID component.ID) (int, int) { } // This is a connector. Count the pipeline types where it is an exporter. - typeMap := map[component.DataType]bool{} + typeMap := map[component.Type]bool{} for pID, pCfg := range m { for _, eID := range pCfg.Exporters { if eID == rID { @@ -2416,7 +2416,7 @@ func expectedInstances(m pipelines.Config, pID component.ID) (int, int) { } // This is a connector. Count the pipeline types where it is a receiver. - typeMap := map[component.DataType]bool{} + typeMap := map[component.Type]bool{} for pID, pCfg := range m { for _, rID := range pCfg.Receivers { if rID == eID { From 3985797551d335d3aa5a131d39e0f3f2bfe56682 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 19:06:00 -0400 Subject: [PATCH 03/13] delete comment --- component/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/config.go b/component/config.go index 836c499d260..2d8490de15e 100644 --- a/component/config.go +++ b/component/config.go @@ -159,7 +159,6 @@ func NewType(ty string) (Type, error) { // - start with an ASCII alphabetic character and // - can only contain ASCII alphanumeric characters and '_'. func MustNewType(strType string) Type { - // todo ban the signal types from here OR return the data type ty, err := NewType(strType) if err != nil { panic(err) From 78519a792e7e071cb2e1dcb90ba7ada4fb20553d Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 19:12:53 -0400 Subject: [PATCH 04/13] clarify comment --- component/config.go | 1 + component/identifiable.go | 11 ++++------- otelcol/otelcoltest/config_test.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/component/config.go b/component/config.go index 2d8490de15e..f43ad46e5a5 100644 --- a/component/config.go +++ b/component/config.go @@ -140,6 +140,7 @@ var typeRegexp = regexp.MustCompile(`^[a-zA-Z][0-9a-zA-Z_]{0,62}$`) // - have at least one character, // - start with an ASCII alphabetic character and // - can only contain ASCII alphanumeric characters and '_'. +// If the type string is a supported DataType, one is returned. Otherwise, a ComponentType is returned. func NewType(ty string) (Type, error) { if len(ty) == 0 { return ComponentType{}, fmt.Errorf("id must not be empty") diff --git a/component/identifiable.go b/component/identifiable.go index 8b12a307970..842aeda23f7 100644 --- a/component/identifiable.go +++ b/component/identifiable.go @@ -83,13 +83,10 @@ func (id *ID) UnmarshalText(text []byte) error { return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator) } } - if dataType, ok := DataTypeFromSignal(typeStr); ok { - id.typeVal = dataType - } else { - var err error - if id.typeVal, err = NewType(typeStr); err != nil { - return fmt.Errorf("in %q id: %w", idStr, err) - } + + var err error + if id.typeVal, err = NewType(typeStr); err != nil { + return fmt.Errorf("in %q id: %w", idStr, err) } id.nameVal = nameStr diff --git a/otelcol/otelcoltest/config_test.go b/otelcol/otelcoltest/config_test.go index 69e1cc40e96..d52ff5b75ac 100644 --- a/otelcol/otelcoltest/config_test.go +++ b/otelcol/otelcoltest/config_test.go @@ -55,7 +55,7 @@ func TestLoadConfig(t *testing.T) { Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("nop")}, }, - cfg.Service.Pipelines[component.NewIDWithName(component.DataTypeTraces, "")], + cfg.Service.Pipelines[component.NewID(component.DataTypeTraces)], "Did not load pipeline config correctly") } From 0264ff2d5647efeae08d68128c611ee056da8e32 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 20:35:57 -0400 Subject: [PATCH 05/13] disable that lint --- component/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/config.go b/component/config.go index f43ad46e5a5..d87ee9aef23 100644 --- a/component/config.go +++ b/component/config.go @@ -115,7 +115,7 @@ type Type interface { } // ComponentType is the component type as it is used in the config. -type ComponentType struct { +type ComponentType struct { //revive:disable-line:exported name string } From cdd368a2aa392ef6044f596abc0191abac0a6103 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 20:36:32 -0400 Subject: [PATCH 06/13] add a test for DataTypeFromSignal --- component/config_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/component/config_test.go b/component/config_test.go index 1b6ff8bcff0..aa09c960336 100644 --- a/component/config_test.go +++ b/component/config_test.go @@ -447,3 +447,13 @@ func TestStructWithEmbeddedUnmarshaling(t *testing.T) { assert.Equal(t, "foo", tc.String) assert.Equal(t, 123, tc.Num) } + +func TestDataTypeFromSignal(t *testing.T) { + dt, ok := DataTypeFromSignal("logs") + assert.Equal(t, ok, true) + assert.Equal(t, DataTypeLogs, dt) + + dt, ok = DataTypeFromSignal("asdf") + assert.Equal(t, ok, false) + assert.Equal(t, DataType(""), dt) +} From 88fee9c9f076397351973a3317042c40a538ba29 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 20:47:25 -0400 Subject: [PATCH 07/13] embed interfaces --- component/config.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/component/config.go b/component/config.go index d87ee9aef23..ccd94b629bf 100644 --- a/component/config.go +++ b/component/config.go @@ -4,6 +4,7 @@ package component // import "go.opentelemetry.io/collector/component" import ( + "encoding" "fmt" "reflect" "regexp" @@ -110,8 +111,8 @@ func callValidateIfPossible(v reflect.Value) error { } type Type interface { - String() string - MarshalText() ([]byte, error) + fmt.Stringer + encoding.TextMarshaler } // ComponentType is the component type as it is used in the config. From 6a72d0eb559b0b12a149d50fd61315fb8a773f1e Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 21:12:52 -0400 Subject: [PATCH 08/13] tweak pipeline validation logic --- service/pipelines/config.go | 16 +++++++--------- service/pipelines/config_test.go | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/service/pipelines/config.go b/service/pipelines/config.go index e40f263e2d1..4fe16f5b0b0 100644 --- a/service/pipelines/config.go +++ b/service/pipelines/config.go @@ -28,15 +28,13 @@ func (cfg Config) Validate() error { // Check that all pipelines have at least one receiver and one exporter, and they reference // only configured components. for pipelineID, pipeline := range cfg { - switch pipelineID.Type().(type) { - case component.DataType: - t := pipelineID.Type().(component.DataType) - // This check should never really be necessary since the parsing logic should never create an unknown DataType - but there are no enums in go so this is probably good practice. - if t != component.DataTypeTraces && t != component.DataTypeMetrics && t != component.DataTypeLogs { - return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Type()) - } - default: - return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Type()) + dt, ok := pipelineID.Type().(component.DataType) + if !ok { + return fmt.Errorf("pipeline %q: is not a DataType %q", pipelineID, pipelineID.Type()) + } + // This check should never really be necessary since the parsing logic should never create an unknown DataType - but there are no enums in go so this is might be good practice. + if _, ok := component.DataTypeFromSignal(string(dt)); !ok { + return fmt.Errorf("pipeline %q: unknown DataType %q", pipelineID, dt) } // Validate pipeline has at least one receiver. diff --git a/service/pipelines/config_test.go b/service/pipelines/config_test.go index 7ad94d5ab3f..b7b2d38007b 100644 --- a/service/pipelines/config_test.go +++ b/service/pipelines/config_test.go @@ -70,7 +70,20 @@ func TestConfigValidate(t *testing.T) { } return cfg }, - expected: errors.New(`pipeline "wrongtype": unknown datatype "wrongtype"`), + expected: errors.New(`pipeline "wrongtype": is not a DataType "wrongtype"`), + }, + { + name: "unknown-service-pipeline-type", + cfgFn: func() Config { + cfg := generateConfig() + cfg[component.NewIDWithName(component.DataType("asdf"), "")] = &PipelineConfig{ + Receivers: []component.ID{component.MustNewID("nop")}, + Processors: []component.ID{component.MustNewID("nop")}, + Exporters: []component.ID{component.MustNewID("nop")}, + } + return cfg + }, + expected: errors.New(`pipeline "asdf": unknown DataType "asdf"`), }, } From c1f040beb5977a63b377d8735bab3f15351adabf Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 21:25:24 -0400 Subject: [PATCH 09/13] add a test for NewType --- component/config_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/component/config_test.go b/component/config_test.go index aa09c960336..c26c0e59a61 100644 --- a/component/config_test.go +++ b/component/config_test.go @@ -423,6 +423,18 @@ func TestNewType(t *testing.T) { } } +func TestNewTypeDataType(t *testing.T) { + ty, err := NewType("logs") + require.NoError(t, err) + assert.Equal(t, ty, DataTypeLogs) + + // hopefully this reminds us to update DataType when profiles get included + ty, err = NewType("profiles") + require.NoError(t, err) + assert.Equal(t, ty.(ComponentType).name, "profiles") + +} + type configWithEmbeddedStruct struct { String string `mapstructure:"string"` Num int `mapstructure:"num"` From 554abb8cb28ee9d3da33b1b05d6192e8e88f4af9 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 21:34:44 -0400 Subject: [PATCH 10/13] comments --- component/config.go | 3 +++ component/identifiable.go | 7 ++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/component/config.go b/component/config.go index ccd94b629bf..9701de9a20f 100644 --- a/component/config.go +++ b/component/config.go @@ -110,6 +110,9 @@ func callValidateIfPossible(v reflect.Value) error { return nil } +// Type represents the names of receivers (otlp, filelog, etc), +// processors (batch, memory_limit, etc), or exporters (debug, rabbitmq) +// It also includes the DataType - things like "metrics", "traces", "logs" type Type interface { fmt.Stringer encoding.TextMarshaler diff --git a/component/identifiable.go b/component/identifiable.go index 842aeda23f7..0c3f859f4af 100644 --- a/component/identifiable.go +++ b/component/identifiable.go @@ -13,9 +13,10 @@ import ( const typeAndNameSeparator = "/" // ID represents the identity for a component. It combines two values: -// * type - the Type of the component. -// * name - the name of that component. -// The component ID (combination type + name) is unique for a given component.Kind. +// * typeVal - the Type of the component. +// * nameVal - the name of that component. This is optional and can be an empty string. +// The Component ID (combination type + name) is unique for a given component.Kind. +// Component IDs are defined in configuration by type[/name] - for examples [traces/1] or [oltp/blah] type ID struct { typeVal Type `mapstructure:"-"` nameVal string `mapstructure:"-"` From 708a18d4ba25536f2dfffb0847129820d53169db Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 2 May 2024 22:12:14 -0400 Subject: [PATCH 11/13] changelog --- .chloggen/datatype-type-interface.yaml | 28 ++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .chloggen/datatype-type-interface.yaml diff --git a/.chloggen/datatype-type-interface.yaml b/.chloggen/datatype-type-interface.yaml new file mode 100644 index 00000000000..6d3435db982 --- /dev/null +++ b/.chloggen/datatype-type-interface.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Change Type to an interface, the new types ComponentType and DataType now implement it." + +# One or more tracking issues or pull requests related to the change +issues: [9429] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Creates ComponentType and DataType - ComponentType represents the names of components and DataType is an enum for Traces,Logs, and Metrics. + + NewType will now automatically create a DataType instead of ComponentType for the strings "traces", "logs", and "metrics". In addition, ID's zero value now contains a null value for Type - since it is now an interface." + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ['api'] From 52e2091e151b0c819469bd8fe1c44febf112b124 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Fri, 3 May 2024 16:17:11 -0400 Subject: [PATCH 12/13] testfix --- service/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/config_test.go b/service/config_test.go index 6ac9d6e625a..e632c5739f0 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -59,7 +59,7 @@ func TestConfigValidate(t *testing.T) { } return cfg }, - expected: fmt.Errorf(`service::pipelines config validation failed: %w`, errors.New(`pipeline "wrongtype": unknown datatype "wrongtype"`)), + expected: fmt.Errorf(`service::pipelines config validation failed: %w`, errors.New(`pipeline "wrongtype": is not a DataType "wrongtype"`)), }, { name: "invalid-telemetry-metric-config", From 38a2e96e564978325582ff19a90a21551256ac35 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Mon, 6 May 2024 13:55:42 -0400 Subject: [PATCH 13/13] trigger build --- .chloggen/datatype-type-interface.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.chloggen/datatype-type-interface.yaml b/.chloggen/datatype-type-interface.yaml index 6d3435db982..1ca0d6fe8c8 100644 --- a/.chloggen/datatype-type-interface.yaml +++ b/.chloggen/datatype-type-interface.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: component # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: "Change Type to an interface, the new types ComponentType and DataType now implement it." +note: "Change Type to an interface, introduce the new types ComponentType and DataType now implement it." # One or more tracking issues or pull requests related to the change issues: [9429] @@ -16,7 +16,7 @@ issues: [9429] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: | - Creates ComponentType and DataType - ComponentType represents the names of components and DataType is an enum for Traces,Logs, and Metrics. + Creates ComponentType and DataType - ComponentType represents the names of components and DataType is an enum for Traces,Logs, and Metrics (and future signal types!). NewType will now automatically create a DataType instead of ComponentType for the strings "traces", "logs", and "metrics". In addition, ID's zero value now contains a null value for Type - since it is now an interface."