Skip to content

Commit

Permalink
exporter/loki: handle multi-tenant use-cases (#12415)
Browse files Browse the repository at this point in the history
* `exporter/loki`: handle multi-tenant use-cases

Fixes #3121

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Add changelog entry

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* linter suggestions

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Fail when both TenantID and Tenant are specified

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling authored Jul 18, 2022
1 parent b380b99 commit 16ac192
Show file tree
Hide file tree
Showing 13 changed files with 518 additions and 12 deletions.
25 changes: 23 additions & 2 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ The following settings are required:

The following settings can be optionally configured:

- `tenant_id` (no default): The tenant ID used to identify the tenant the logs are associated to. This will set the
"X-Scope-OrgID" header used by Loki. If left unset, this header will not be added.
- `tenant`: composed of the properties `tenant.source` and `tenant.value`.
- `tenant.source`: one of "static", "context", or "attributes".
- `tenant.value`: the semantics depend on the tenant source. See the "Tenant information" section.

- `tls`:
- `insecure` (default = false): When set to true disables verifying the server's certificate chain and host name. The
Expand Down Expand Up @@ -82,6 +83,26 @@ loki:
The full list of settings exposed for this exporter are documented [here](./config.go) with detailed sample
configurations [here](./testdata/config.yaml).
## Tenant information
This exporter is able to acquire the tenant ID based on different sources. At this moment, there are three possible sources:
- static
- context
- attributes
Each one has a strategy for obtaining the tenant ID, as follows:
- when "static" is set, the tenant is the literal value from the "tenant.value" property.
- when "context" is set, the tenant is looked up from the request metadata, such as HTTP headers, using the "value" as the
key (likely the header name).
- when "attributes" is set, the tenant is looked up from the resource attributes in the batch, using the "value" as the
attribute name: the first value found among the resource attributes is used. If you intend to have multiple tenants per
HTTP request, make sure to use a processor that groups tenants in batches, such as the `groupbyattrs` processor.

The value that is determined to be the tenant is then sent as the value for the HTTP header `X-Scope-OrgID`. When a tenant
is not provided, or a tenant cannot be determined, the logs are still sent to Loki but without the HTTP header.

## Advanced Configuration

Several helper files are leveraged to provide additional capabilities automatically:
Expand Down
26 changes: 26 additions & 0 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,45 @@ type Config struct {
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// TenantID defines the tenant ID to associate log streams with.
// Deprecated: use Tenant instead, with Source set to "static" and
// Value set to the value you'd set in this field.
TenantID string `mapstructure:"tenant_id"`

// Labels defines how labels should be applied to log streams sent to Loki.
Labels LabelsConfig `mapstructure:"labels"`

// Allows you to choose the entry format in the exporter
Format string `mapstructure:"format"`

// Tenant defines how to obtain the tenant ID
Tenant *Tenant `mapstructure:"tenant"`
}

// Tenant defines how the exporter obtains the tenant ID that is sent to Loki.
type Tenant struct {
	// Source defines where to obtain the tenant ID. Possible values: static, context, attributes.
	Source string `mapstructure:"source"`

	// Value will be used by the tenant source provider to lookup the value. For instance,
	// when the source=static, the value is a static value. When the source=context, value
	// should be the context key that holds the tenant information. When the
	// source=attributes, value is the name of the resource attribute to look up.
	Value string `mapstructure:"value"`
}

// validate reports whether the configuration is usable.
//
// It rejects an empty or unparsable endpoint, a tenant block whose source is
// not one of "attributes", "context" or "static", and a configuration that
// sets both the deprecated tenant_id and the tenant block. When those checks
// pass, the result of validating the labels configuration is returned.
func (c *Config) validate() error {
	_, parseErr := url.Parse(c.Endpoint)
	if parseErr != nil || c.Endpoint == "" {
		return fmt.Errorf("\"endpoint\" must be a valid URL")
	}

	// Without a tenant block only the labels remain to be checked.
	if c.Tenant == nil {
		return c.Labels.validate()
	}

	switch c.Tenant.Source {
	case "attributes", "context", "static":
		// known tenant source
	default:
		return fmt.Errorf("invalid tenant source, must be one of 'attributes', 'context', 'static', but is %s", c.Tenant.Source)
	}

	if c.TenantID != "" {
		return fmt.Errorf("both tenant_id and tenant were specified, use only 'tenant' instead")
	}

	return c.Labels.validate()
}

Expand Down
55 changes: 55 additions & 0 deletions exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func TestConfig_validate(t *testing.T) {
CredentialFile string
Audience string
Labels LabelsConfig
TenantID string
Tenant *Tenant
}
tests := []struct {
name string
Expand Down Expand Up @@ -251,6 +253,51 @@ func TestConfig_validate(t *testing.T) {
},
shouldError: true,
},
{
name: "with missing `tenant.source`",
fields: fields{
Endpoint: validEndpoint,
Labels: validAttribLabelsConfig,
Tenant: &Tenant{},
},
shouldError: true,
},
{
name: "with invalid `tenant.source`",
fields: fields{
Endpoint: validEndpoint,
Labels: validAttribLabelsConfig,
Tenant: &Tenant{
Source: "invalid",
},
},
shouldError: true,
},
{
name: "with valid `tenant.source`",
fields: fields{
Endpoint: validEndpoint,
Labels: validAttribLabelsConfig,
Tenant: &Tenant{
Source: "static",
Value: "acme",
},
},
shouldError: false,
},
{
name: "with both tenantID and tenant",
fields: fields{
Endpoint: validEndpoint,
Labels: validAttribLabelsConfig,
Tenant: &Tenant{
Source: "static",
Value: "acme",
},
TenantID: "globex",
},
shouldError: true,
},
}

