Skip to content

Commit

Permalink
Build receivers based on new configuration (#25)
Browse files Browse the repository at this point in the history
- Build receivers and plug them into pipelines.

- Added tests to verify that single pipeline and multiple pipeline (fan out)
  per receiver work correctly.

- Disable -v flag in go test to reduce unnecessary noise in output, we have
  too many tests to use -v flag. Errors are still properly printed. This makes
  finding the cause of test failures easier.

- Add logging to various application startup steps.

- Changed logging from %q to %s in multiple places to make it more human readable
  when output in json format.

Testing done:

1. make && all unit tests pass.

2. make otelsvc and produce otelsvc_linux executable.

3. Manually verify that otelsvc_linux runs and correctly forwards trace data
   received via jaeger receiver and exported via opencensus exporter.

   This is the first successful run of unified OpenTelemetry Service.
  • Loading branch information
tigrannajaryan authored Jun 20, 2019
1 parent 2a40f54 commit 3aac7c1
Show file tree
Hide file tree
Showing 16 changed files with 765 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ALL_SRC := $(shell find . -name '*.go' \
# ALL_PKGS is used with 'go cover'
ALL_PKGS := $(shell go list $(sort $(dir $(ALL_SRC))))

GOTEST_OPT?=-v -race -timeout 30s
GOTEST_OPT?= -race -timeout 30s
GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -coverprofile=coverage.txt -covermode=atomic
GOTEST=go test
GOFMT=gofmt
Expand Down
14 changes: 7 additions & 7 deletions cmd/occollector/app/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (eb *ExportersBuilder) buildExporter(
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType)
}
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err)
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}

exporter.tc = tc
Expand All @@ -189,13 +189,15 @@ func (eb *ExportersBuilder) buildExporter(
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType)
}
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err)
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}

exporter.mc = mc
exporter.stop = combineStopFunc(exporter.stop, stopFunc)
}

eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name()))

return exporter, nil
}

Expand All @@ -204,10 +206,8 @@ func typeMismatchErr(
requiredByPipeline *configmodels.Pipeline,
dataType configmodels.DataType,
) error {
return fmt.Errorf(
"pipeline %q produces %q to exporter %s which does not support %q "+
"telemetry data. exporter will be detached from pipeline",
requiredByPipeline.Name, dataType.GetDataTypeStr(),
config.Name(), dataType.GetDataTypeStr(),
return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s",
requiredByPipeline.Name, dataType.GetString(),
config.Name(), dataType.GetString(),
)
}
28 changes: 15 additions & 13 deletions cmd/occollector/app/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func NewPipelinesBuilder(
}

// Build pipeline processors from config.
func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) {
func (pb *PipelinesBuilder) Build() (PipelineProcessors, error) {
pipelineProcessors := make(PipelineProcessors)

for _, pipeline := range eb.config.Pipelines {
firstProcessor, err := eb.buildPipeline(pipeline)
for _, pipeline := range pb.config.Pipelines {
firstProcessor, err := pb.buildPipeline(pipeline)
if err != nil {
return nil, err
}
Expand All @@ -71,7 +71,7 @@ func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) {
// Builds a pipeline of processors. Returns the first processor in the pipeline.
// The last processor in the pipeline will be plugged to fan out the data into exporters
// that are configured for this pipeline.
func (eb *PipelinesBuilder) buildPipeline(
func (pb *PipelinesBuilder) buildPipeline(
pipelineCfg *configmodels.Pipeline,
) (*builtProcessor, error) {

Expand All @@ -83,9 +83,9 @@ func (eb *PipelinesBuilder) buildPipeline(

switch pipelineCfg.InputType {
case configmodels.TracesDataType:
tc = eb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
case configmodels.MetricsDataType:
mc = eb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
}

// Now build the processors backwards, starting from the last one.
Expand All @@ -94,7 +94,7 @@ func (eb *PipelinesBuilder) buildPipeline(
// in the pipeline and so on.
for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- {
procName := pipelineCfg.Processors[i]
procCfg := eb.config.Processors[procName]
procCfg := pb.config.Processors[procName]

factory := factories.GetProcessorFactory(procCfg.Type())

Expand All @@ -115,22 +115,24 @@ func (eb *PipelinesBuilder) buildPipeline(
}
}

pb.logger.Info("Pipeline is enabled.", zap.String("pipelines", pipelineCfg.Name))

return &builtProcessor{tc, mc}, nil
}

// Converts the list of exporter names to a list of corresponding builtExporters.
func (eb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
var result []*builtExporter
for _, name := range exporterNames {
exporter := eb.exporters[eb.config.Exporters[name]]
exporter := pb.exporters[pb.config.Exporters[name]]
result = append(result, exporter)
}

return result
}

func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
builtExporters := eb.getBuiltExportersByNames(exporterNames)
func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
Expand All @@ -146,8 +148,8 @@ func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
return multiconsumer.NewTraceProcessor(exporters)
}

func (eb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := eb.getBuiltExportersByNames(exporterNames)
func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
Expand Down
Loading

0 comments on commit 3aac7c1

Please sign in to comment.