Promtail: Add compressed files support (#6708)
**What this PR does / why we need it**:
Adds to Promtail the ability to read compressed files. It works by:
1. Inferring which compression format to use based on the file extension
2. Uncompressing the file with Go's standard library `compress` packages
3. Iterating over the uncompressed lines and sending them to Loki (see the sketch below)

Its usage is the same as our current file tailing.
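For readers who want the shape of the pipeline at a glance, here is a minimal, self-contained sketch of those three steps. It mirrors the extension-based detection in `decompresser.go`, but the `pickReader` helper and the `app.log.gz` input file are illustrative, not part of this PR:

```go
package main

import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
)

// pickReader mirrors the extension-based format detection: the file
// extension alone decides which stdlib reader wraps the raw file handle.
// Note that filepath.Ext("x.tar.gz") is ".gz", so .tar.gz matches too.
func pickReader(f *os.File) (io.Reader, error) {
	switch ext := filepath.Ext(f.Name()); ext {
	case ".gz":
		return gzip.NewReader(f)
	case ".z":
		return zlib.NewReader(f)
	case ".bz2":
		return bzip2.NewReader(f), nil
	default:
		return nil, fmt.Errorf("unsupported extension %q", ext)
	}
}

func main() {
	f, err := os.Open("app.log.gz") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r, err := pickReader(f)
	if err != nil {
		log.Fatal(err)
	}

	// Step 3: iterate over the uncompressed lines. Promtail ships each
	// line to Loki; here we just print them.
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
```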

**Which issue(s) this PR fixes**:
Fixes #5956 

Co-authored-by: Danny Kopping <dannykopping@gmail.com>
DylanGuedes and dannykopping authored Sep 27, 2022
1 parent 5ee0d09 commit 73bea7e
Showing 16 changed files with 2,601 additions and 44 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -38,8 +38,9 @@
#### Promtail

##### Enhancements

* [6708](https://github.com/grafana/loki/pull/6708) **DylanGuedes**: Add compressed files support to Promtail.
* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t** lambda-promtail: Add support for Kinesis data stream events
* [6395](https://github.com/grafana/loki/pull/6395) **DylanGuedes**: Add encoding support
* [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj**: Add the BotScore and BotScoreSrc fields now that the Cloudflare API returns them in the list of all available log fields.
* [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader

@@ -118,6 +119,7 @@ Here is the list with the changes that were produced since the previous release.
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail.
* [6099](https://github.com/grafana/loki/pull/6099) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage.
* [5715](https://github.com/grafana/loki/pull/5715) **chaudum**: Allow promtail to push RFC5424 formatted syslog messages
* [6395](https://github.com/grafana/loki/pull/6395) **DylanGuedes**: Add encoding support

##### Fixes
* [6034](https://github.com/grafana/loki/pull/6034) **DylanGuedes**: Promtail: Fix symlink tailing behavior.
299 changes: 299 additions & 0 deletions clients/pkg/promtail/targets/file/decompresser.go
@@ -0,0 +1,299 @@
package file

import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
	"golang.org/x/text/encoding"
	"golang.org/x/text/encoding/ianaindex"
	"golang.org/x/text/transform"

	"github.com/grafana/loki/pkg/logproto"

	"github.com/grafana/loki/clients/pkg/promtail/api"
	"github.com/grafana/loki/clients/pkg/promtail/positions"
)

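// supportedCompressedFormats lists the file extensions the decompressor
// knows how to handle.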
func supportedCompressedFormats() map[string]struct{} {
	return map[string]struct{}{
		".gz":     {},
		".tar.gz": {},
		".z":      {},
		".bz2":    {},
		// TODO: add support for .zip extension.
	}
}

type decompressor struct {
	metrics   *Metrics
	logger    log.Logger
	handler   api.EntryHandler
	positions positions.Positions

	path string

	// posAndSizeMtx serialises MarkPositionAndSize, which can be invoked
	// concurrently by the filetarget sync timer and the positions timer.
	posAndSizeMtx sync.Mutex
	stopOnce      sync.Once

	running *atomic.Bool
	posquit chan struct{}
	posdone chan struct{}
	done    chan struct{}

	decoder *encoding.Decoder

	position int64
	size     int64
}

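// newDecompressor builds a decompressor for the given compressed file and
// starts its two goroutines: one that reads and ships lines (readLines) and
// one that periodically persists the read position (updatePosition).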
func newDecompressor(metrics *Metrics, logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string, encodingFormat string) (*decompressor, error) {
	logger = log.With(logger, "component", "decompressor")

	pos, err := positions.Get(path)
	if err != nil {
		return nil, errors.Wrap(err, "get positions")
	}

	// If a source encoding is configured, build a decoder that transcodes
	// each line to UTF-8 before it is shipped.
	var decoder *encoding.Decoder
	if encodingFormat != "" {
		level.Info(logger).Log("msg", "decompressor will decode messages", "from", encodingFormat, "to", "UTF8")
		encoder, err := ianaindex.IANA.Encoding(encodingFormat)
		if err != nil {
			return nil, errors.Wrap(err, "error doing IANA encoding")
		}
		decoder = encoder.NewDecoder()
	}

	decompressor := &decompressor{
		metrics:   metrics,
		logger:    logger,
		handler:   api.AddLabelsMiddleware(model.LabelSet{FilenameLabel: model.LabelValue(path)}).Wrap(handler),
		positions: positions,
		path:      path,
		running:   atomic.NewBool(false),
		posquit:   make(chan struct{}),
		posdone:   make(chan struct{}),
		done:      make(chan struct{}),
		position:  pos,
		decoder:   decoder,
	}

	go decompressor.readLines()
	go decompressor.updatePosition()
	metrics.filesActive.Add(1.)
	return decompressor, nil
}

// mountReader instantiates a reader ready to be used by the decompressor.
//
// The reader implementation is selected based on the extension of the given
// file name. It errors if the extension isn't supported.
func mountReader(f *os.File, logger log.Logger) (reader io.Reader, err error) {
	ext := filepath.Ext(f.Name())
	var decompressLib string

	if strings.Contains(ext, "gz") { // .gz, .tar.gz
		decompressLib = "compress/gzip"
		reader, err = gzip.NewReader(f)
	} else if ext == ".z" {
		decompressLib = "compress/zlib"
		reader, err = zlib.NewReader(f)
	} else if ext == ".bz2" {
		decompressLib = "compress/bzip2"
		reader = bzip2.NewReader(f)
	}
	// TODO: add support for .zip extension.

	level.Debug(logger).Log("msg", fmt.Sprintf("using %q to decompress file %q", decompressLib, f.Name()))

	if reader != nil {
		return reader, nil
	}

	if err != nil && err != io.EOF {
		return nil, err
	}

	var supportedExts []string
	for ext := range supportedCompressedFormats() {
		supportedExts = append(supportedExts, ext)
	}
	return nil, fmt.Errorf("file %q has unsupported extension, it has to be one of %q", f.Name(), strings.Join(supportedExts, ", "))
}
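// Illustrative only (not part of the original commit): for a path such as
// "app.log.gz" mountReader returns a *gzip.Reader, while for "app.log.bz2"
// it returns the plain io.Reader produced by bzip2.NewReader.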

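// updatePosition persists the read position and size on the positions
// sync period until the decompressor is stopped or a sync attempt fails.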
func (t *decompressor) updatePosition() {
	positionSyncPeriod := t.positions.SyncPeriod()
	positionWait := time.NewTicker(positionSyncPeriod)
	defer func() {
		positionWait.Stop()
		level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
		close(t.posdone)
	}()

	for {
		select {
		case <-positionWait.C:
			if err := t.MarkPositionAndSize(); err != nil {
				level.Error(t.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", t.path, "error", err)
				return
			}
		case <-t.posquit:
			return
		}
	}
}

// readLines reads all existing lines of the given compressed file.
//
// It first decompresses the file as a whole using a reader and then iterates
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is sent to the API
// with the current timestamp. Note that the persisted position is a line
// count, not a byte offset: on restart, lines up to the saved position are
// skipped rather than seeked past.
func (t *decompressor) readLines() {
	level.Info(t.logger).Log("msg", "read lines routine: started", "path", t.path)
	t.running.Store(true)

	defer func() {
		t.cleanupMetrics()
		level.Info(t.logger).Log("msg", "read lines routine finished", "path", t.path)
		close(t.done)
	}()
	entries := t.handler.Chan()

	f, err := os.Open(t.path)
	if err != nil {
		level.Error(t.logger).Log("msg", "error reading file", "path", t.path, "error", err)
		return
	}
	defer f.Close()

	r, err := mountReader(f, t.logger)
	if err != nil {
		level.Error(t.logger).Log("msg", "error mounting new reader", "err", err)
		return
	}

	level.Info(t.logger).Log("msg", "successfully mounted reader", "path", t.path, "ext", filepath.Ext(t.path))

	// Cap each scanned line at 4 KiB; if a line exceeds the buffer, Scan
	// returns false and the scanner stops with bufio.ErrTooLong.
	maxLoglineSize := 4096
	buffer := make([]byte, maxLoglineSize)
	scanner := bufio.NewScanner(r)
	scanner.Buffer(buffer, maxLoglineSize)
	for line := 1; ; line++ {
		if !scanner.Scan() {
			break
		}

		if scannerErr := scanner.Err(); scannerErr != nil {
			if scannerErr != io.EOF {
				level.Error(t.logger).Log("msg", "error scanning", "err", scannerErr)
			}

			break
		}

		if line <= int(t.position) {
			// skip already seen lines.
			continue
		}

		text := scanner.Text()
		var finalText string
		if t.decoder != nil {
			var err error
			finalText, err = t.convertToUTF8(text)
			if err != nil {
				level.Debug(t.logger).Log("msg", "failed to convert encoding", "error", err)
				t.metrics.encodingFailures.WithLabelValues(t.path).Inc()
				finalText = fmt.Sprintf("the requested encoding conversion for this line failed in Promtail/Grafana Agent: %s", err.Error())
			}
		} else {
			finalText = text
		}

		t.metrics.readLines.WithLabelValues(t.path).Inc()

		entries <- api.Entry{
			Labels: model.LabelSet{},
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      finalText,
			},
		}

		// Accumulate the uncompressed bytes read so the totalBytes metric
		// reflects real progress. (unsafe.Sizeof(finalText) would only
		// measure the 16-byte string header, not the line itself.)
		t.size += int64(len(finalText))
		t.position++
	}
}

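// MarkPositionAndSize persists the current read position (a line count for
// compressed files) and updates the progress metrics.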
func (t *decompressor) MarkPositionAndSize() error {
	// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
	t.posAndSizeMtx.Lock()
	defer t.posAndSizeMtx.Unlock()

	t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(t.size))
	t.metrics.readBytes.WithLabelValues(t.path).Set(float64(t.position))
	t.positions.Put(t.path, t.position)

	return nil
}

func (t *decompressor) Stop() {
	// Stop can be called by two separate threads in filetarget; to avoid a panic from closing channels more than once,
	// we wrap the stop in a sync.Once.
	t.stopOnce.Do(func() {
		// Shut down the position marker thread.
		close(t.posquit)
		<-t.posdone

		// Save the current position before shutting down.
		if err := t.MarkPositionAndSize(); err != nil {
			level.Error(t.logger).Log("msg", "error marking file position when stopping decompressor", "path", t.path, "error", err)
		}

		// Wait for readLines() to consume all the remaining messages and exit when the channel is closed.
		<-t.done
		level.Info(t.logger).Log("msg", "stopped decompressor", "path", t.path)
		t.handler.Stop()
	})
}

func (t *decompressor) IsRunning() bool {
	return t.running.Load()
}

func (t *decompressor) convertToUTF8(text string) (string, error) {
	res, _, err := transform.String(t.decoder, text)
	if err != nil {
		return "", errors.Wrap(err, "error decoding text")
	}

	return res, nil
}
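// Illustrative only (not part of the original commit): the same x/text
// machinery can be exercised in isolation, e.g. decoding Latin-1 input:
//
//	enc, _ := ianaindex.IANA.Encoding("ISO-8859-1")
//	utf8, _, _ := transform.String(enc.NewDecoder(), "caf\xe9")
//	// utf8 == "café"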

// cleanupMetrics removes all metrics exported by this decompressor.
func (t *decompressor) cleanupMetrics() {
	// When we stop tailing the file, also un-export metrics related to the file.
	t.metrics.filesActive.Add(-1.)
	t.metrics.readLines.DeleteLabelValues(t.path)
	t.metrics.readBytes.DeleteLabelValues(t.path)
	t.metrics.totalBytes.DeleteLabelValues(t.path)
}

func (t *decompressor) Path() string {
	return t.path
}
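A closing note on the shutdown path: the `Stop` method above relies on `sync.Once` so that concurrent callers cannot close the quit channel twice. Here is a minimal, standalone sketch of that pattern (the `stopper` type is illustrative, not from this commit):

```go
package main

import (
	"fmt"
	"sync"
)

// stopper demonstrates the shutdown guard used by the decompressor's Stop:
// sync.Once guarantees the quit channel is closed exactly once, even when
// Stop is invoked concurrently from multiple goroutines.
type stopper struct {
	quit     chan struct{}
	stopOnce sync.Once
}

func (s *stopper) Stop() {
	s.stopOnce.Do(func() {
		close(s.quit) // closing twice would panic; Once prevents that
	})
}

func main() {
	s := &stopper{quit: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three concurrent Stop calls, no panic
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Stop()
		}()
	}
	wg.Wait()

	<-s.quit // channel is closed, so this receive returns immediately
	fmt.Println("stopped exactly once")
}
```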