for _, tt := range tests {
Expand All @@ -261,6 +308,14 @@ func TestConfig_validate(t *testing.T) {
cfg.Endpoint = tt.fields.Endpoint
cfg.Labels = tt.fields.Labels

if len(tt.fields.TenantID) > 0 {
cfg.TenantID = tt.fields.TenantID
}

if tt.fields.Tenant != nil {
cfg.Tenant = tt.fields.Tenant
}

err := cfg.validate()
if (err != nil) != tt.shouldError {
t.Errorf("validate() error = %v, shouldError %v", err, tt.shouldError)
Expand Down
50 changes: 40 additions & 10 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/tenant"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto"
)

Expand All @@ -45,30 +46,54 @@ const (
)

// lokiExporter sends log data to Loki over HTTP.
type lokiExporter struct {
	// config is the exporter configuration the instance was created with.
	config *Config
	// settings holds the telemetry settings passed to newExporter.
	settings component.TelemetrySettings
	// client is the HTTP client used to perform the push requests.
	client *http.Client
	// wg tracks in-flight pushLogData calls.
	wg sync.WaitGroup
	// convert turns a log record and its resource into a Loki entry; which
	// implementation is used depends on the configured format.
	convert func(plog.LogRecord, pcommon.Resource) (*logproto.Entry, error)
	// tenantSource resolves the tenant sent in the "X-Scope-OrgID" header.
	tenantSource tenant.Source
}

// newExporter builds a lokiExporter from the given configuration and
// telemetry settings.
//
// The entry conversion function is chosen from config.Format ("json" selects
// the JSON entry conversion, anything else the log body conversion). The
// tenant source is chosen from config.Tenant; when no tenant block is
// configured, a static source carrying the deprecated config.TenantID is used.
//
// The source names handled here are the ones accepted by Config.validate. For
// any other name the tenant source is left unset.
func newExporter(config *Config, settings component.TelemetrySettings) *lokiExporter {
	lokiexporter := &lokiExporter{
		config:   config,
		settings: settings,
	}

	if config.Format == "json" {
		lokiexporter.convert = lokiexporter.convertLogToJSONEntry
	} else {
		lokiexporter.convert = lokiexporter.convertLogBodyToEntry
	}

	// Resolve the effective tenant settings into a local value. Writing the
	// fallback back into the caller's config would leave both TenantID and
	// Tenant populated, which validate() rejects.
	tenantCfg := config.Tenant
	if tenantCfg == nil {
		tenantCfg = &Tenant{
			Source: "static",

			// this might be empty, which is fine, we handle empty later
			Value: config.TenantID,
		}
	}

	switch tenantCfg.Source {
	case "static":
		lokiexporter.tenantSource = &tenant.StaticTenantSource{
			Value: tenantCfg.Value,
		}
	case "context":
		lokiexporter.tenantSource = &tenant.ContextTenantSource{
			Key: tenantCfg.Value,
		}
	case "attributes":
		lokiexporter.tenantSource = &tenant.AttributeTenantSource{
			Value: tenantCfg.Value,
		}
	}

	return lokiexporter
}

func (l *lokiExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
l.wg.Add(1)
defer l.wg.Done()

pushReq, _ := l.logDataToLoki(ld)
if len(pushReq.Streams) == 0 {
return consumererror.NewPermanent(fmt.Errorf("failed to transform logs into Loki log streams"))
Expand All @@ -89,8 +114,13 @@ func (l *lokiExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
}
req.Header.Set("Content-Type", "application/x-protobuf")

if len(l.config.TenantID) > 0 {
req.Header.Set("X-Scope-OrgID", l.config.TenantID)
tenant, err := l.tenantSource.GetTenant(ctx, ld)
if err != nil {
return consumererror.NewPermanent(fmt.Errorf("failed to determine the tenant: %w", err))
}

if len(tenant) > 0 {
req.Header.Set("X-Scope-OrgID", tenant)
}

resp, err := l.client.Do(req)
Expand Down
74 changes: 74 additions & 0 deletions exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/tenant"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto"
)

Expand Down Expand Up @@ -256,6 +258,78 @@ func TestExporter_pushLogData(t *testing.T) {
}
}

// TestTenantSource checks that newExporter picks the tenant source
// implementation matching the configured tenant, and that the chosen source
// resolves the expected tenant from a context and a log batch that both carry
// the tenant under the same key.
func TestTenantSource(t *testing.T) {
	const (
		tenantKey  = "tenant.name"
		tenantName = "acme"
	)

	type scenario struct {
		desc     string
		tenant   *Tenant
		wantType tenant.Source
	}

	scenarios := []scenario{
		{
			desc:     "tenant source attributes",
			tenant:   &Tenant{Source: "attributes", Value: tenantKey},
			wantType: &tenant.AttributeTenantSource{},
		},
		{
			desc:     "tenant source context",
			tenant:   &Tenant{Source: "context", Value: tenantKey},
			wantType: &tenant.ContextTenantSource{},
		},
		{
			desc:     "tenant source static",
			tenant:   &Tenant{Source: "static", Value: tenantName},
			wantType: &tenant.StaticTenantSource{},
		},
		{
			desc:     "tenant source is non-existing",
			tenant:   nil,
			wantType: &tenant.StaticTenantSource{},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.desc, func(t *testing.T) {
			exp := newExporter(&Config{
				Tenant: sc.tenant,
				Labels: LabelsConfig{
					Attributes: map[string]string{
						"severity": "severity",
					},
				},
			}, componenttest.NewNopTelemetrySettings())
			require.NotNil(t, exp)

			assert.IsType(t, sc.wantType, exp.tenantSource)

			// Request metadata carrying the tenant, for the context source.
			info := client.FromContext(context.Background())
			info.Metadata = client.NewMetadata(map[string][]string{tenantKey: {tenantName}})
			ctx := client.NewContext(context.Background(), info)

			// A single resource carrying the tenant, for the attributes source.
			logs := plog.NewLogs()
			logs.ResourceLogs().AppendEmpty().Resource().Attributes().InsertString(tenantKey, tenantName)

			got, err := exp.tenantSource.GetTenant(ctx, logs)
			assert.NoError(t, err)

			// With no tenant configured, the fallback static source has an empty value.
			if sc.tenant == nil {
				assert.Empty(t, got)
				return
			}
			assert.Equal(t, tenantName, got)
		})
	}
}

func TestExporter_logDataToLoki(t *testing.T) {
config := &Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Expand Down
37 changes: 37 additions & 0 deletions exporter/lokiexporter/internal/tenant/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tenant // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/tenant"

import (
"context"

"go.opentelemetry.io/collector/pdata/plog"
)

var _ Source = (*AttributeTenantSource)(nil)

// AttributeTenantSource is a tenant Source that resolves the tenant from the
// resource attributes of a batch of logs.
type AttributeTenantSource struct {
// Value is the name of the resource attribute that holds the tenant.
Value string
}

// GetTenant walks the resources in logs in order and returns the string value
// of the first resource attribute named ts.Value. When no resource carries
// that attribute, an empty tenant is returned. The context is not used and the
// returned error is always nil.
func (ts *AttributeTenantSource) GetTenant(_ context.Context, logs plog.Logs) (string, error) {
	resourceLogs := logs.ResourceLogs()
	for idx, total := 0, resourceLogs.Len(); idx < total; idx++ {
		attrs := resourceLogs.At(idx).Resource().Attributes()
		val, ok := attrs.Get(ts.Value)
		if !ok {
			continue
		}
		return val.StringVal(), nil
	}
	return "", nil
}
Loading

0 comments on commit 16ac192

Please sign in to comment.