From 5cc0707e8a29061c18f8fd17b4168ae8999dd348 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 12 Feb 2021 15:13:01 -0800 Subject: [PATCH] Move CombineErrors to consumererror package (#2442) Signed-off-by: Bogdan Drutu --- component/componenterror/errors.go | 27 +---------- config/configcheck/configcheck.go | 6 +-- consumer/consumererror/combineerrors.go | 47 +++++++++++++++++++ .../consumererror/combineerrors_test.go | 11 ++--- exporter/kafkaexporter/jaeger_marshaller.go | 4 +- .../prometheusremotewriteexporter/exporter.go | 3 +- processor/cloningfanoutconnector.go | 8 ++-- processor/fanoutconnector.go | 8 ++-- receiver/jaegerreceiver/trace_receiver.go | 3 +- receiver/scraperhelper/errors.go | 3 +- receiver/scraperhelper/scrapercontroller.go | 4 +- .../scraperhelper/scrapercontroller_test.go | 3 +- service/builder/exporters_builder.go | 8 ++-- service/builder/extensions_builder.go | 6 +-- service/builder/pipelines_builder.go | 4 +- service/builder/receivers_builder.go | 4 +- service/defaultcomponents/defaults.go | 4 +- service/service.go | 6 +-- 18 files changed, 89 insertions(+), 70 deletions(-) create mode 100644 consumer/consumererror/combineerrors.go rename component/componenterror/errors_test.go => consumer/consumererror/combineerrors_test.go (83%) diff --git a/component/componenterror/errors.go b/component/componenterror/errors.go index 0a89c29b3f2..ecc8c6a1f1f 100644 --- a/component/componenterror/errors.go +++ b/component/componenterror/errors.go @@ -18,8 +18,6 @@ package componenterror import ( "errors" - "fmt" - "strings" "go.opentelemetry.io/collector/consumer/consumererror" ) @@ -36,28 +34,7 @@ var ( ) // CombineErrors converts a list of errors into one error. +// Deprecated: use consumererror.CombineErrors instead. func CombineErrors(errs []error) error { - numErrors := len(errs) - if numErrors == 0 { - // No errors - return nil - } - - if numErrors == 1 { - return errs[0] - } - - errMsgs := make([]string, 0, numErrors) - permanent := false - for _, err := range errs { - if !permanent && consumererror.IsPermanent(err) { - permanent = true - } - errMsgs = append(errMsgs, err.Error()) - } - err := fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) - if permanent { - err = consumererror.Permanent(err) - } - return err + return consumererror.CombineErrors(errs) } diff --git a/config/configcheck/configcheck.go b/config/configcheck/configcheck.go index 5d344f28671..0bcb272f975 100644 --- a/config/configcheck/configcheck.go +++ b/config/configcheck/configcheck.go @@ -25,7 +25,7 @@ import ( "strings" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" ) // The regular expression for valid config field tag. @@ -56,7 +56,7 @@ func ValidateConfigFromFactories(factories component.Factories) error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // ValidateConfig enforces that given configuration object is following the patterns @@ -109,7 +109,7 @@ func validateConfigDataType(t reflect.Type) error { // reflect.UnsafePointer. } - if err := componenterror.CombineErrors(errs); err != nil { + if err := consumererror.CombineErrors(errs); err != nil { return fmt.Errorf( "type %q from package %q has invalid config settings: %v", t.Name(), diff --git a/consumer/consumererror/combineerrors.go b/consumer/consumererror/combineerrors.go new file mode 100644 index 00000000000..1010c07d60c --- /dev/null +++ b/consumer/consumererror/combineerrors.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumererror + +import ( + "fmt" + "strings" +) + +// CombineErrors converts a list of errors into one error. +func CombineErrors(errs []error) error { + numErrors := len(errs) + if numErrors == 0 { + // No errors + return nil + } + + if numErrors == 1 { + return errs[0] + } + + errMsgs := make([]string, 0, numErrors) + permanent := false + for _, err := range errs { + if !permanent && IsPermanent(err) { + permanent = true + } + errMsgs = append(errMsgs, err.Error()) + } + err := fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + if permanent { + err = Permanent(err) + } + return err +} diff --git a/component/componenterror/errors_test.go b/consumer/consumererror/combineerrors_test.go similarity index 83% rename from component/componenterror/errors_test.go rename to consumer/consumererror/combineerrors_test.go index c0c7b8e7bd8..404b853f342 100644 --- a/component/componenterror/errors_test.go +++ b/consumer/consumererror/combineerrors_test.go @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package componenterror_test +package consumererror import ( "fmt" "testing" - - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/consumer/consumererror" ) func TestCombineErrors(t *testing.T) { @@ -50,20 +47,20 @@ func TestCombineErrors(t *testing.T) { errors: []error{ fmt.Errorf("foo"), fmt.Errorf("bar"), - consumererror.Permanent(fmt.Errorf("permanent"))}, + Permanent(fmt.Errorf("permanent"))}, expected: "Permanent error: [foo; bar; Permanent error: permanent]", }, } for _, tc := range testCases { - got := componenterror.CombineErrors(tc.errors) + got := CombineErrors(tc.errors) if (got == nil) != tc.expectNil { t.Errorf("CombineErrors(%v) == nil? Got: %t. Want: %t", tc.errors, got == nil, tc.expectNil) } if got != nil && tc.expected != got.Error() { t.Errorf("CombineErrors(%v) = %q. Want: %q", tc.errors, got, tc.expected) } - if tc.expectedPermanent && !consumererror.IsPermanent(got) { + if tc.expectedPermanent && !IsPermanent(got) { t.Errorf("CombineErrors(%v) = %q. Want: consumererror.permanent", tc.errors, got) } } diff --git a/exporter/kafkaexporter/jaeger_marshaller.go b/exporter/kafkaexporter/jaeger_marshaller.go index 10d2fc1dad3..473bafc91a5 100644 --- a/exporter/kafkaexporter/jaeger_marshaller.go +++ b/exporter/kafkaexporter/jaeger_marshaller.go @@ -20,7 +20,7 @@ import ( "github.com/gogo/protobuf/jsonpb" jaegerproto "github.com/jaegertracing/jaeger/model" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" ) @@ -50,7 +50,7 @@ func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) { messages = append(messages, Message{Value: bts}) } } - return messages, componenterror.CombineErrors(errs) + return messages, consumererror.CombineErrors(errs) } func (j jaegerMarshaller) Encoding() string { diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index d269dc25c87..0f4d993d790 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -31,7 +31,6 @@ import ( "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" otlp "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1" @@ -155,7 +154,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int } if dropped != 0 { - return dropped, componenterror.CombineErrors(errs) + return dropped, consumererror.CombineErrors(errs) } return 0, nil diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index a4715de7633..f418631449b 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -17,8 +17,8 @@ package processor import ( "context" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" ) @@ -61,7 +61,7 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetrics(ctx context.Context, md } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // NewTracesCloningFanOutConnector wraps multiple traces consumers in a single one and clones the data @@ -98,7 +98,7 @@ func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pd } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // NewLogsCloningFanOutConnector wraps multiple trace consumers in a single one. @@ -134,5 +134,5 @@ func (lfc logsCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata. } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index f590c4280b0..8a7b7ea3ae1 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -17,8 +17,8 @@ package processor import ( "context" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" ) @@ -46,7 +46,7 @@ func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.M errs = append(errs, err) } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // NewTracesFanOutConnector wraps multiple trace consumers in a single one. @@ -70,7 +70,7 @@ func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Trac errs = append(errs, err) } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // NewLogsFanOutConnector wraps multiple log consumers in a single one. @@ -94,5 +94,5 @@ func (fc logsFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) er errs = append(errs, err) } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index c88307b29ca..d1a06900bed 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -49,6 +49,7 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/obsreport" jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" ) @@ -233,7 +234,7 @@ func (jr *jReceiver) Shutdown(context.Context) error { jr.grpc.Stop() jr.grpc = nil } - err = componenterror.CombineErrors(errs) + err = consumererror.CombineErrors(errs) }) return err diff --git a/receiver/scraperhelper/errors.go b/receiver/scraperhelper/errors.go index 6134aebd17f..ba11e127fee 100644 --- a/receiver/scraperhelper/errors.go +++ b/receiver/scraperhelper/errors.go @@ -19,7 +19,6 @@ import ( "fmt" "strings" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer/consumererror" ) @@ -35,7 +34,7 @@ func CombineScrapeErrors(errs []error) error { } if !partialScrapeErr { - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } errMsgs := make([]string, 0, len(errs)) diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/scrapercontroller.go index e829f6ca204..866c27368fe 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/scrapercontroller.go @@ -165,7 +165,7 @@ func (sc *controller) Shutdown(ctx context.Context) error { errs = append(errs, err) } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // startScraping initiates a ticker that calls Scrape based on the configured @@ -249,7 +249,7 @@ func (mms *multiMetricScraper) Shutdown(ctx context.Context) error { errs = append(errs, err) } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func (mms *multiMetricScraper) Scrape(ctx context.Context, receiverName string) (pdata.ResourceMetricsSlice, error) { diff --git a/receiver/scraperhelper/scrapercontroller_test.go b/receiver/scraperhelper/scrapercontroller_test.go index 6218e1f6c4c..d8ebc78325e 100644 --- a/receiver/scraperhelper/scrapercontroller_test.go +++ b/receiver/scraperhelper/scrapercontroller_test.go @@ -27,7 +27,6 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -327,7 +326,7 @@ func getExpectedShutdownErr(test metricsTestCase) error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func assertChannelsCalled(t *testing.T, chs []chan bool, message string) { diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go index 2fcd9174b65..bf90c5d2106 100644 --- a/service/builder/exporters_builder.go +++ b/service/builder/exporters_builder.go @@ -21,9 +21,9 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" ) // builtExporter is an exporter that is built based on a config. It can have @@ -43,7 +43,7 @@ func (bexp *builtExporter) Start(ctx context.Context, host component.Host) error } } - return componenterror.CombineErrors(errors) + return consumererror.CombineErrors(errors) } // Shutdown the trace component and the metrics component of an exporter. @@ -56,7 +56,7 @@ func (bexp *builtExporter) Shutdown(ctx context.Context) error { } } - return componenterror.CombineErrors(errors) + return consumererror.CombineErrors(errors) } func (bexp *builtExporter) getTraceExporter() component.TracesExporter { @@ -109,7 +109,7 @@ func (exps Exporters) ShutdownAll(ctx context.Context) error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func (exps Exporters) ToMapByDataType() map[configmodels.DataType]map[configmodels.Exporter]component.Exporter { diff --git a/service/builder/extensions_builder.go b/service/builder/extensions_builder.go index 8c10d9fb633..5927b1431cf 100644 --- a/service/builder/extensions_builder.go +++ b/service/builder/extensions_builder.go @@ -21,8 +21,8 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" ) // builtExporter is an exporter that is built based on a config. It can have @@ -71,7 +71,7 @@ func (exts Extensions) ShutdownAll(ctx context.Context) error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func (exts Extensions) NotifyPipelineReady() error { @@ -99,7 +99,7 @@ func (exts Extensions) NotifyPipelineNotReady() error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func (exts Extensions) ToMap() map[configmodels.Extension]component.ServiceExtension { diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go index bf13b97563c..45ed409df3b 100644 --- a/service/builder/pipelines_builder.go +++ b/service/builder/pipelines_builder.go @@ -21,9 +21,9 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/processor" ) @@ -75,7 +75,7 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error { bp.logger.Info("Pipeline is shutdown.") } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // PipelinesBuilder builds pipelines from config. diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index 48724359f07..3989b0b815e 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -22,10 +22,10 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/processor" ) @@ -61,7 +61,7 @@ func (rcvs Receivers) ShutdownAll(ctx context.Context) error { } } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // StartAll starts all receivers. diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index 157b9a7ce78..5e1b883f2cb 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -17,7 +17,7 @@ package defaultcomponents import ( "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter/fileexporter" "go.opentelemetry.io/collector/exporter/jaegerexporter" "go.opentelemetry.io/collector/exporter/kafkaexporter" @@ -117,5 +117,5 @@ func Components() ( Exporters: exporters, } - return factories, componenterror.CombineErrors(errs) + return factories, consumererror.CombineErrors(errs) } diff --git a/service/service.go b/service/service.go index 6eff0286683..7f2899b4c4c 100644 --- a/service/service.go +++ b/service/service.go @@ -34,11 +34,11 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/collector/telemetry" "go.opentelemetry.io/collector/internal/version" "go.opentelemetry.io/collector/service/builder" @@ -395,7 +395,7 @@ func (app *Application) shutdownPipelines(ctx context.Context) error { errs = append(errs, fmt.Errorf("failed to shutdown exporters: %w", err)) } - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } func (app *Application) shutdownExtensions(ctx context.Context) error { @@ -470,7 +470,7 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro app.stateChannel <- Closed close(app.stateChannel) - return componenterror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } // Run starts the collector according to the command and configuration