Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide Docker target and discovery in Promtail. #4911

Merged
merged 53 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
c74a42c
Provide Docker target and discovery in Promtail.
jeschkies Dec 9, 2021
0ca5f1c
Add Docker service discovery.
jeschkies Dec 9, 2021
003b95a
Add Docker service discovery.
jeschkies Dec 10, 2021
5d2c8c8
Use stderr writer.
jeschkies Dec 14, 2021
05a3581
Rename target manager name in error message.
jeschkies Dec 23, 2021
d5d7166
Persist position.
jeschkies Dec 23, 2021
ce8767e
Run discovery manager.
jeschkies Jan 4, 2022
e19393c
Add target.
jeschkies Jan 5, 2022
d6acba3
Split out target creation into syncer.
jeschkies Jan 5, 2022
2fbfd7d
Add targets.
jeschkies Jan 6, 2022
b90d0b3
Document Docker target and service discovery.
jeschkies Jan 7, 2022
e874238
Rename syncer to target_group and make it threadsafe.
jeschkies Jan 7, 2022
165d07f
Format code.
jeschkies Jan 7, 2022
7f149bb
Test target manager.
jeschkies Jan 7, 2022
15d8121
Test target manager.
jeschkies Jan 7, 2022
ef4b805
Make linter happy.
jeschkies Jan 7, 2022
564a62e
Add changelog entry.
jeschkies Jan 7, 2022
01eea28
Merge remote-tracking branch 'grafana/main' into karsten/docker-sd
jeschkies Jan 7, 2022
43aa65d
Correct spelling.
jeschkies Jan 10, 2022
d3970be
Format code.
jeschkies Jan 10, 2022
71e8fe0
Fix docker config.
jeschkies Jan 10, 2022
f45aea8
Reset vendor.
jeschkies Jan 10, 2022
f9ebe64
Merge remote-tracking branch 'grafana/main' into karsten/docker-sd
jeschkies Jan 10, 2022
5ddc140
Address Michel's comments.
jeschkies Jan 10, 2022
eb63e79
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 11, 2022
7997660
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 11, 2022
80c0ab4
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 11, 2022
b6dc0d7
Address typos.
jeschkies Jan 11, 2022
676f05a
Stop target when container is stopped.
jeschkies Jan 11, 2022
4a820c3
Merge remote-tracking branch 'grafana/main' into karsten/docker-sd
jeschkies Jan 11, 2022
5f11eed
Use io.Pipe instead of channel.
jeschkies Jan 12, 2022
975cd7c
Document log stream label.
jeschkies Jan 12, 2022
1a2413b
Remove unused port.
jeschkies Jan 12, 2022
f608abf
Update docs/sources/clients/docker-driver/_index.md
jeschkies Jan 12, 2022
f63b4e2
Clean up.
jeschkies Jan 13, 2022
3e37bb1
Merge remote-tracking branch 'refs/remotes/origin/karsten/docker-sd' …
jeschkies Jan 13, 2022
45f6f46
Merge remote-tracking branch 'grafana/main' into karsten/docker-sd
jeschkies Jan 13, 2022
9d4ca1c
Remoe configuration of specific containers.
jeschkies Jan 14, 2022
46a72e4
Close entry handler for target groups.
jeschkies Jan 14, 2022
06cdf84
Relabel discovered labels.
jeschkies Jan 14, 2022
5403bcc
Document relabelling.
jeschkies Jan 14, 2022
2403cde
Merge branch 'main' into karsten/docker-sd
jeschkies Jan 14, 2022
6101736
Fix test.
jeschkies Jan 14, 2022
bbab1c0
Merge branch 'main' into karsten/docker-sd
jeschkies Jan 18, 2022
fddab7d
Update docs/sources/clients/docker-driver/_index.md
jeschkies Jan 19, 2022
baff77d
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 19, 2022
783ee02
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 19, 2022
cd730ef
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 19, 2022
fa94653
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 19, 2022
4971fcd
Update docs/sources/clients/promtail/configuration.md
jeschkies Jan 19, 2022
acd6673
Merge branch 'main' into karsten/docker-sd
jeschkies Jan 19, 2022
1db6d4f
Clarify authentication.
jeschkies Jan 19, 2022
9efe689
Merge branch 'main' into karsten/docker-sd
jeschkies Jan 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
## Main

* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.
* [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`.
* [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager.
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines.
* [5144](https://github.com/grafana/loki/pull/5144) **dannykopping** Ruler: fix remote write basic auth credentials.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.
* [5091](https://github.com/grafana/loki/pull/5091) **owen-d**: Changes `ingester.concurrent-flushes` default to 32
* [4879](https://github.com/grafana/loki/pull/4879) **cyriltovena**: LogQL: add __line__ function to | line_format template.
* [5081](https://github.com/grafana/loki/pull/5081) **SasSwart**: Add the option to configure memory ballast for Loki
Expand Down
26 changes: 14 additions & 12 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ import (

// Config describes a job to scrape.
type Config struct {
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"`
CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"`
CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// List of Docker service discovery configurations.
DockerSDConfigs []*moby.DockerSDConfig `yaml:"docker_sd_configs,omitempty"`
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
}

