Skip to content

Commit

Permalink
Refactor stanzareceiver into a helper package (1/2) (#2306)
Browse files Browse the repository at this point in the history
**Link to tracking Issue:** 
This PR partially addresses the following issues:
- Resolves: #2265 
- Related:  #2268, #2282.

**Description:**

The main idea here is to convert `stanzareceiver` into a helper package for building various other stanza-based receivers. Each of these other receivers will only vary by input operator. Functionality pulled out of `stanzareceiver` was moved into a new `filelogreceiver`. `stanzareceiver` should most likely be renamed and/or moved, but is left in its previous package for this initial PR. 

`stanzareceiver` defines an interface called `LogReceiverType` which each stanza-based receiver must implement and pass to `stanzareceiver.NewFactory(LogReceiverType) component.ReceiverFactory`. 

With this interface, each stanza-based receiver should only need a small amount of work to have a fully functional receiver. Support for parsing operations, emission from stanza's internal pipeline, and conversion to pdata format are all handled in the helper package so that these will be standardized across all the full set of stanza-based receivers.

**Next Steps**
Input operators are _not yet_ isolated to the top level of the configuration. The end goal is: 
```
filelog:
 include: [ receiver/stanzareceiver/testdata/simple.log ]
 start_at: beginning
 operators:
   - type: regex_parser
       regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
       timestamp:
         parse_from: time
         layout: '%Y-%m-%d'
       severity:
         parse_from: sev
```

but the current state is still:
```
filelog:
 operators:
   - type: file_input
      include: [ receiver/stanzareceiver/testdata/simple.log ]
      start_at: beginning
   - type: regex_parser
       regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
       timestamp:
         parse_from: time
         layout: '%Y-%m-%d'
       severity:
         parse_from: sev
```

The primary requirement #2265 is to promote the input operator to the top level of the receiver config. This will be the focus of the next PR. This PR is mostly concerned with splitting up the package. The configuration changes might be a little messy so I wanted to address those separately.

On the subject of configuration - the interface defined by `stanzareceiver` has a method `Decode(configmodels.Receiver) (pipeline.Config, error)` which is in my opinion much too loosely defined. Too much responsibility is delegated to each stanza-based receiver. The main reason this is left this way for now is that `stanza` operators do not currently use `mapstructure` for config unmarshaling. There is currently a workaround in place, but once stanza operators are migrated to `mapstructure`, more responsibility for unmarshaling should be extracted back into the helper package, and this interface method should end up a lot cleaner. I'm planning to look into this in the next PR.

**Open questions** (which can be addressed in this PR or the next):
- Should the helper package be completely standalone, or does it belong in `receivercreator` or similar?
- If the helper package should be standalone, what should it be called? (probably not `stanzareceiver`)

**Temporarily removed functionality**
This functionality will be implemented in the near future. There is some design to do on how exactly this should work when used by multiple receivers:
- Offsets database (tracked by #2287)
- Plugins (tracked as item on #2264)

**Testing:** 
Unit tests are roughly the same as before. A few cases were dropped because they no longer applied. Certainly more tests will be added as this pattern is solidified. 

Testbed scenario is unchanged and still passing:
```
> make run-tests
./runtests.sh
=== RUN   TestLog10kDPS
=== RUN   TestLog10kDPS/OTLP
... (abbreviated)
=== RUN   TestLog10kDPS/Stanza
... (abbreviated)
--- PASS: TestLog10kDPS (30.73s)
    --- PASS: TestLog10kDPS/OTLP (15.32s)
    --- PASS: TestLog10kDPS/Stanza (15.41s)
PASS
ok      github.com/open-telemetry/opentelemetry-collector-contrib/testbed/tests_unstable_exe    31.406s
# Test PerformanceResults
Started: Mon, 08 Feb 2021 13:35:08 -0500

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Log10kDPS/OTLP                          |PASS  |     15s|    19.9|    20.6|         39|         47|    149900|        149900|
Log10kDPS/Stanza                        |PASS  |     15s|    28.4|    29.3|         40|         48|    150000|        150000|

Total duration: 31s
```
  • Loading branch information
djaglowski authored Feb 11, 2021
1 parent ffce884 commit 339dd56
Show file tree
Hide file tree
Showing 26 changed files with 1,763 additions and 301 deletions.
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/unstable_components_enabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package main
import (
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"
)

func extraReceivers() []component.ReceiverFactory {
return []component.ReceiverFactory{
stanzareceiver.NewFactory(),
filelogreceiver.NewFactory(),
}
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-log-collection v0.14.0
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.20.0
Expand Down Expand Up @@ -167,7 +169,9 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxre

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver => ./receiver/zookeeperreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver => ./receiver/stanzareceiver/
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver => ./receiver/stanzareceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver => ./receiver/filelogreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memcachedreceiver => ./receiver/memcachedreceiver

Expand Down
1 change: 1 addition & 0 deletions receiver/filelogreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# README Status
This readme is out of date and will be updated soon

# Stanza Receiver

Tails and parses logs from a wide variety of sources using the [opentelemetry-log-collection](https://github.com/open-telemetry/opentelemetry-log-collection) library.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

// Package stanzareceiver implements a receiver that can be used by the
// Opentelemetry collector to receive logs using the stanza log agent
package stanzareceiver
package filelogreceiver
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package stanzareceiver
package filelogreceiver

import (
"context"
Expand All @@ -32,6 +32,8 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap/zaptest"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver"
)

type testHost struct {
Expand All @@ -46,8 +48,8 @@ func (h *testHost) ReportFatalError(err error) {

var _ component.Host = (*testHost)(nil)

func unmarshalConfig(t *testing.T, pipelineYaml string) OperatorConfig {
var operatorCfg OperatorConfig
func unmarshalConfig(t *testing.T, pipelineYaml string) stanzareceiver.OperatorConfig {
var operatorCfg stanzareceiver.OperatorConfig
require.NoError(t, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfg))
return operatorCfg
}
Expand Down Expand Up @@ -79,13 +81,17 @@ func TestReadStaticFile(t *testing.T) {
e3.Set(entry.NewRecordField("msg"), "Some details...")
e3.AddLabel("file_name", "simple.log")

expectedLogs := []pdata.Logs{convert(e1), convert(e2), convert(e3)}
expectedLogs := []pdata.Logs{
stanzareceiver.Convert(e1),
stanzareceiver.Convert(e2),
stanzareceiver.Convert(e3),
}

f := NewFactory()
sink := new(consumertest.LogsSink)
params := component.ReceiverCreateParams{Logger: zaptest.NewLogger(t)}

cfg := f.CreateDefaultConfig().(*Config)
cfg := f.CreateDefaultConfig().(*FileLogConfig)
cfg.Operators = unmarshalConfig(t, `
- type: file_input
include: [testdata/simple.log]
Expand Down Expand Up @@ -168,10 +174,10 @@ func (rt *rotationTest) Run(t *testing.T) {
e := entry.New()
e.Timestamp = expectedTimestamp
e.Set(entry.NewRecordField("msg"), msg)
expectedLogs[i] = convert(e)
expectedLogs[i] = stanzareceiver.Convert(e)
}

cfg := f.CreateDefaultConfig().(*Config)
cfg := f.CreateDefaultConfig().(*FileLogConfig)
cfg.Operators = unmarshalConfig(t, fmt.Sprintf(`
- type: file_input
include: [%s/*]
Expand Down
64 changes: 64 additions & 0 deletions receiver/filelogreceiver/filelog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filelogreceiver

import (

// Register input operator for filelog
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/input/file"
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver"
)

const typeStr = "filelog"

// NewFactory creates a factory for filelog receiver
func NewFactory() component.ReceiverFactory {
return stanzareceiver.NewFactory(ReceiverType{})
}

// ReceiverType implements stanzareceiver.LogReceiverType
// to create a file tailing receiver
type ReceiverType struct{}

// Type is the receiver type
func (f ReceiverType) Type() configmodels.Type {
return configmodels.Type(typeStr)
}

// CreateDefaultConfig creates a config with type and version
func (f ReceiverType) CreateDefaultConfig() configmodels.Receiver {
return &FileLogConfig{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
}
}

// Decode unmarshals configuration into a log parsing pipeline
func (f ReceiverType) Decode(cfg configmodels.Receiver) (pipeline.Config, error) {
logConfig := cfg.(*FileLogConfig)
return stanzareceiver.DecodeOperators(logConfig.Operators)
}

// FileLogConfig defines configuration for the filelog receiver
type FileLogConfig struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
Operators stanzareceiver.OperatorConfig `mapstructure:"operators"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package stanzareceiver
package filelogreceiver

import (
"path"
Expand All @@ -21,10 +21,18 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
require.NotNil(t, cfg, "failed to create default config")
require.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)
Expand All @@ -40,22 +48,26 @@ func TestLoadConfig(t *testing.T) {

assert.Equal(t, len(cfg.Receivers), 1)

assert.Equal(t, cfg.Receivers["stanza"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "stanza",
},
Operators: unmarshalConfig(t, `
operatorCfg := unmarshalConfig(t, `
- type: file_input
include: [ receiver/stanzareceiver/testdata/simple.log ]
include: [ receiver/filelogreceiver/testdata/simple.log ]
start_at: beginning
- type: regex_parser
regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
timestamp:
parse_from: time
layout: '%Y-%m-%d'
severity:
parse_from: sev`),
})
parse_from: sev`)

assert.Equal(t,
&FileLogConfig{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "filelog",
},
Operators: operatorCfg,
},
cfg.Receivers["filelog"],
)
}
15 changes: 15 additions & 0 deletions receiver/filelogreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver

go 1.14

require (
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-log-collection v0.14.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.20.0
go.uber.org/zap v1.16.0
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver => ../stanzareceiver
Loading

0 comments on commit 339dd56

Please sign in to comment.