diff --git a/.circleci/config.yml b/.circleci/config.yml index 2b4f673e7acc9..b9b77a97003b5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -45,7 +45,7 @@ workflows: # https://circleci.com/blog/circleci-hacks-reuse-yaml-in-your-circleci-config-with-yaml/ defaults: &defaults docker: - - image: grafana/loki-build-image:0.2.1 + - image: grafana/loki-build-image:0.3.0 working_directory: /go/src/github.com/grafana/loki jobs: @@ -156,7 +156,7 @@ jobs: key: v1-loki-{{ .Branch }}-{{ .Revision }} - restore_cache: key: v1-loki-plugin-{{ .Branch }}-{{ .Revision }} - + - run: name: Load Images command: | @@ -168,7 +168,7 @@ jobs: command: | docker login -u "$DOCKER_USER" -p "$DOCKER_PASS" && make push-latest - + - run: name: Push Docker Plugin command: | diff --git a/Gopkg.lock b/Gopkg.lock index 35077a3a2d6bb..b28ae4da1741b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -213,13 +213,24 @@ revision = "a9fb20d87448d386e6d50b1f2e1fa70dcf0de43c" [[projects]] - digest = "1:5155f7153c694dc8e2efd74d799a27fd54e65778fa3f0c3e17626df724857db9" + digest = "1:bc38b83376aa09bdc1e889c00ce73cb748b2140d535bb5c76cb9823da6c7a98a" name = "github.com/coreos/go-systemd" - packages = ["activation"] + packages = [ + "activation", + "sdjournal", + ] pruneopts = "UT" revision = "95778dfbb74eb7e4dbaf43bf7d71809650ef8076" version = "v19" +[[projects]] + digest = "1:6e2ff82d2fe11ee35ec8dceb4346b8144a761f1c8655592c4ebe99a92fcec327" + name = "github.com/coreos/pkg" + packages = ["dlopen"] + pruneopts = "UT" + revision = "97fdf19511ea361ae1c100dd393cc47f8dcfa1e1" + version = "v4" + [[projects]] branch = "master" digest = "1:5a07b5363e4c2aa127a3afd1e8e323d3a288ba1d90d37793d2e14843f5b5b82e" @@ -1561,6 +1572,7 @@ analyzer-version = 1 input-imports = [ "github.com/bmatcuk/doublestar", + "github.com/coreos/go-systemd/sdjournal", "github.com/cortexproject/cortex/pkg/chunk", "github.com/cortexproject/cortex/pkg/chunk/encoding", "github.com/cortexproject/cortex/pkg/chunk/storage", diff --git a/Gopkg.toml b/Gopkg.toml index 6edb1d04ec5bd..ca825bc32b6cc 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -74,4 +74,4 @@ [[override]] name = "k8s.io/client-go" - revision = "1a26190bd76a9017e289958b9fba936430aa3704" + revision = "1a26190bd76a9017e289958b9fba936430aa3704" \ No newline at end of file diff --git a/Makefile b/Makefile index 9fa45ebb43ab5..54ff6a7f297a0 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,7 @@ loki-build-image/$(UPTODATE): loki-build-image/* # All the boiler plate for building golang follows: SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E") BUILD_IN_CONTAINER := true +CGO_ENABLED := 0 # RM is parameterized to allow CircleCI to run builds, as it # currently disallows `docker run --rm`. This value is overridden # in circle.yml @@ -149,13 +150,13 @@ $(EXES) $(DEBUG_EXES) $(PROTO_GOS) $(YACC_GOS) lint test shell check-generated-f else $(DEBUG_EXES): loki-build-image/$(UPTODATE) - CGO_ENABLED=0 go build $(DEBUG_GO_FLAGS) -o $@ ./$(@D) + CGO_ENABLED=$(CGO_ENABLED) go build $(DEBUG_GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) # Copy the delve binary to make it easily available to put in the binary's container. [ -f "/go/bin/dlv" ] && mv "/go/bin/dlv" $(@D)/dlv $(EXES): loki-build-image/$(UPTODATE) - CGO_ENABLED=0 go build $(GO_FLAGS) -o $@ ./$(@D) + CGO_ENABLED=$(CGO_ENABLED) go build $(GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) %.pb.go: loki-build-image/$(UPTODATE) diff --git a/build/Dockerfile b/build/Dockerfile index 029a640a26731..7d6e92749d6c6 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -12,12 +12,12 @@ ARG GOARCH="amd64" COPY . /go/src/github.com/grafana/loki WORKDIR /go/src/github.com/grafana/loki RUN touch loki-build-image/.uptodate &&\ - mkdir /build + mkdir /build # production image FROM golang as builder-production ARG APP -RUN make BUILD_IN_CONTAINER=false cmd/${APP}/${APP} &&\ +RUN make CGO_ENABLED=1 BUILD_IN_CONTAINER=false cmd/${APP}/${APP} &&\ mv cmd/${APP}/${APP} /build/${APP} FROM scratch as production @@ -28,7 +28,7 @@ COPY --from=builder-production /build/${APP} /usr/bin/${APP} FROM golang as builder-debug ARG APP RUN go get github.com/go-delve/delve/cmd/dlv &&\ - make BUILD_IN_CONTAINER=false cmd/promtail/promtail-debug &&\ + make CGO_ENBALED=1 BUILD_IN_CONTAINER=false cmd/promtail/promtail-debug &&\ mv cmd/${APP}/${APP}-debug /build/app-debug &&\ mv cmd/${APP}/dlv /build/dlv diff --git a/docs/promtail-examples.md b/docs/promtail-examples.md index b56cb5fa2584a..4548bba899396 100644 --- a/docs/promtail-examples.md +++ b/docs/promtail-examples.md @@ -10,7 +10,7 @@ This example of config promtail based on original docker [config](https://github and show how work with 2 and more sources: Filename for example: my-docker-config.yaml -``` +```yaml server: http_listen_port: 9080 grpc_listen_port: 0 @@ -45,6 +45,7 @@ scrape_configs: __path__: /srv/log/someone_service/*.log ``` + #### Description Scrape_config section of config.yaml contents contains various jobs for parsing your logs @@ -54,15 +55,79 @@ Scrape_config section of config.yaml contents contains various jobs for parsing `__path__` it is path to directory where stored your logs. If you run promtail and this config.yaml in Docker container, don't forget use docker volumes for mapping real directories -with log to those folders in the container. +with log to those folders in the container. #### Example Use 1) Create folder, for example `promtail`, then new sub directory `build/conf` and place there `my-docker-config.yaml`. 2) Create new Dockerfile in root folder `promtail`, with contents -``` +```dockerfile FROM grafana/promtail:latest COPY build/conf /etc/promtail ``` 3) Create your Docker image based on original Promtail image and tag it, for example `mypromtail-image` 3) After that you can run Docker container by this command: `docker run -d --name promtail --network loki_network -p 9080:9080 -v /var/log:/var/log -v /srv/log/someone_service:/srv/log/someone_service mypromtail-image -config.file=/etc/promtail/my-docker-config.yaml` + +## Simple Systemd Journal Config + +This example demonstrates how to configure promtail to listen to systemd journal +entries and write them to Loki: + +Filename for example: my-systemd-journal-config.yaml + +```yaml +server: + http_listen_port: 9080 + grpc_listen_port: 0 + +positions: + filename: /tmp/positions.yaml + +clients: + - url: http://ip_or_hostname_where_loki_runns:3100/api/prom/push + +scrape_configs: + - job_name: journal + journal: + path: /var/log/journal + labels: + job: systemd-journal + relabel_configs: + - source_labels: ['__journal__systemd_unit'] + target_label: 'unit' +``` + +### Description + +Just like the Docker example, the `scrape_configs` sections holds various +jobs for parsing logs. A job with a `journal` key configures it for systemd +journal reading. + +`path` is an optional string specifying the path to read journal entries +from. If unspecified, defaults to the system default (`/var/log/journal`). + +`labels`: is a map of string values specifying labels that should always +be associated with each log entry being read from the systemd journal. +In our example, each log will have a label of `job=systemd-journal`. + +Every field written to the systemd journal is available for processing +in the `relabel_configs` section. Label names are converted to lowercase +and prefixed with `__journal_`. After `relabel_configs` processes all +labels for a job entry, any label starting with `__` is deleted. + +Our example renames the `_SYSTEMD_UNIT` label (available as +`__journal__systemd_unit` in promtail) to `unit` so it will be available +in Loki. All other labels from the journal entry are dropped. + +### Example Use + +`promtail` must have access to the journal path (`/var/log/journal`) +where journal entries are stored for journal support to work correctly. + +If running with Docker, that means to bind that path: + +```bash +docker run -d --name promtail --network loki_network -p 9080:9080 \ + -v /var/log/journal:/var/log/journal \ + mypromtail-image -config.file=/etc/promtail/my-systemd-journal-config.yaml +``` \ No newline at end of file diff --git a/loki-build-image/Dockerfile b/loki-build-image/Dockerfile index cff510148c33d..ea6ecc8051daa 100644 --- a/loki-build-image/Dockerfile +++ b/loki-build-image/Dockerfile @@ -1,23 +1,23 @@ FROM golang:1.11.4-stretch -RUN apt-get update && apt-get install -y file jq unzip protobuf-compiler libprotobuf-dev && \ - rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* +RUN apt-get update && apt-get install -y file jq unzip protobuf-compiler libprotobuf-dev libsystemd-dev && \ + rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* ENV DOCKER_VER="17.03.0-ce" RUN curl -L -o /tmp/docker-$DOCKER_VER.tgz https://download.docker.com/linux/static/stable/x86_64/docker-$DOCKER_VER.tgz && \ - tar -xz -C /tmp -f /tmp/docker-$DOCKER_VER.tgz && \ - mv /tmp/docker/* /usr/bin && \ - rm /tmp/docker-$DOCKER_VER.tgz + tar -xz -C /tmp -f /tmp/docker-$DOCKER_VER.tgz && \ + mv /tmp/docker/* /usr/bin && \ + rm /tmp/docker-$DOCKER_VER.tgz ENV HELM_VER="v2.13.1" RUN curl -L -o /tmp/helm-$HELM_VER.tgz http://storage.googleapis.com/kubernetes-helm/helm-${HELM_VER}-linux-amd64.tar.gz && \ - tar -xz -C /tmp -f /tmp/helm-$HELM_VER.tgz && \ - mv /tmp/linux-amd64/helm /usr/bin/helm && \ - rm -rf /tmp/linux-amd64 /tmp/helm-$HELM_VER.tgz + tar -xz -C /tmp -f /tmp/helm-$HELM_VER.tgz && \ + mv /tmp/linux-amd64/helm /usr/bin/helm && \ + rm -rf /tmp/linux-amd64 /tmp/helm-$HELM_VER.tgz RUN go get \ - github.com/golang/protobuf/protoc-gen-go \ - github.com/gogo/protobuf/protoc-gen-gogoslick \ - github.com/gogo/protobuf/gogoproto \ - github.com/go-delve/delve/cmd/dlv \ - golang.org/x/tools/cmd/goyacc && \ - rm -rf /go/pkg /go/src + github.com/golang/protobuf/protoc-gen-go \ + github.com/gogo/protobuf/protoc-gen-gogoslick \ + github.com/gogo/protobuf/gogoproto \ + github.com/go-delve/delve/cmd/dlv \ + golang.org/x/tools/cmd/goyacc && \ + rm -rf /go/pkg /go/src ENV GOLANGCI_LINT_COMMIT="692dacb773b703162c091c2d8c59f9cd2d6801db" RUN mkdir -p $(go env GOPATH)/src/github.com/golangci/ && git clone https://github.com/golangci/golangci-lint.git $(go env GOPATH)/src/github.com/golangci/golangci-lint && \ cd $(go env GOPATH)/src/github.com/golangci/golangci-lint && git checkout ${GOLANGCI_LINT_COMMIT} && cd cmd/golangci-lint/ &&\ diff --git a/pkg/promtail/positions/positions.go b/pkg/promtail/positions/positions.go index c1bbee74acd58..074cd83146fa8 100644 --- a/pkg/promtail/positions/positions.go +++ b/pkg/promtail/positions/positions.go @@ -5,6 +5,8 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" + "strings" "sync" "time" @@ -32,14 +34,14 @@ type Positions struct { logger log.Logger cfg Config mtx sync.Mutex - positions map[string]int64 + positions map[string]string quit chan struct{} done chan struct{} } // File format for the positions data. type File struct { - Positions map[string]int64 `yaml:"positions"` + Positions map[string]string `yaml:"positions"` } // New makes a new Positions. @@ -67,20 +69,42 @@ func (p *Positions) Stop() { <-p.done } -// Put records (asynchronously) how far we've read through a file. -func (p *Positions) Put(path string, pos int64) { +// PutString records (asynchronsouly) how far we've read through a file. +// Unlike Put, it records a string offset and is only useful for +// JournalTargets which doesn't have integer offsets. +func (p *Positions) PutString(path string, pos string) { p.mtx.Lock() defer p.mtx.Unlock() p.positions[path] = pos } -// Get returns how far we've read through a file. -func (p *Positions) Get(path string) int64 { +// Put records (asynchronously) how far we've read through a file. +func (p *Positions) Put(path string, pos int64) { + p.PutString(path, strconv.FormatInt(pos, 10)) +} + +// GetString returns how far we've through a file as a string. +// JournalTarget writes a journal cursor to the positions file, while +// FileTarget writes an integer offset. Use Get to read the integer +// offset. +func (p *Positions) GetString(path string) string { p.mtx.Lock() defer p.mtx.Unlock() return p.positions[path] } +// Get returns how far we've read through a file. Returns an error +// if the value stored for the file is not an integer. +func (p *Positions) Get(path string) (int64, error) { + p.mtx.Lock() + defer p.mtx.Unlock() + pos, ok := p.positions[path] + if !ok { + return 0, nil + } + return strconv.ParseInt(pos, 10, 64) +} + // Remove removes the position tracking for a filepath func (p *Positions) Remove(path string) { p.mtx.Lock() @@ -118,7 +142,7 @@ func (p *Positions) run() { func (p *Positions) save() { p.mtx.Lock() - positions := make(map[string]int64, len(p.positions)) + positions := make(map[string]string, len(p.positions)) for k, v := range p.positions { positions[k] = v } @@ -134,6 +158,12 @@ func (p *Positions) cleanup() { defer p.mtx.Unlock() toRemove := []string{} for k := range p.positions { + // If the position file is prefixed with journal, it's a + // JournalTarget cursor and not a file on disk. + if strings.HasPrefix(k, "journal-") { + continue + } + if _, err := os.Stat(k); err != nil { if os.IsNotExist(err) { // File no longer exists. @@ -150,11 +180,11 @@ func (p *Positions) cleanup() { } } -func readPositionsFile(filename string) (map[string]int64, error) { +func readPositionsFile(filename string) (map[string]string, error) { buf, err := ioutil.ReadFile(filepath.Clean(filename)) if err != nil { if os.IsNotExist(err) { - return map[string]int64{}, nil + return map[string]string{}, nil } return nil, err } @@ -167,7 +197,7 @@ func readPositionsFile(filename string) (map[string]int64, error) { return p.Positions, nil } -func writePositionFile(filename string, positions map[string]int64) error { +func writePositionFile(filename string, positions map[string]string) error { buf, err := yaml.Marshal(File{ Positions: positions, }) diff --git a/pkg/promtail/scrape/scrape.go b/pkg/promtail/scrape/scrape.go index 8a0f0c159297a..237e5bcf6665c 100644 --- a/pkg/promtail/scrape/scrape.go +++ b/pkg/promtail/scrape/scrape.go @@ -2,6 +2,9 @@ package scrape import ( "fmt" + "reflect" + + "github.com/prometheus/common/model" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/pkg/relabel" @@ -15,15 +18,33 @@ type Config struct { JobName string `yaml:"job_name,omitempty"` EntryParser api.EntryParser `yaml:"entry_parser"` PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"` + JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"` RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"` } +// JournalTargetConfig describes systemd journal records to scrape. +type JournalTargetConfig struct { + // Labels optionally holds labels to associate with each record coming out + // of the journal. + Labels model.LabelSet `yaml:"labels"` + + // Path to a directory to read journal entries from. Defaults to system path + // if empty. + Path string `yaml:"path"` +} + // DefaultScrapeConfig is the default Config. var DefaultScrapeConfig = Config{ EntryParser: api.Docker, } +// HasServiceDiscoveryConfig checks to see if the service discovery used for +// file targets is non-zero. +func (c *Config) HasServiceDiscoveryConfig() bool { + return !reflect.DeepEqual(c.ServiceDiscoveryConfig, sd_config.ServiceDiscoveryConfig{}) +} + // UnmarshalYAML implements the yaml.Unmarshaler interface. func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultScrapeConfig diff --git a/pkg/promtail/targets/filetarget.go b/pkg/promtail/targets/filetarget.go index d1fddbd790e1e..fa3cdf4680a57 100644 --- a/pkg/promtail/targets/filetarget.go +++ b/pkg/promtail/targets/filetarget.go @@ -130,21 +130,11 @@ func (t *FileTarget) Type() TargetType { return FileTargetType } -// DiscoveredLabels implements a Target -func (t *FileTarget) DiscoveredLabels() model.LabelSet { - return t.discoveredLabels -} - -// Labels implements a Target -func (t *FileTarget) Labels() model.LabelSet { - return t.labels -} - // Details implements a Target func (t *FileTarget) Details() interface{} { files := map[string]int64{} for fileName := range t.tails { - files[fileName] = t.positions.Get(fileName) + files[fileName], _ = t.positions.Get(fileName) } return files } diff --git a/pkg/promtail/targets/filetarget_test.go b/pkg/promtail/targets/filetarget_test.go index 649b71bf53c5d..a05c3a58aec0a 100644 --- a/pkg/promtail/targets/filetarget_test.go +++ b/pkg/promtail/targets/filetarget_test.go @@ -89,7 +89,7 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { // Assert the position value is in the correct spot. if val, ok := p.Positions[logFile]; ok { - if val != 50 { + if val != "50" { t.Error("Incorrect position found, expected 50, found", val) } } else { @@ -182,7 +182,7 @@ func TestWatchEntireDirectory(t *testing.T) { // Assert the position value is in the correct spot. if val, ok := p.Positions[logFileDir+"test.log"]; ok { - if val != 50 { + if val != "50" { t.Error("Incorrect position found, expected 50, found", val) } } else { @@ -488,14 +488,14 @@ func TestGlobWithMultipleFiles(t *testing.T) { // Assert the position value is in the correct spot. if val, ok := p.Positions[logFile1]; ok { - if val != 60 { + if val != "60" { t.Error("Incorrect position found for file 1, expected 60, found", val) } } else { t.Error("Positions file did not contain any data for our test log file") } if val, ok := p.Positions[logFile2]; ok { - if val != 60 { + if val != "60" { t.Error("Incorrect position found for file 2, expected 60, found", val) } } else { diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go index 471b38a51a962..a096c7192c948 100644 --- a/pkg/promtail/targets/filetargetmanager.go +++ b/pkg/promtail/targets/filetargetmanager.go @@ -74,8 +74,12 @@ func NewFileTargetManager( config := map[string]sd_config.ServiceDiscoveryConfig{} for _, cfg := range scrapeConfigs { + if !cfg.HasServiceDiscoveryConfig() { + continue + } + registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "file_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) if err != nil { return nil, err } diff --git a/pkg/promtail/targets/journaltarget.go b/pkg/promtail/targets/journaltarget.go new file mode 100644 index 0000000000000..2d399a4981a50 --- /dev/null +++ b/pkg/promtail/targets/journaltarget.go @@ -0,0 +1,206 @@ +// +build linux,cgo + +package targets + +import ( + "fmt" + "io" + "io/ioutil" + "strings" + "time" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + + "github.com/go-kit/kit/log/level" + + "github.com/grafana/loki/pkg/promtail/positions" + + "github.com/go-kit/kit/log" + "github.com/grafana/loki/pkg/promtail/scrape" + + "github.com/coreos/go-systemd/sdjournal" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +const ( + // journalEmptyStr is represented as a single-character space because + // returning an empty string from sdjournal.JournalReaderConfig's + // Formatter causes an immediate EOF and induces performance issues + // with how that is handled in sdjournal. + journalEmptyStr = " " +) + +type journalReader interface { + io.Closer + Follow(until <-chan time.Time, writer io.Writer) error +} + +type journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error) + +var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) { + return sdjournal.NewJournalReader(c) +} + +// JournalTarget tails systemd journal entries. +type JournalTarget struct { + logger log.Logger + handler api.EntryHandler + positions *positions.Positions + positionPath string + relabelConfig []*relabel.Config + config *scrape.JournalTargetConfig + labels model.LabelSet + + r journalReader + until chan time.Time +} + +// NewJournalTarget configures a new JournalTarget. +func NewJournalTarget( + logger log.Logger, + handler api.EntryHandler, + positions *positions.Positions, + jobName string, + relabelConfig []*relabel.Config, + targetConfig *scrape.JournalTargetConfig, +) (*JournalTarget, error) { + + return journalTargetWithReader( + logger, + handler, + positions, + jobName, + relabelConfig, + targetConfig, + defaultJournalReaderFunc, + ) +} + +func journalTargetWithReader( + logger log.Logger, + handler api.EntryHandler, + positions *positions.Positions, + jobName string, + relabelConfig []*relabel.Config, + targetConfig *scrape.JournalTargetConfig, + readerFunc journalReaderFunc, +) (*JournalTarget, error) { + + positionPath := fmt.Sprintf("journal-%s", jobName) + position := positions.GetString(positionPath) + + if readerFunc == nil { + readerFunc = defaultJournalReaderFunc + } + + until := make(chan time.Time) + t := &JournalTarget{ + logger: logger, + handler: handler, + positions: positions, + positionPath: positionPath, + relabelConfig: relabelConfig, + labels: targetConfig.Labels, + config: targetConfig, + + until: until, + } + + // Default to system path if not defined. Passing an empty string to + // sdjournal is valid but forces reads from the journal to be from + // the local machine id only, which contradicts the default behavior + // of when a path is specified. To standardize, we manually default the + // path here. + journalPath := targetConfig.Path + if journalPath == "" { + journalPath = "/var/log/journal" + } + + var err error + t.r, err = readerFunc(sdjournal.JournalReaderConfig{ + Path: journalPath, + Cursor: position, + Formatter: t.formatter, + }) + if err != nil { + return nil, errors.Wrap(err, "creating journal reader") + } + + go func() { + err := t.r.Follow(until, ioutil.Discard) + if err != nil && err != sdjournal.ErrExpired { + level.Error(t.logger).Log("msg", "received error during sdjournal follow", "err", err.Error()) + } + }() + + return t, nil +} + +func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) { + ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond)) + + msg, ok := entry.Fields["MESSAGE"] + if !ok { + level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field") + return journalEmptyStr, nil + } + entryLabels := makeJournalFields(entry.Fields) + + // Add constant labels + for k, v := range t.labels { + entryLabels[string(k)] = string(v) + } + + processedLabels := relabel.Process(labels.FromMap(entryLabels), t.relabelConfig...) + + processedLabelsMap := processedLabels.Map() + labels := make(model.LabelSet, len(processedLabelsMap)) + for k, v := range processedLabelsMap { + if k[0:2] == "__" { + continue + } + + labels[model.LabelName(k)] = model.LabelValue(v) + } + if len(labels) == 0 { + // No labels, drop journal entry + return journalEmptyStr, nil + } + + t.positions.PutString(t.positionPath, entry.Cursor) + err := t.handler.Handle(labels, ts, msg) + return journalEmptyStr, err +} + +// Type returns JournalTargetType. +func (t *JournalTarget) Type() TargetType { + return JournalTargetType +} + +// Ready indicates whether or not the journal is ready to be +// read from. +func (t *JournalTarget) Ready() bool { + return true +} + +// Details returns target-specific details (currently nil). +func (t *JournalTarget) Details() interface{} { + return nil +} + +// Stop shuts down the JournalTarget. +func (t *JournalTarget) Stop() error { + t.until <- time.Now() + return t.r.Close() +} + +func makeJournalFields(fields map[string]string) map[string]string { + result := make(map[string]string, len(fields)) + for k, v := range fields { + result[fmt.Sprintf("__journal_%s", strings.ToLower(k))] = v + } + return result +} diff --git a/pkg/promtail/targets/journaltarget_test.go b/pkg/promtail/targets/journaltarget_test.go new file mode 100644 index 0000000000000..2c40ab880a6f6 --- /dev/null +++ b/pkg/promtail/targets/journaltarget_test.go @@ -0,0 +1,112 @@ +// +build linux,cgo + +package targets + +import ( + "io" + "os" + "testing" + "time" + + "github.com/coreos/go-systemd/sdjournal" + + "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/pkg/relabel" + + "github.com/stretchr/testify/assert" + + "github.com/grafana/loki/pkg/promtail/scrape" + + "github.com/go-kit/kit/log" + "github.com/grafana/loki/pkg/promtail/positions" + "github.com/stretchr/testify/require" +) + +type mockJournalReader struct { + config sdjournal.JournalReaderConfig + t *testing.T +} + +func newMockJournalReader(c sdjournal.JournalReaderConfig) (journalReader, error) { + return &mockJournalReader{config: c}, nil +} + +func (r *mockJournalReader) Close() error { + return nil +} + +func (r *mockJournalReader) Follow(until <-chan time.Time, writer io.Writer) error { + <-until + return nil +} + +func (r *mockJournalReader) Write(msg string, fields map[string]string) { + allFields := make(map[string]string, len(fields)) + for k, v := range fields { + allFields[k] = v + } + allFields["MESSAGE"] = msg + + ts := uint64(time.Now().UnixNano()) + + _, err := r.config.Formatter(&sdjournal.JournalEntry{ + Fields: allFields, + MonotonicTimestamp: ts, + RealtimeTimestamp: ts, + }) + assert.NoError(r.t, err) +} + +func TestJournalTarget(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFileName := dirName + "/positions.yml" + + // Set the sync period to a really long value, to guarantee the sync timer + // never runs, this way we know everything saved was done through channel + // notifications when target.stop() was called. + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Fatal(err) + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + relabelCfg := ` +- source_labels: ['__journal_code_file'] + regex: 'journaltarget_test\.go' + action: 'keep' +- source_labels: ['__journal_code_file'] + target_label: 'code_file'` + + var relabels []*relabel.Config + err = yaml.Unmarshal([]byte(relabelCfg), &relabels) + require.NoError(t, err) + + jt, err := journalTargetWithReader(logger, client, ps, "test", relabels, + &scrape.JournalTargetConfig{}, newMockJournalReader) + require.NoError(t, err) + + r := jt.r.(*mockJournalReader) + r.t = t + + for i := 0; i < 10; i++ { + r.Write("ping", map[string]string{ + "CODE_FILE": "journaltarget_test.go", + }) + assert.NoError(t, err) + } + + assert.Len(t, client.messages, 10) + require.NoError(t, jt.Stop()) +} diff --git a/pkg/promtail/targets/journaltargetmanager.go b/pkg/promtail/targets/journaltargetmanager.go new file mode 100644 index 0000000000000..cb409eec34783 --- /dev/null +++ b/pkg/promtail/targets/journaltargetmanager.go @@ -0,0 +1,45 @@ +// +build !linux !cgo + +package targets + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/positions" + "github.com/grafana/loki/pkg/promtail/scrape" +) + +// JournalTargetManager manages a series of JournalTargets. +type JournalTargetManager struct{} + +// NewJournalTargetManager returns nil as JournalTargets are not supported +// on this platform. +func NewJournalTargetManager( + logger log.Logger, + positions *positions.Positions, + client api.EntryHandler, + scrapeConfigs []scrape.Config, +) (*JournalTargetManager, error) { + level.Warn(logger).Log("msg", "WARNING!!! Journal target manager initialized on platform without Journal support!") + return &JournalTargetManager{}, nil +} + +// Ready always returns false for JournalTargetManager on non-Linux +// platforms. +func (tm *JournalTargetManager) Ready() bool { + return false +} + +// Stop is a no-op on non-Linux platforms. +func (tm *JournalTargetManager) Stop() {} + +// ActiveTargets always returns nil on non-Linux platforms. +func (tm *JournalTargetManager) ActiveTargets() map[string][]Target { + return nil +} + +// AllTargets always returns nil on non-Linux platforms. +func (tm *JournalTargetManager) AllTargets() map[string][]Target { + return nil +} diff --git a/pkg/promtail/targets/journaltargetmanager_linux.go b/pkg/promtail/targets/journaltargetmanager_linux.go new file mode 100644 index 0000000000000..d538a3e7ac0ca --- /dev/null +++ b/pkg/promtail/targets/journaltargetmanager_linux.go @@ -0,0 +1,96 @@ +// +build cgo + +package targets + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/positions" + "github.com/grafana/loki/pkg/promtail/scrape" + "github.com/prometheus/client_golang/prometheus" +) + +// JournalTargetManager manages a series of JournalTargets. +type JournalTargetManager struct { + logger log.Logger + targets map[string]*JournalTarget +} + +// NewJournalTargetManager creates a new JournalTargetManager. +func NewJournalTargetManager( + logger log.Logger, + positions *positions.Positions, + client api.EntryHandler, + scrapeConfigs []scrape.Config, +) (*JournalTargetManager, error) { + tm := &JournalTargetManager{ + logger: logger, + targets: make(map[string]*JournalTarget), + } + + for _, cfg := range scrapeConfigs { + if cfg.JournalConfig == nil { + continue + } + + registerer := prometheus.DefaultRegisterer + pipeline, err := stages.NewPipeline(log.With(logger, "component", "journal_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) + if err != nil { + return nil, err + } + + t, err := NewJournalTarget( + logger, + pipeline.Wrap(client), + positions, + cfg.JobName, + cfg.RelabelConfigs, + cfg.JournalConfig, + ) + if err != nil { + return nil, err + } + + tm.targets[cfg.JobName] = t + } + + return tm, nil +} + +// Ready returns true if at least one JournalTarget is also ready. +func (tm *JournalTargetManager) Ready() bool { + for _, t := range tm.targets { + if t.Ready() { + return true + } + } + return false +} + +// Stop stops the JournalTargetManager and all of its JournalTargets. +func (tm *JournalTargetManager) Stop() { + for _, t := range tm.targets { + if err := t.Stop(); err != nil { + level.Error(t.logger).Log("msg", "error stopping JournalTarget", "err", err.Error()) + } + } +} + +// ActiveTargets returns the list of JournalTargets where journal data +// is being read. ActiveTargets is an alias to AllTargets as +// JournalTargets cannot be deactivated, only stopped. +func (tm *JournalTargetManager) ActiveTargets() map[string][]Target { + return tm.AllTargets() +} + +// AllTargets returns the list of all targets where journal data +// is currently being read. +func (tm *JournalTargetManager) AllTargets() map[string][]Target { + result := make(map[string][]Target, len(tm.targets)) + for k, v := range tm.targets { + result[k] = []Target{v} + } + return result +} diff --git a/pkg/promtail/targets/manager.go b/pkg/promtail/targets/manager.go index 003c204ba1e1f..a375c71563f20 100644 --- a/pkg/promtail/targets/manager.go +++ b/pkg/promtail/targets/manager.go @@ -31,20 +31,44 @@ func NewTargetManagers( ) (*TargetManagers, error) { var targetManagers []targetManager var fileScrapeConfigs []scrape.Config + var journalScrapeConfigs []scrape.Config - // for now every scrape config is a file target - fileScrapeConfigs = append(fileScrapeConfigs, scrapeConfigs...) - fileTargetManager, err := NewFileTargetManager( - logger, - positions, - client, - fileScrapeConfigs, - targetConfig, - ) - if err != nil { - return nil, errors.Wrap(err, "failed to make file target manager") + for _, cfg := range scrapeConfigs { + if cfg.HasServiceDiscoveryConfig() { + fileScrapeConfigs = append(fileScrapeConfigs, cfg) + } + } + if len(fileScrapeConfigs) > 0 { + fileTargetManager, err := NewFileTargetManager( + logger, + positions, + client, + fileScrapeConfigs, + targetConfig, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to make file target manager") + } + targetManagers = append(targetManagers, fileTargetManager) + } + + for _, cfg := range scrapeConfigs { + if cfg.JournalConfig != nil { + journalScrapeConfigs = append(journalScrapeConfigs, cfg) + } + } + if len(journalScrapeConfigs) > 0 { + journalTargetManager, err := NewJournalTargetManager( + logger, + positions, + client, + journalScrapeConfigs, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to make journal target manager") + } + targetManagers = append(targetManagers, journalTargetManager) } - targetManagers = append(targetManagers, fileTargetManager) return &TargetManagers{targetManagers: targetManagers}, nil @@ -72,7 +96,7 @@ func (tm *TargetManagers) AllTargets() map[string][]Target { return result } -// Ready if there's at least one ready FileTargetManager +// Ready if there's at least one ready target manager. func (tm *TargetManagers) Ready() bool { for _, t := range tm.targetManagers { if t.Ready() { diff --git a/pkg/promtail/targets/tailer.go b/pkg/promtail/targets/tailer.go index 88294bd6f0d70..fce4d318b4c0a 100644 --- a/pkg/promtail/targets/tailer.go +++ b/pkg/promtail/targets/tailer.go @@ -32,7 +32,12 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions *positions if err != nil { return nil, err } - if fi.Size() < positions.Get(path) { + pos, err := positions.Get(path) + if err != nil { + return nil, err + } + + if fi.Size() < pos { positions.Remove(path) } @@ -41,7 +46,7 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions *positions Poll: true, ReOpen: true, Location: &tail.SeekInfo{ - Offset: positions.Get(path), + Offset: pos, Whence: 0, }, }) diff --git a/pkg/promtail/targets/target.go b/pkg/promtail/targets/target.go index 21deb1141f3d3..15841cf9bea8a 100644 --- a/pkg/promtail/targets/target.go +++ b/pkg/promtail/targets/target.go @@ -8,8 +8,14 @@ import ( type TargetType string const ( - // FileTargetType a file target + // FileTargetType is a file target FileTargetType = TargetType("File") + + // JournalTargetType is a journalctl target + JournalTargetType = TargetType("Journal") + + // DroppedTargetType is a target that's been dropped. + DroppedTargetType = TargetType("dropped") ) // Target is a promtail scrape target @@ -18,17 +24,13 @@ type Target interface { Type() TargetType // Ready tells if the targets is ready Ready() bool - // Labels before any processing. - DiscoveredLabels() model.LabelSet - // Any labels that are added to this target and its stream - Labels() model.LabelSet // Details is additional information about this target specific to its type Details() interface{} } // IsDropped tells if a target has been dropped func IsDropped(t Target) bool { - return len(t.Labels()) == 0 + return t.Type() == DroppedTargetType } // droppedTarget is a target that has been dropped @@ -46,7 +48,7 @@ func newDroppedTarget(reason string, discoveredLabels model.LabelSet) Target { // Type implements Target func (d *droppedTarget) Type() TargetType { - return "none" + return DroppedTargetType } // Ready implements Target @@ -54,16 +56,6 @@ func (d *droppedTarget) Ready() bool { return false } -// DiscoveredLabels implements Target -func (d *droppedTarget) DiscoveredLabels() model.LabelSet { - return d.discoveredLabels -} - -// Labels implements Target -func (d *droppedTarget) Labels() model.LabelSet { - return nil -} - // Details implements Target it contains a message explaining the reason for dropping it func (d *droppedTarget) Details() interface{} { return d.reason diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/functions.go b/vendor/github.com/coreos/go-systemd/sdjournal/functions.go new file mode 100644 index 0000000000000..e132369c12741 --- /dev/null +++ b/vendor/github.com/coreos/go-systemd/sdjournal/functions.go @@ -0,0 +1,66 @@ +// Copyright 2015 RedHat, Inc. +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdjournal + +import ( + "github.com/coreos/pkg/dlopen" + "sync" + "unsafe" +) + +var ( + // lazy initialized + libsystemdHandle *dlopen.LibHandle + + libsystemdMutex = &sync.Mutex{} + libsystemdFunctions = map[string]unsafe.Pointer{} + libsystemdNames = []string{ + // systemd < 209 + "libsystemd-journal.so.0", + "libsystemd-journal.so", + + // systemd >= 209 merged libsystemd-journal into libsystemd proper + "libsystemd.so.0", + "libsystemd.so", + } +) + +func getFunction(name string) (unsafe.Pointer, error) { + libsystemdMutex.Lock() + defer libsystemdMutex.Unlock() + + if libsystemdHandle == nil { + h, err := dlopen.GetHandle(libsystemdNames) + if err != nil { + return nil, err + } + + libsystemdHandle = h + } + + f, ok := libsystemdFunctions[name] + if !ok { + var err error + f, err = libsystemdHandle.GetSymbolPointer(name) + if err != nil { + return nil, err + } + + libsystemdFunctions[name] = f + } + + return f, nil +} diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/journal.go b/vendor/github.com/coreos/go-systemd/sdjournal/journal.go new file mode 100644 index 0000000000000..9f3d9234239b8 --- /dev/null +++ b/vendor/github.com/coreos/go-systemd/sdjournal/journal.go @@ -0,0 +1,1120 @@ +// Copyright 2015 RedHat, Inc. +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sdjournal provides a low-level Go interface to the +// systemd journal wrapped around the sd-journal C API. +// +// All public read methods map closely to the sd-journal API functions. See the +// sd-journal.h documentation[1] for information about each function. +// +// To write to the journal, see the pure-Go "journal" package +// +// [1] http://www.freedesktop.org/software/systemd/man/sd-journal.html +package sdjournal + +// #include +// #include +// #include +// #include +// +// int +// my_sd_journal_open(void *f, sd_journal **ret, int flags) +// { +// int (*sd_journal_open)(sd_journal **, int); +// +// sd_journal_open = f; +// return sd_journal_open(ret, flags); +// } +// +// int +// my_sd_journal_open_directory(void *f, sd_journal **ret, const char *path, int flags) +// { +// int (*sd_journal_open_directory)(sd_journal **, const char *, int); +// +// sd_journal_open_directory = f; +// return sd_journal_open_directory(ret, path, flags); +// } +// +// int +// my_sd_journal_open_files(void *f, sd_journal **ret, const char **paths, int flags) +// { +// int (*sd_journal_open_files)(sd_journal **, const char **, int); +// +// sd_journal_open_files = f; +// return sd_journal_open_files(ret, paths, flags); +// } +// +// void +// my_sd_journal_close(void *f, sd_journal *j) +// { +// int (*sd_journal_close)(sd_journal *); +// +// sd_journal_close = f; +// sd_journal_close(j); +// } +// +// int +// my_sd_journal_get_usage(void *f, sd_journal *j, uint64_t *bytes) +// { +// int (*sd_journal_get_usage)(sd_journal *, uint64_t *); +// +// sd_journal_get_usage = f; +// return sd_journal_get_usage(j, bytes); +// } +// +// int +// my_sd_journal_add_match(void *f, sd_journal *j, const void *data, size_t size) +// { +// int (*sd_journal_add_match)(sd_journal *, const void *, size_t); +// +// sd_journal_add_match = f; +// return sd_journal_add_match(j, data, size); +// } +// +// int +// my_sd_journal_add_disjunction(void *f, sd_journal *j) +// { +// int (*sd_journal_add_disjunction)(sd_journal *); +// +// sd_journal_add_disjunction = f; +// return sd_journal_add_disjunction(j); +// } +// +// int +// my_sd_journal_add_conjunction(void *f, sd_journal *j) +// { +// int (*sd_journal_add_conjunction)(sd_journal *); +// +// sd_journal_add_conjunction = f; +// return sd_journal_add_conjunction(j); +// } +// +// void +// my_sd_journal_flush_matches(void *f, sd_journal *j) +// { +// int (*sd_journal_flush_matches)(sd_journal *); +// +// sd_journal_flush_matches = f; +// sd_journal_flush_matches(j); +// } +// +// int +// my_sd_journal_next(void *f, sd_journal *j) +// { +// int (*sd_journal_next)(sd_journal *); +// +// sd_journal_next = f; +// return sd_journal_next(j); +// } +// +// int +// my_sd_journal_next_skip(void *f, sd_journal *j, uint64_t skip) +// { +// int (*sd_journal_next_skip)(sd_journal *, uint64_t); +// +// sd_journal_next_skip = f; +// return sd_journal_next_skip(j, skip); +// } +// +// int +// my_sd_journal_previous(void *f, sd_journal *j) +// { +// int (*sd_journal_previous)(sd_journal *); +// +// sd_journal_previous = f; +// return sd_journal_previous(j); +// } +// +// int +// my_sd_journal_previous_skip(void *f, sd_journal *j, uint64_t skip) +// { +// int (*sd_journal_previous_skip)(sd_journal *, uint64_t); +// +// sd_journal_previous_skip = f; +// return sd_journal_previous_skip(j, skip); +// } +// +// int +// my_sd_journal_get_data(void *f, sd_journal *j, const char *field, const void **data, size_t *length) +// { +// int (*sd_journal_get_data)(sd_journal *, const char *, const void **, size_t *); +// +// sd_journal_get_data = f; +// return sd_journal_get_data(j, field, data, length); +// } +// +// int +// my_sd_journal_set_data_threshold(void *f, sd_journal *j, size_t sz) +// { +// int (*sd_journal_set_data_threshold)(sd_journal *, size_t); +// +// sd_journal_set_data_threshold = f; +// return sd_journal_set_data_threshold(j, sz); +// } +// +// int +// my_sd_journal_get_cursor(void *f, sd_journal *j, char **cursor) +// { +// int (*sd_journal_get_cursor)(sd_journal *, char **); +// +// sd_journal_get_cursor = f; +// return sd_journal_get_cursor(j, cursor); +// } +// +// int +// my_sd_journal_test_cursor(void *f, sd_journal *j, const char *cursor) +// { +// int (*sd_journal_test_cursor)(sd_journal *, const char *); +// +// sd_journal_test_cursor = f; +// return sd_journal_test_cursor(j, cursor); +// } +// +// int +// my_sd_journal_get_realtime_usec(void *f, sd_journal *j, uint64_t *usec) +// { +// int (*sd_journal_get_realtime_usec)(sd_journal *, uint64_t *); +// +// sd_journal_get_realtime_usec = f; +// return sd_journal_get_realtime_usec(j, usec); +// } +// +// int +// my_sd_journal_get_monotonic_usec(void *f, sd_journal *j, uint64_t *usec, sd_id128_t *boot_id) +// { +// int (*sd_journal_get_monotonic_usec)(sd_journal *, uint64_t *, sd_id128_t *); +// +// sd_journal_get_monotonic_usec = f; +// return sd_journal_get_monotonic_usec(j, usec, boot_id); +// } +// +// int +// my_sd_journal_seek_head(void *f, sd_journal *j) +// { +// int (*sd_journal_seek_head)(sd_journal *); +// +// sd_journal_seek_head = f; +// return sd_journal_seek_head(j); +// } +// +// int +// my_sd_journal_seek_tail(void *f, sd_journal *j) +// { +// int (*sd_journal_seek_tail)(sd_journal *); +// +// sd_journal_seek_tail = f; +// return sd_journal_seek_tail(j); +// } +// +// +// int +// my_sd_journal_seek_cursor(void *f, sd_journal *j, const char *cursor) +// { +// int (*sd_journal_seek_cursor)(sd_journal *, const char *); +// +// sd_journal_seek_cursor = f; +// return sd_journal_seek_cursor(j, cursor); +// } +// +// int +// my_sd_journal_seek_realtime_usec(void *f, sd_journal *j, uint64_t usec) +// { +// int (*sd_journal_seek_realtime_usec)(sd_journal *, uint64_t); +// +// sd_journal_seek_realtime_usec = f; +// return sd_journal_seek_realtime_usec(j, usec); +// } +// +// int +// my_sd_journal_wait(void *f, sd_journal *j, uint64_t timeout_usec) +// { +// int (*sd_journal_wait)(sd_journal *, uint64_t); +// +// sd_journal_wait = f; +// return sd_journal_wait(j, timeout_usec); +// } +// +// void +// my_sd_journal_restart_data(void *f, sd_journal *j) +// { +// void (*sd_journal_restart_data)(sd_journal *); +// +// sd_journal_restart_data = f; +// sd_journal_restart_data(j); +// } +// +// int +// my_sd_journal_enumerate_data(void *f, sd_journal *j, const void **data, size_t *length) +// { +// int (*sd_journal_enumerate_data)(sd_journal *, const void **, size_t *); +// +// sd_journal_enumerate_data = f; +// return sd_journal_enumerate_data(j, data, length); +// } +// +// int +// my_sd_journal_query_unique(void *f, sd_journal *j, const char *field) +// { +// int(*sd_journal_query_unique)(sd_journal *, const char *); +// +// sd_journal_query_unique = f; +// return sd_journal_query_unique(j, field); +// } +// +// int +// my_sd_journal_enumerate_unique(void *f, sd_journal *j, const void **data, size_t *length) +// { +// int(*sd_journal_enumerate_unique)(sd_journal *, const void **, size_t *); +// +// sd_journal_enumerate_unique = f; +// return sd_journal_enumerate_unique(j, data, length); +// } +// +// void +// my_sd_journal_restart_unique(void *f, sd_journal *j) +// { +// void(*sd_journal_restart_unique)(sd_journal *); +// +// sd_journal_restart_unique = f; +// sd_journal_restart_unique(j); +// } +// +// int +// my_sd_journal_get_catalog(void *f, sd_journal *j, char **ret) +// { +// int(*sd_journal_get_catalog)(sd_journal *, char **); +// +// sd_journal_get_catalog = f; +// return sd_journal_get_catalog(j, ret); +// } +// +import "C" +import ( + "bytes" + "errors" + "fmt" + "strings" + "sync" + "syscall" + "time" + "unsafe" +) + +// Journal entry field strings which correspond to: +// http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html +const ( + // User Journal Fields + SD_JOURNAL_FIELD_MESSAGE = "MESSAGE" + SD_JOURNAL_FIELD_MESSAGE_ID = "MESSAGE_ID" + SD_JOURNAL_FIELD_PRIORITY = "PRIORITY" + SD_JOURNAL_FIELD_CODE_FILE = "CODE_FILE" + SD_JOURNAL_FIELD_CODE_LINE = "CODE_LINE" + SD_JOURNAL_FIELD_CODE_FUNC = "CODE_FUNC" + SD_JOURNAL_FIELD_ERRNO = "ERRNO" + SD_JOURNAL_FIELD_SYSLOG_FACILITY = "SYSLOG_FACILITY" + SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER = "SYSLOG_IDENTIFIER" + SD_JOURNAL_FIELD_SYSLOG_PID = "SYSLOG_PID" + + // Trusted Journal Fields + SD_JOURNAL_FIELD_PID = "_PID" + SD_JOURNAL_FIELD_UID = "_UID" + SD_JOURNAL_FIELD_GID = "_GID" + SD_JOURNAL_FIELD_COMM = "_COMM" + SD_JOURNAL_FIELD_EXE = "_EXE" + SD_JOURNAL_FIELD_CMDLINE = "_CMDLINE" + SD_JOURNAL_FIELD_CAP_EFFECTIVE = "_CAP_EFFECTIVE" + SD_JOURNAL_FIELD_AUDIT_SESSION = "_AUDIT_SESSION" + SD_JOURNAL_FIELD_AUDIT_LOGINUID = "_AUDIT_LOGINUID" + SD_JOURNAL_FIELD_SYSTEMD_CGROUP = "_SYSTEMD_CGROUP" + SD_JOURNAL_FIELD_SYSTEMD_SESSION = "_SYSTEMD_SESSION" + SD_JOURNAL_FIELD_SYSTEMD_UNIT = "_SYSTEMD_UNIT" + SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT = "_SYSTEMD_USER_UNIT" + SD_JOURNAL_FIELD_SYSTEMD_OWNER_UID = "_SYSTEMD_OWNER_UID" + SD_JOURNAL_FIELD_SYSTEMD_SLICE = "_SYSTEMD_SLICE" + SD_JOURNAL_FIELD_SELINUX_CONTEXT = "_SELINUX_CONTEXT" + SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP = "_SOURCE_REALTIME_TIMESTAMP" + SD_JOURNAL_FIELD_BOOT_ID = "_BOOT_ID" + SD_JOURNAL_FIELD_MACHINE_ID = "_MACHINE_ID" + SD_JOURNAL_FIELD_HOSTNAME = "_HOSTNAME" + SD_JOURNAL_FIELD_TRANSPORT = "_TRANSPORT" + + // Address Fields + SD_JOURNAL_FIELD_CURSOR = "__CURSOR" + SD_JOURNAL_FIELD_REALTIME_TIMESTAMP = "__REALTIME_TIMESTAMP" + SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP = "__MONOTONIC_TIMESTAMP" +) + +// Journal event constants +const ( + SD_JOURNAL_NOP = int(C.SD_JOURNAL_NOP) + SD_JOURNAL_APPEND = int(C.SD_JOURNAL_APPEND) + SD_JOURNAL_INVALIDATE = int(C.SD_JOURNAL_INVALIDATE) +) + +const ( + // IndefiniteWait is a sentinel value that can be passed to + // sdjournal.Wait() to signal an indefinite wait for new journal + // events. It is implemented as the maximum value for a time.Duration: + // https://github.com/golang/go/blob/e4dcf5c8c22d98ac9eac7b9b226596229624cb1d/src/time/time.go#L434 + IndefiniteWait time.Duration = 1<<63 - 1 +) + +var ( + // ErrNoTestCursor gets returned when using TestCursor function and cursor + // parameter is not the same as the current cursor position. + ErrNoTestCursor = errors.New("Cursor parameter is not the same as current position") +) + +// Journal is a Go wrapper of an sd_journal structure. +type Journal struct { + cjournal *C.sd_journal + mu sync.Mutex +} + +// JournalEntry represents all fields of a journal entry plus address fields. +type JournalEntry struct { + Fields map[string]string + Cursor string + RealtimeTimestamp uint64 + MonotonicTimestamp uint64 +} + +// Match is a convenience wrapper to describe filters supplied to AddMatch. +type Match struct { + Field string + Value string +} + +// String returns a string representation of a Match suitable for use with AddMatch. +func (m *Match) String() string { + return m.Field + "=" + m.Value +} + +// NewJournal returns a new Journal instance pointing to the local journal +func NewJournal() (j *Journal, err error) { + j = &Journal{} + + sd_journal_open, err := getFunction("sd_journal_open") + if err != nil { + return nil, err + } + + r := C.my_sd_journal_open(sd_journal_open, &j.cjournal, C.SD_JOURNAL_LOCAL_ONLY) + + if r < 0 { + return nil, fmt.Errorf("failed to open journal: %d", syscall.Errno(-r)) + } + + return j, nil +} + +// NewJournalFromDir returns a new Journal instance pointing to a journal residing +// in a given directory. +func NewJournalFromDir(path string) (j *Journal, err error) { + j = &Journal{} + + sd_journal_open_directory, err := getFunction("sd_journal_open_directory") + if err != nil { + return nil, err + } + + p := C.CString(path) + defer C.free(unsafe.Pointer(p)) + + r := C.my_sd_journal_open_directory(sd_journal_open_directory, &j.cjournal, p, 0) + if r < 0 { + return nil, fmt.Errorf("failed to open journal in directory %q: %d", path, syscall.Errno(-r)) + } + + return j, nil +} + +// NewJournalFromFiles returns a new Journal instance pointing to a journals residing +// in a given files. +func NewJournalFromFiles(paths ...string) (j *Journal, err error) { + j = &Journal{} + + sd_journal_open_files, err := getFunction("sd_journal_open_files") + if err != nil { + return nil, err + } + + // by making the slice 1 elem too long, we guarantee it'll be null-terminated + cPaths := make([]*C.char, len(paths)+1) + for idx, path := range paths { + p := C.CString(path) + cPaths[idx] = p + defer C.free(unsafe.Pointer(p)) + } + + r := C.my_sd_journal_open_files(sd_journal_open_files, &j.cjournal, &cPaths[0], 0) + if r < 0 { + return nil, fmt.Errorf("failed to open journals in paths %q: %d", paths, syscall.Errno(-r)) + } + + return j, nil +} + +// Close closes a journal opened with NewJournal. +func (j *Journal) Close() error { + sd_journal_close, err := getFunction("sd_journal_close") + if err != nil { + return err + } + + j.mu.Lock() + C.my_sd_journal_close(sd_journal_close, j.cjournal) + j.mu.Unlock() + + return nil +} + +// AddMatch adds a match by which to filter the entries of the journal. +func (j *Journal) AddMatch(match string) error { + sd_journal_add_match, err := getFunction("sd_journal_add_match") + if err != nil { + return err + } + + m := C.CString(match) + defer C.free(unsafe.Pointer(m)) + + j.mu.Lock() + r := C.my_sd_journal_add_match(sd_journal_add_match, j.cjournal, unsafe.Pointer(m), C.size_t(len(match))) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to add match: %d", syscall.Errno(-r)) + } + + return nil +} + +// AddDisjunction inserts a logical OR in the match list. +func (j *Journal) AddDisjunction() error { + sd_journal_add_disjunction, err := getFunction("sd_journal_add_disjunction") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_add_disjunction(sd_journal_add_disjunction, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to add a disjunction in the match list: %d", syscall.Errno(-r)) + } + + return nil +} + +// AddConjunction inserts a logical AND in the match list. +func (j *Journal) AddConjunction() error { + sd_journal_add_conjunction, err := getFunction("sd_journal_add_conjunction") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_add_conjunction(sd_journal_add_conjunction, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to add a conjunction in the match list: %d", syscall.Errno(-r)) + } + + return nil +} + +// FlushMatches flushes all matches, disjunctions and conjunctions. +func (j *Journal) FlushMatches() { + sd_journal_flush_matches, err := getFunction("sd_journal_flush_matches") + if err != nil { + return + } + + j.mu.Lock() + C.my_sd_journal_flush_matches(sd_journal_flush_matches, j.cjournal) + j.mu.Unlock() +} + +// Next advances the read pointer into the journal by one entry. +func (j *Journal) Next() (uint64, error) { + sd_journal_next, err := getFunction("sd_journal_next") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_next(sd_journal_next, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to iterate journal: %d", syscall.Errno(-r)) + } + + return uint64(r), nil +} + +// NextSkip advances the read pointer by multiple entries at once, +// as specified by the skip parameter. +func (j *Journal) NextSkip(skip uint64) (uint64, error) { + sd_journal_next_skip, err := getFunction("sd_journal_next_skip") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_next_skip(sd_journal_next_skip, j.cjournal, C.uint64_t(skip)) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to iterate journal: %d", syscall.Errno(-r)) + } + + return uint64(r), nil +} + +// Previous sets the read pointer into the journal back by one entry. +func (j *Journal) Previous() (uint64, error) { + sd_journal_previous, err := getFunction("sd_journal_previous") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_previous(sd_journal_previous, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to iterate journal: %d", syscall.Errno(-r)) + } + + return uint64(r), nil +} + +// PreviousSkip sets back the read pointer by multiple entries at once, +// as specified by the skip parameter. +func (j *Journal) PreviousSkip(skip uint64) (uint64, error) { + sd_journal_previous_skip, err := getFunction("sd_journal_previous_skip") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_previous_skip(sd_journal_previous_skip, j.cjournal, C.uint64_t(skip)) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to iterate journal: %d", syscall.Errno(-r)) + } + + return uint64(r), nil +} + +func (j *Journal) getData(field string) (unsafe.Pointer, C.int, error) { + sd_journal_get_data, err := getFunction("sd_journal_get_data") + if err != nil { + return nil, 0, err + } + + f := C.CString(field) + defer C.free(unsafe.Pointer(f)) + + var d unsafe.Pointer + var l C.size_t + + j.mu.Lock() + r := C.my_sd_journal_get_data(sd_journal_get_data, j.cjournal, f, &d, &l) + j.mu.Unlock() + + if r < 0 { + return nil, 0, fmt.Errorf("failed to read message: %d", syscall.Errno(-r)) + } + + return d, C.int(l), nil +} + +// GetData gets the data object associated with a specific field from the +// the journal entry referenced by the last completed Next/Previous function +// call. To call GetData, you must have first called one of these functions. +func (j *Journal) GetData(field string) (string, error) { + d, l, err := j.getData(field) + if err != nil { + return "", err + } + + return C.GoStringN((*C.char)(d), l), nil +} + +// GetDataValue gets the data object associated with a specific field from the +// journal entry referenced by the last completed Next/Previous function call, +// returning only the value of the object. To call GetDataValue, you must first +// have called one of the Next/Previous functions. +func (j *Journal) GetDataValue(field string) (string, error) { + val, err := j.GetData(field) + if err != nil { + return "", err + } + + return strings.SplitN(val, "=", 2)[1], nil +} + +// GetDataBytes gets the data object associated with a specific field from the +// journal entry referenced by the last completed Next/Previous function call. +// To call GetDataBytes, you must first have called one of these functions. +func (j *Journal) GetDataBytes(field string) ([]byte, error) { + d, l, err := j.getData(field) + if err != nil { + return nil, err + } + + return C.GoBytes(d, l), nil +} + +// GetDataValueBytes gets the data object associated with a specific field from the +// journal entry referenced by the last completed Next/Previous function call, +// returning only the value of the object. To call GetDataValueBytes, you must first +// have called one of the Next/Previous functions. +func (j *Journal) GetDataValueBytes(field string) ([]byte, error) { + val, err := j.GetDataBytes(field) + if err != nil { + return nil, err + } + + return bytes.SplitN(val, []byte("="), 2)[1], nil +} + +// GetEntry returns a full representation of the journal entry referenced by the +// last completed Next/Previous function call, with all key-value pairs of data +// as well as address fields (cursor, realtime timestamp and monotonic timestamp). +// To call GetEntry, you must first have called one of the Next/Previous functions. +func (j *Journal) GetEntry() (*JournalEntry, error) { + sd_journal_get_realtime_usec, err := getFunction("sd_journal_get_realtime_usec") + if err != nil { + return nil, err + } + + sd_journal_get_monotonic_usec, err := getFunction("sd_journal_get_monotonic_usec") + if err != nil { + return nil, err + } + + sd_journal_get_cursor, err := getFunction("sd_journal_get_cursor") + if err != nil { + return nil, err + } + + sd_journal_restart_data, err := getFunction("sd_journal_restart_data") + if err != nil { + return nil, err + } + + sd_journal_enumerate_data, err := getFunction("sd_journal_enumerate_data") + if err != nil { + return nil, err + } + + j.mu.Lock() + defer j.mu.Unlock() + + var r C.int + entry := &JournalEntry{Fields: make(map[string]string)} + + var realtimeUsec C.uint64_t + r = C.my_sd_journal_get_realtime_usec(sd_journal_get_realtime_usec, j.cjournal, &realtimeUsec) + if r < 0 { + return nil, fmt.Errorf("failed to get realtime timestamp: %d", syscall.Errno(-r)) + } + + entry.RealtimeTimestamp = uint64(realtimeUsec) + + var monotonicUsec C.uint64_t + var boot_id C.sd_id128_t + + r = C.my_sd_journal_get_monotonic_usec(sd_journal_get_monotonic_usec, j.cjournal, &monotonicUsec, &boot_id) + if r < 0 { + return nil, fmt.Errorf("failed to get monotonic timestamp: %d", syscall.Errno(-r)) + } + + entry.MonotonicTimestamp = uint64(monotonicUsec) + + var c *C.char + // since the pointer is mutated by sd_journal_get_cursor, need to wait + // until after the call to free the memory + r = C.my_sd_journal_get_cursor(sd_journal_get_cursor, j.cjournal, &c) + defer C.free(unsafe.Pointer(c)) + if r < 0 { + return nil, fmt.Errorf("failed to get cursor: %d", syscall.Errno(-r)) + } + + entry.Cursor = C.GoString(c) + + // Implements the JOURNAL_FOREACH_DATA_RETVAL macro from journal-internal.h + var d unsafe.Pointer + var l C.size_t + C.my_sd_journal_restart_data(sd_journal_restart_data, j.cjournal) + for { + r = C.my_sd_journal_enumerate_data(sd_journal_enumerate_data, j.cjournal, &d, &l) + if r == 0 { + break + } + + if r < 0 { + return nil, fmt.Errorf("failed to read message field: %d", syscall.Errno(-r)) + } + + msg := C.GoStringN((*C.char)(d), C.int(l)) + kv := strings.SplitN(msg, "=", 2) + if len(kv) < 2 { + return nil, fmt.Errorf("failed to parse field") + } + + entry.Fields[kv[0]] = kv[1] + } + + return entry, nil +} + +// SetDataThreshold sets the data field size threshold for data returned by +// GetData. To retrieve the complete data fields this threshold should be +// turned off by setting it to 0, so that the library always returns the +// complete data objects. +func (j *Journal) SetDataThreshold(threshold uint64) error { + sd_journal_set_data_threshold, err := getFunction("sd_journal_set_data_threshold") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_set_data_threshold(sd_journal_set_data_threshold, j.cjournal, C.size_t(threshold)) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to set data threshold: %d", syscall.Errno(-r)) + } + + return nil +} + +// GetRealtimeUsec gets the realtime (wallclock) timestamp of the journal +// entry referenced by the last completed Next/Previous function call. To +// call GetRealtimeUsec, you must first have called one of the Next/Previous +// functions. +func (j *Journal) GetRealtimeUsec() (uint64, error) { + var usec C.uint64_t + + sd_journal_get_realtime_usec, err := getFunction("sd_journal_get_realtime_usec") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_get_realtime_usec(sd_journal_get_realtime_usec, j.cjournal, &usec) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to get realtime timestamp: %d", syscall.Errno(-r)) + } + + return uint64(usec), nil +} + +// GetMonotonicUsec gets the monotonic timestamp of the journal entry +// referenced by the last completed Next/Previous function call. To call +// GetMonotonicUsec, you must first have called one of the Next/Previous +// functions. +func (j *Journal) GetMonotonicUsec() (uint64, error) { + var usec C.uint64_t + var boot_id C.sd_id128_t + + sd_journal_get_monotonic_usec, err := getFunction("sd_journal_get_monotonic_usec") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_get_monotonic_usec(sd_journal_get_monotonic_usec, j.cjournal, &usec, &boot_id) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to get monotonic timestamp: %d", syscall.Errno(-r)) + } + + return uint64(usec), nil +} + +// GetCursor gets the cursor of the last journal entry reeferenced by the +// last completed Next/Previous function call. To call GetCursor, you must +// first have called one of the Next/Previous functions. +func (j *Journal) GetCursor() (string, error) { + sd_journal_get_cursor, err := getFunction("sd_journal_get_cursor") + if err != nil { + return "", err + } + + var d *C.char + // since the pointer is mutated by sd_journal_get_cursor, need to wait + // until after the call to free the memory + + j.mu.Lock() + r := C.my_sd_journal_get_cursor(sd_journal_get_cursor, j.cjournal, &d) + j.mu.Unlock() + defer C.free(unsafe.Pointer(d)) + + if r < 0 { + return "", fmt.Errorf("failed to get cursor: %d", syscall.Errno(-r)) + } + + cursor := C.GoString(d) + + return cursor, nil +} + +// TestCursor checks whether the current position in the journal matches the +// specified cursor +func (j *Journal) TestCursor(cursor string) error { + sd_journal_test_cursor, err := getFunction("sd_journal_test_cursor") + if err != nil { + return err + } + + c := C.CString(cursor) + defer C.free(unsafe.Pointer(c)) + + j.mu.Lock() + r := C.my_sd_journal_test_cursor(sd_journal_test_cursor, j.cjournal, c) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to test to cursor %q: %d", cursor, syscall.Errno(-r)) + } else if r == 0 { + return ErrNoTestCursor + } + + return nil +} + +// SeekHead seeks to the beginning of the journal, i.e. the oldest available +// entry. This call must be followed by a call to Next before any call to +// Get* will return data about the first element. +func (j *Journal) SeekHead() error { + sd_journal_seek_head, err := getFunction("sd_journal_seek_head") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_seek_head(sd_journal_seek_head, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to seek to head of journal: %d", syscall.Errno(-r)) + } + + return nil +} + +// SeekTail may be used to seek to the end of the journal, i.e. the most recent +// available entry. This call must be followed by a call to Next before any +// call to Get* will return data about the last element. +func (j *Journal) SeekTail() error { + sd_journal_seek_tail, err := getFunction("sd_journal_seek_tail") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_seek_tail(sd_journal_seek_tail, j.cjournal) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to seek to tail of journal: %d", syscall.Errno(-r)) + } + + return nil +} + +// SeekRealtimeUsec seeks to the entry with the specified realtime (wallclock) +// timestamp, i.e. CLOCK_REALTIME. This call must be followed by a call to +// Next/Previous before any call to Get* will return data about the sought entry. +func (j *Journal) SeekRealtimeUsec(usec uint64) error { + sd_journal_seek_realtime_usec, err := getFunction("sd_journal_seek_realtime_usec") + if err != nil { + return err + } + + j.mu.Lock() + r := C.my_sd_journal_seek_realtime_usec(sd_journal_seek_realtime_usec, j.cjournal, C.uint64_t(usec)) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to seek to %d: %d", usec, syscall.Errno(-r)) + } + + return nil +} + +// SeekCursor seeks to a concrete journal cursor. This call must be +// followed by a call to Next/Previous before any call to Get* will return +// data about the sought entry. +func (j *Journal) SeekCursor(cursor string) error { + sd_journal_seek_cursor, err := getFunction("sd_journal_seek_cursor") + if err != nil { + return err + } + + c := C.CString(cursor) + defer C.free(unsafe.Pointer(c)) + + j.mu.Lock() + r := C.my_sd_journal_seek_cursor(sd_journal_seek_cursor, j.cjournal, c) + j.mu.Unlock() + + if r < 0 { + return fmt.Errorf("failed to seek to cursor %q: %d", cursor, syscall.Errno(-r)) + } + + return nil +} + +// Wait will synchronously wait until the journal gets changed. The maximum time +// this call sleeps may be controlled with the timeout parameter. If +// sdjournal.IndefiniteWait is passed as the timeout parameter, Wait will +// wait indefinitely for a journal change. +func (j *Journal) Wait(timeout time.Duration) int { + var to uint64 + + sd_journal_wait, err := getFunction("sd_journal_wait") + if err != nil { + return -1 + } + + if timeout == IndefiniteWait { + // sd_journal_wait(3) calls for a (uint64_t) -1 to be passed to signify + // indefinite wait, but using a -1 overflows our C.uint64_t, so we use an + // equivalent hex value. + to = 0xffffffffffffffff + } else { + to = uint64(timeout / time.Microsecond) + } + j.mu.Lock() + r := C.my_sd_journal_wait(sd_journal_wait, j.cjournal, C.uint64_t(to)) + j.mu.Unlock() + + return int(r) +} + +// GetUsage returns the journal disk space usage, in bytes. +func (j *Journal) GetUsage() (uint64, error) { + var out C.uint64_t + + sd_journal_get_usage, err := getFunction("sd_journal_get_usage") + if err != nil { + return 0, err + } + + j.mu.Lock() + r := C.my_sd_journal_get_usage(sd_journal_get_usage, j.cjournal, &out) + j.mu.Unlock() + + if r < 0 { + return 0, fmt.Errorf("failed to get journal disk space usage: %d", syscall.Errno(-r)) + } + + return uint64(out), nil +} + +// GetUniqueValues returns all unique values for a given field. +func (j *Journal) GetUniqueValues(field string) ([]string, error) { + var result []string + + sd_journal_query_unique, err := getFunction("sd_journal_query_unique") + if err != nil { + return nil, err + } + + sd_journal_enumerate_unique, err := getFunction("sd_journal_enumerate_unique") + if err != nil { + return nil, err + } + + sd_journal_restart_unique, err := getFunction("sd_journal_restart_unique") + if err != nil { + return nil, err + } + + j.mu.Lock() + defer j.mu.Unlock() + + f := C.CString(field) + defer C.free(unsafe.Pointer(f)) + + r := C.my_sd_journal_query_unique(sd_journal_query_unique, j.cjournal, f) + + if r < 0 { + return nil, fmt.Errorf("failed to query journal: %d", syscall.Errno(-r)) + } + + // Implements the SD_JOURNAL_FOREACH_UNIQUE macro from sd-journal.h + var d unsafe.Pointer + var l C.size_t + C.my_sd_journal_restart_unique(sd_journal_restart_unique, j.cjournal) + for { + r = C.my_sd_journal_enumerate_unique(sd_journal_enumerate_unique, j.cjournal, &d, &l) + if r == 0 { + break + } + + if r < 0 { + return nil, fmt.Errorf("failed to read message field: %d", syscall.Errno(-r)) + } + + msg := C.GoStringN((*C.char)(d), C.int(l)) + kv := strings.SplitN(msg, "=", 2) + if len(kv) < 2 { + return nil, fmt.Errorf("failed to parse field") + } + + result = append(result, kv[1]) + } + + return result, nil +} + +// GetCatalog retrieves a message catalog entry for the journal entry referenced +// by the last completed Next/Previous function call. To call GetCatalog, you +// must first have called one of these functions. +func (j *Journal) GetCatalog() (string, error) { + sd_journal_get_catalog, err := getFunction("sd_journal_get_catalog") + if err != nil { + return "", err + } + + var c *C.char + + j.mu.Lock() + r := C.my_sd_journal_get_catalog(sd_journal_get_catalog, j.cjournal, &c) + j.mu.Unlock() + defer C.free(unsafe.Pointer(c)) + + if r < 0 { + return "", fmt.Errorf("failed to retrieve catalog entry for current journal entry: %d", syscall.Errno(-r)) + } + + catalog := C.GoString(c) + + return catalog, nil +} diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/read.go b/vendor/github.com/coreos/go-systemd/sdjournal/read.go new file mode 100644 index 0000000000000..51a060fb530fe --- /dev/null +++ b/vendor/github.com/coreos/go-systemd/sdjournal/read.go @@ -0,0 +1,272 @@ +// Copyright 2015 RedHat, Inc. +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdjournal + +import ( + "errors" + "fmt" + "io" + "log" + "strings" + "sync" + "time" +) + +var ( + // ErrExpired gets returned when the Follow function runs into the + // specified timeout. + ErrExpired = errors.New("Timeout expired") +) + +// JournalReaderConfig represents options to drive the behavior of a JournalReader. +type JournalReaderConfig struct { + // The Since, NumFromTail and Cursor options are mutually exclusive and + // determine where the reading begins within the journal. The order in which + // options are written is exactly the order of precedence. + Since time.Duration // start relative to a Duration from now + NumFromTail uint64 // start relative to the tail + Cursor string // start relative to the cursor + + // Show only journal entries whose fields match the supplied values. If + // the array is empty, entries will not be filtered. + Matches []Match + + // If not empty, the journal instance will point to a journal residing + // in this directory. The supplied path may be relative or absolute. + Path string + + // If not nil, Formatter will be used to translate the resulting entries + // into strings. If not set, the default format (timestamp and message field) + // will be used. If Formatter returns an error, Read will stop and return the error. + Formatter func(entry *JournalEntry) (string, error) +} + +// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the +// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines. +type JournalReader struct { + journal *Journal + msgReader *strings.Reader + formatter func(entry *JournalEntry) (string, error) +} + +// NewJournalReader creates a new JournalReader with configuration options that are similar to the +// systemd journalctl tool's iteration and filtering features. +func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { + // use simpleMessageFormatter as default formatter. + if config.Formatter == nil { + config.Formatter = simpleMessageFormatter + } + + r := &JournalReader{ + formatter: config.Formatter, + } + + // Open the journal + var err error + if config.Path != "" { + r.journal, err = NewJournalFromDir(config.Path) + } else { + r.journal, err = NewJournal() + } + if err != nil { + return nil, err + } + + // Add any supplied matches + for _, m := range config.Matches { + if err = r.journal.AddMatch(m.String()); err != nil { + return nil, err + } + } + + // Set the start position based on options + if config.Since != 0 { + // Start based on a relative time + start := time.Now().Add(config.Since) + if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil { + return nil, err + } + } else if config.NumFromTail != 0 { + // Start based on a number of lines before the tail + if err := r.journal.SeekTail(); err != nil { + return nil, err + } + + // Move the read pointer into position near the tail. Go one further than + // the option so that the initial cursor advancement positions us at the + // correct starting point. + skip, err := r.journal.PreviousSkip(config.NumFromTail + 1) + if err != nil { + return nil, err + } + // If we skipped fewer lines than expected, we have reached journal start. + // Thus, we seek to head so that next invocation can read the first line. + if skip != config.NumFromTail+1 { + if err := r.journal.SeekHead(); err != nil { + return nil, err + } + } + } else if config.Cursor != "" { + // Start based on a custom cursor + if err := r.journal.SeekCursor(config.Cursor); err != nil { + return nil, err + } + } + + return r, nil +} + +// Read reads entries from the journal. Read follows the Reader interface so +// it must be able to read a specific amount of bytes. Journald on the other +// hand only allows us to read full entries of arbitrary size (without byte +// granularity). JournalReader is therefore internally buffering entries that +// don't fit in the read buffer. Callers should keep calling until 0 and/or an +// error is returned. +func (r *JournalReader) Read(b []byte) (int, error) { + if r.msgReader == nil { + // Advance the journal cursor. It has to be called at least one time + // before reading + c, err := r.journal.Next() + + // An unexpected error + if err != nil { + return 0, err + } + + // EOF detection + if c == 0 { + return 0, io.EOF + } + + entry, err := r.journal.GetEntry() + if err != nil { + return 0, err + } + + // Build a message + msg, err := r.formatter(entry) + if err != nil { + return 0, err + } + r.msgReader = strings.NewReader(msg) + } + + // Copy and return the message + sz, err := r.msgReader.Read(b) + if err == io.EOF { + // The current entry has been fully read. Don't propagate this + // EOF, so the next entry can be read at the next Read() + // iteration. + r.msgReader = nil + return sz, nil + } + if err != nil { + return sz, err + } + if r.msgReader.Len() == 0 { + r.msgReader = nil + } + + return sz, nil +} + +// Close closes the JournalReader's handle to the journal. +func (r *JournalReader) Close() error { + return r.journal.Close() +} + +// Rewind attempts to rewind the JournalReader to the first entry. +func (r *JournalReader) Rewind() error { + r.msgReader = nil + return r.journal.SeekHead() +} + +// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The +// follow will continue until a single time.Time is received on the until channel. +func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error { + + // Process journal entries and events. Entries are flushed until the tail or + // timeout is reached, and then we wait for new events or the timeout. + var msg = make([]byte, 64*1<<(10)) + var waitCh = make(chan int, 1) + var waitGroup sync.WaitGroup + defer waitGroup.Wait() + +process: + for { + c, err := r.Read(msg) + if err != nil && err != io.EOF { + return err + } + + select { + case <-until: + return ErrExpired + default: + } + if c > 0 { + if _, err = writer.Write(msg[:c]); err != nil { + return err + } + continue process + } + + // We're at the tail, so wait for new events or time out. + // Holds journal events to process. Tightly bounded for now unless there's a + // reason to unblock the journal watch routine more quickly. + for { + waitGroup.Add(1) + go func() { + status := r.journal.Wait(100 * time.Millisecond) + waitCh <- status + waitGroup.Done() + }() + + select { + case <-until: + return ErrExpired + case e := <-waitCh: + switch e { + case SD_JOURNAL_NOP: + // the journal did not change since the last invocation + case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: + continue process + default: + if e < 0 { + return fmt.Errorf("received error event: %d", e) + } + + log.Printf("received unknown event: %d\n", e) + } + } + } + } +} + +// simpleMessageFormatter is the default formatter. +// It returns a string representing the current journal entry in a simple format which +// includes the entry timestamp and MESSAGE field. +func simpleMessageFormatter(entry *JournalEntry) (string, error) { + msg, ok := entry.Fields["MESSAGE"] + if !ok { + return "", fmt.Errorf("no MESSAGE field present in journal entry") + } + + usec := entry.RealtimeTimestamp + timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond)) + + return fmt.Sprintf("%s %s\n", timestamp, msg), nil +} diff --git a/vendor/github.com/coreos/pkg/LICENSE b/vendor/github.com/coreos/pkg/LICENSE new file mode 100644 index 0000000000000..e06d2081865a7 --- /dev/null +++ b/vendor/github.com/coreos/pkg/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/vendor/github.com/coreos/pkg/NOTICE b/vendor/github.com/coreos/pkg/NOTICE new file mode 100644 index 0000000000000..b39ddfa5cbdea --- /dev/null +++ b/vendor/github.com/coreos/pkg/NOTICE @@ -0,0 +1,5 @@ +CoreOS Project +Copyright 2014 CoreOS, Inc + +This product includes software developed at CoreOS, Inc. +(http://www.coreos.com/). diff --git a/vendor/github.com/coreos/pkg/dlopen/dlopen.go b/vendor/github.com/coreos/pkg/dlopen/dlopen.go new file mode 100644 index 0000000000000..23774f612e0f4 --- /dev/null +++ b/vendor/github.com/coreos/pkg/dlopen/dlopen.go @@ -0,0 +1,82 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package dlopen provides some convenience functions to dlopen a library and +// get its symbols. +package dlopen + +// #cgo LDFLAGS: -ldl +// #include +// #include +import "C" +import ( + "errors" + "fmt" + "unsafe" +) + +var ErrSoNotFound = errors.New("unable to open a handle to the library") + +// LibHandle represents an open handle to a library (.so) +type LibHandle struct { + Handle unsafe.Pointer + Libname string +} + +// GetHandle tries to get a handle to a library (.so), attempting to access it +// by the names specified in libs and returning the first that is successfully +// opened. Callers are responsible for closing the handler. If no library can +// be successfully opened, an error is returned. +func GetHandle(libs []string) (*LibHandle, error) { + for _, name := range libs { + libname := C.CString(name) + defer C.free(unsafe.Pointer(libname)) + handle := C.dlopen(libname, C.RTLD_LAZY) + if handle != nil { + h := &LibHandle{ + Handle: handle, + Libname: name, + } + return h, nil + } + } + return nil, ErrSoNotFound +} + +// GetSymbolPointer takes a symbol name and returns a pointer to the symbol. +func (l *LibHandle) GetSymbolPointer(symbol string) (unsafe.Pointer, error) { + sym := C.CString(symbol) + defer C.free(unsafe.Pointer(sym)) + + C.dlerror() + p := C.dlsym(l.Handle, sym) + e := C.dlerror() + if e != nil { + return nil, fmt.Errorf("error resolving symbol %q: %v", symbol, errors.New(C.GoString(e))) + } + + return p, nil +} + +// Close closes a LibHandle. +func (l *LibHandle) Close() error { + C.dlerror() + C.dlclose(l.Handle) + e := C.dlerror() + if e != nil { + return fmt.Errorf("error closing %v: %v", l.Libname, errors.New(C.GoString(e))) + } + + return nil +} diff --git a/vendor/github.com/coreos/pkg/dlopen/dlopen_example.go b/vendor/github.com/coreos/pkg/dlopen/dlopen_example.go new file mode 100644 index 0000000000000..48a660104fb4d --- /dev/null +++ b/vendor/github.com/coreos/pkg/dlopen/dlopen_example.go @@ -0,0 +1,56 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +build linux + +package dlopen + +// #include +// #include +// +// int +// my_strlen(void *f, const char *s) +// { +// size_t (*strlen)(const char *); +// +// strlen = (size_t (*)(const char *))f; +// return strlen(s); +// } +import "C" + +import ( + "fmt" + "unsafe" +) + +func strlen(libs []string, s string) (int, error) { + h, err := GetHandle(libs) + if err != nil { + return -1, fmt.Errorf(`couldn't get a handle to the library: %v`, err) + } + defer h.Close() + + f := "strlen" + cs := C.CString(s) + defer C.free(unsafe.Pointer(cs)) + + strlen, err := h.GetSymbolPointer(f) + if err != nil { + return -1, fmt.Errorf(`couldn't get symbol %q: %v`, f, err) + } + + len := C.my_strlen(strlen, cs) + + return int(len), nil +}