type ServiceDiscoveryConfig struct {
Expand Down
38 changes: 38 additions & 0 deletions clients/pkg/promtail/targets/docker/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package docker

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds a set of Docker target metrics.
type Metrics struct {
reg prometheus.Registerer

dockerEntries prometheus.Counter
dockerErrors prometheus.Counter
}

// NewMetrics creates a new set of Docker target metrics. If reg is non-nil, the
// metrics will be registered.
func NewMetrics(reg prometheus.Registerer) *Metrics {
var m Metrics
m.reg = reg

m.dockerEntries = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "docker_target_entries_total",
Help: "Total number of successful entries sent to the Docker target",
})
m.dockerErrors = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "docker_target_parsing_errors_total",
Help: "Total number of parsing errors while receiving Docker messages",
})

if reg != nil {
reg.MustRegister(
m.dockerEntries,
m.dockerErrors,
)
}

return &m
}
231 changes: 231 additions & 0 deletions clients/pkg/promtail/targets/docker/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package docker

import (
"bufio"
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

docker_types "github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"go.uber.org/atomic"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/positions"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/logproto"
)

type Target struct {
logger log.Logger
handler api.EntryHandler
since int64
positions positions.Positions
containerName string
labels model.LabelSet
relabelConfig []*relabel.Config
metrics *Metrics

cancel context.CancelFunc
client client.APIClient
wg sync.WaitGroup
running *atomic.Bool
err error
}

func NewTarget(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
position positions.Positions,
containerName string,
labels model.LabelSet,
relabelConfig []*relabel.Config,
client client.APIClient,
) (*Target, error) {

pos, err := position.Get(positions.CursorKey(containerName))
if err != nil {
return nil, err
}
var since int64
if pos != 0 {
since = pos
}

ctx, cancel := context.WithCancel(context.Background())
t := &Target{
logger: logger,
handler: handler,
since: since,
positions: position,
containerName: containerName,
labels: labels,
relabelConfig: relabelConfig,
metrics: metrics,

cancel: cancel,
client: client,
running: atomic.NewBool(false),
}
go t.processLoop(ctx)
return t, nil
}

func (t *Target) processLoop(ctx context.Context) {
t.wg.Add(1)
defer t.wg.Done()
t.running.Store(true)

opts := docker_types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Timestamps: true,
Since: strconv.FormatInt(t.since, 10),
}

logs, err := t.client.ContainerLogs(ctx, t.containerName, opts)
if err != nil {
level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err)
t.err = err
return
}

// Start transferring
rstdout, wstdout := io.Pipe()
rstderr, wstderr := io.Pipe()
t.wg.Add(1)
go func() {
defer func() {
t.wg.Done()
wstdout.Close()
wstderr.Close()
t.Stop()
}()

written, err := stdcopy.StdCopy(wstdout, wstderr, logs)
if err != nil {
level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
} else {
level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName)
}
}()

// Start processing
t.wg.Add(2)
go t.process(rstdout, "stdout")
go t.process(rstderr, "stderr")

// Wait until done
<-ctx.Done()
t.running.Store(false)
logs.Close()
level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
}

// extractTs tries for read the timestamp from the beginning of the log line.
// It's expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
func extractTs(line string) (time.Time, string, error) {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
pair := strings.SplitN(line, " ", 2)
if len(pair) != 2 {
return time.Now(), line, fmt.Errorf("Could not find timestamp in '%s'", line)
}
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
if err != nil {
return time.Now(), line, fmt.Errorf("Could not parse timestamp from '%s': %w", pair[0], err)
}
return ts, pair[1], nil
}

func (t *Target) process(r io.Reader, logStream string) {
defer func() {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
t.wg.Done()
}()

scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
ts, line, err := extractTs(line)
if err != nil {
level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
t.metrics.dockerErrors.Inc()
continue
}

// Add all labels from the config, relabel and filter them.
lb := labels.NewBuilder(nil)
for k, v := range t.labels {
lb.Set(string(k), string(v))
}
lb.Set(dockerLabelLogStream, logStream)
processed := relabel.Process(lb.Labels(), t.relabelConfig...)

filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

t.handler.Chan() <- api.Entry{
Labels: filtered,
Entry: logproto.Entry{
Timestamp: ts,
Line: line,
},
}
t.metrics.dockerEntries.Inc()
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

err := scanner.Err()
if err != nil {
level.Warn(t.logger).Log("msg", "finished scanning logs lines with an error", "err", err)
}

}

func (t *Target) Stop() {
t.cancel()
t.wg.Wait()
level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName)
}

func (t *Target) Type() target.TargetType {
return target.DockerTargetType
}

func (t *Target) Ready() bool {
return t.running.Load()
}

func (t *Target) DiscoveredLabels() model.LabelSet {
return t.labels
}

func (t *Target) Labels() model.LabelSet {
return t.labels
}

// Details returns target-specific details.
func (t *Target) Details() interface{} {
return map[string]string{
"id": t.containerName,
"error": t.err.Error(),
"position": t.positions.GetString(positions.CursorKey(t.containerName)),
"running": strconv.FormatBool(t.running.Load()),
}
}
Loading