Promtail: Add compressed files support #6708

Merged
merged 37 commits into from
Sep 27, 2022
Changes from 15 commits
Commits
3c8ad6a
Implement new Reader interface.
DylanGuedes Jul 16, 2022
01800fd
Implement Decompresser.
DylanGuedes Jul 18, 2022
9bffe70
Add relevant docstrings.
DylanGuedes Jul 18, 2022
4cf7c1c
Fix positions state reuse.
DylanGuedes Jul 18, 2022
973d66f
Add CHANGELOG entry.
DylanGuedes Jul 18, 2022
f94e7e2
Fix calls to .tailers to be .readers instead.
DylanGuedes Jul 18, 2022
86107fe
Merge branch 'main' of github.com:grafana/loki into promtail-compress…
DylanGuedes Jul 25, 2022
c6ec76e
Rename decompresser as decompressor.
DylanGuedes Jul 25, 2022
58b5634
Rephrase misreading decompression log message.
DylanGuedes Jul 25, 2022
77d7fe8
Apply suggestions from code review
DylanGuedes Jul 25, 2022
e80a2f8
Add to error text all supported extensions.
DylanGuedes Jul 25, 2022
1168d7a
Doesn't ignore scanning errors != io.EOF.
DylanGuedes Jul 25, 2022
459d1cc
Remove empty error checkage.
DylanGuedes Jul 25, 2022
4bedfb2
Use map for finding supported extensions.
DylanGuedes Jul 25, 2022
f21b1d5
Merge branch 'main' into promtail-compressed-support
DylanGuedes Aug 15, 2022
f6f4c25
Update CHANGELOG.md
DylanGuedes Aug 15, 2022
fb299c6
Merge branch 'main' of github.com:grafana/loki into promtail-compress…
DylanGuedes Aug 24, 2022
8204062
Add readLines benchmark.
DylanGuedes Sep 1, 2022
ec9339b
Add .zip long scenario to benchmark.
DylanGuedes Sep 1, 2022
043b674
Fix lint.
DylanGuedes Sep 1, 2022
78794dc
Fix lint.
DylanGuedes Sep 1, 2022
9bf1ec4
Fix nocopylocks lint.
DylanGuedes Sep 2, 2022
3a27e05
Rename i to line.
DylanGuedes Sep 2, 2022
794dc33
Merge branch 'main' of github.com:grafana/loki into promtail-compress…
DylanGuedes Sep 2, 2022
d99b09e
Remove dupped changelog lines.
DylanGuedes Sep 2, 2022
a2c86db
Remove dupped log line.
DylanGuedes Sep 2, 2022
a312cb5
Return pointer for noopclient.
DylanGuedes Sep 2, 2022
7f37bea
Start counting lines at 1.
DylanGuedes Sep 19, 2022
fd1041c
Drop support for .zip
DylanGuedes Sep 21, 2022
edc3b01
Add docs section.
DylanGuedes Sep 21, 2022
ba0483e
Test correctness of the gunzip reader.
DylanGuedes Sep 21, 2022
c49af30
Apply suggestions from code review
DylanGuedes Sep 27, 2022
30b6934
Link Loki limits.
DylanGuedes Sep 27, 2022
7621ef9
Explain why Promtail doesn't support log rotations.
DylanGuedes Sep 27, 2022
fd71e02
Explain why test case scenario is necessary.
DylanGuedes Sep 27, 2022
dc57333
Add TODOs for .zip file.
DylanGuedes Sep 27, 2022
84bfeea
Add clarification to decompressing process.
DylanGuedes Sep 27, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -21,8 +21,8 @@
* [6375](https://github.com/grafana/loki/pull/6375) **dannykopping**: Fix bug that prevented users from using the `json` parser after a `line_format` pipeline stage.
* [6505](https://github.com/grafana/loki/pull/6375) **dmitri-lerko** Fixes `failed to receive pubsub messages` error with promtail GCPLog client.
##### Changes
* [6726](https://github.com/grafana/loki/pull/6726) **kavirajk**: upgrades go from 1.17.9 -> 1.18.4
* [6415](https://github.com/grafana/loki/pull/6415) **salvacorts**: Evenly spread queriers across kubernetes nodes.
Review comment (Contributor): Is this related?

Reply (Contributor Author): Oops, thanks for catching it. I think something went wrong with those two changelog entries, maybe while merging with the main branch. Fixing it 😅

* [6726](https://github.com/grafana/loki/pull/6726) **kavirajk** upgrades go from 1.17.9 -> 1.18.4
* [6349](https://github.com/grafana/loki/pull/6349) **simonswine**: Update the default HTTP listen port from 80 to 3100. Make sure to configure the port explicitly if you are using port 80.
* [6835](https://github.com/grafana/loki/pull/6835) **DylanGuedes**: Add new per-tenant query timeout configuration and remove engine query timeout.

@@ -31,6 +31,7 @@
##### Enhancements

* [6708](https://github.com/grafana/loki/pull/6708) **DylanGuedes**: Add compressed files support to Promtail.
* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t** lambda-promtail: Add support for Kinesis data stream events
* [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj** Add the BotScore and BotScoreSrc fields once the Cloudflare API returns those two fields on the list of all available log fields.
* [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader

13 changes: 6 additions & 7 deletions clients/pkg/promtail/targets/file/decompresser.go
@@ -3,7 +3,6 @@ package file
 import (
 	"bufio"
 	"compress/bzip2"
-	"compress/flate"
 	"compress/gzip"
 	"compress/zlib"
 	"fmt"
@@ -32,7 +31,6 @@ import (

 func supportedCompressedFormats() map[string]struct{} {
 	return map[string]struct{}{
-		".zip":    {},
 		".gz":     {},
 		".tar.gz": {},
 		".z":      {},
@@ -114,9 +112,6 @@ func mountReader(f *os.File, logger log.Logger) (reader io.Reader, err error) {
 	} else if ext == ".z" {
 		decompressLib = "compress/zlib"
 		reader, err = zlib.NewReader(f)
-	} else if ext == ".zip" {
-		decompressLib = "compress/flate"
-		reader = flate.NewReader(f)
 	} else if ext == ".bz2" {
 		decompressLib = "bzip2"
 		reader = bzip2.NewReader(f)
@@ -192,8 +187,12 @@ func (t *decompressor) readLines() {

 	level.Info(t.logger).Log("msg", "successfully mounted reader", "path", t.path, "ext", filepath.Ext(t.path))

+	megabyte := 1000000
+	maxLoglineSize := 2 * megabyte
+	buffer := make([]byte, 3*megabyte)
 	scanner := bufio.NewScanner(r)
-	for i := 0; ; i++ {
+	scanner.Buffer(buffer, maxLoglineSize)
+	for line := 1; ; line++ {
 		if !scanner.Scan() {
 			break
 		}
@@ -206,7 +205,7 @@ func (t *decompressor) readLines() {
 			break
 		}

-		if i <= int(t.position) {
+		if line <= int(t.position) {
 			// skip already seen lines.
 			continue
 		}
175 changes: 175 additions & 0 deletions clients/pkg/promtail/targets/file/decompresser_test.go
@@ -0,0 +1,175 @@
package file

import (
	"os"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/loki/clients/pkg/promtail/api"
	"github.com/grafana/loki/clients/pkg/promtail/client/fake"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
)

type noopClient struct {
	noopChan chan api.Entry
	wg       sync.WaitGroup
	once     sync.Once
}

func (n *noopClient) Chan() chan<- api.Entry {
	return n.noopChan
}

func (n *noopClient) Stop() {
	n.once.Do(func() { close(n.noopChan) })
}

func newNoopClient() *noopClient {
	c := &noopClient{noopChan: make(chan api.Entry)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for range c.noopChan {
			// noop
		}
	}()
	return c
}

func BenchmarkReadlines(b *testing.B) {
	entryHandler := newNoopClient()

	scenarios := []struct {
		name string
		file string
	}{
		{
			name: "2000 lines of log .tar.gz compressed",
			file: "test_fixtures/short-access.tar.gz",
		},
		{
			name: "100000 lines of log .gz compressed",
			file: "test_fixtures/long-access.gz",
		},
	}

	for _, tc := range scenarios {
		b.Run(tc.name, func(b *testing.B) {
			decBase := &decompressor{
				logger:  log.NewNopLogger(),
				running: atomic.NewBool(false),
				handler: entryHandler,
				path:    tc.file,
			}

			for i := 0; i < b.N; i++ {
				newDec := decBase
				newDec.metrics = NewMetrics(prometheus.NewRegistry())
				newDec.done = make(chan struct{})
				newDec.readLines()
				<-newDec.done
			}
		})
	}
}

func TestGigantiqueGunzipFile(t *testing.T) {
	file := "test_fixtures/long-access.gz"
	handler := fake.New(func() {})

	d := &decompressor{
		logger:  log.NewNopLogger(),
		running: atomic.NewBool(false),
		handler: handler,
		path:    file,
		done:    make(chan struct{}),
		metrics: NewMetrics(prometheus.NewRegistry()),
	}

	d.readLines()

	<-d.done
	time.Sleep(time.Millisecond * 200)

	entries := handler.Received()
	require.Equal(t, 100000, len(entries))
}

func TestOnelineFiles(t *testing.T) {
	fileContent, err := os.ReadFile("test_fixtures/onelinelog.log")
	require.NoError(t, err)
	t.Run("gunzip file", func(t *testing.T) {
		file := "test_fixtures/onelinelog.log.gz"
		handler := fake.New(func() {})

		d := &decompressor{
			logger:  log.NewNopLogger(),
			running: atomic.NewBool(false),
			handler: handler,
			path:    file,
			done:    make(chan struct{}),
			metrics: NewMetrics(prometheus.NewRegistry()),
		}

		d.readLines()

		<-d.done
		time.Sleep(time.Millisecond * 200)

		entries := handler.Received()
		require.Equal(t, 1, len(entries))
		require.Equal(t, string(fileContent), entries[0].Line)
	})

	t.Run("bzip2 file", func(t *testing.T) {
		file := "test_fixtures/onelinelog.log.bz2"
		handler := fake.New(func() {})

		d := &decompressor{
			logger:  log.NewNopLogger(),
			running: atomic.NewBool(false),
			handler: handler,
			path:    file,
			done:    make(chan struct{}),
			metrics: NewMetrics(prometheus.NewRegistry()),
		}

		d.readLines()

		<-d.done
		time.Sleep(time.Millisecond * 200)

		entries := handler.Received()
		require.Equal(t, 1, len(entries))
		require.Equal(t, string(fileContent), entries[0].Line)
	})

	t.Run("tar.gz file", func(t *testing.T) {
		file := "test_fixtures/onelinelog.tar.gz"
		handler := fake.New(func() {})

		d := &decompressor{
			logger:  log.NewNopLogger(),
			running: atomic.NewBool(false),
			handler: handler,
			path:    file,
			done:    make(chan struct{}),
			metrics: NewMetrics(prometheus.NewRegistry()),
		}

		d.readLines()

		<-d.done
		time.Sleep(time.Millisecond * 200)

		entries := handler.Received()
		require.Equal(t, 1, len(entries))
		firstEntry := entries[0]
		require.Contains(t, firstEntry.Line, "onelinelog.log") // contains .tar.gz headers
		require.Contains(t, firstEntry.Line, `5.202.214.160 - - [26/Jan/2019:19:45:25 +0330] "GET / HTTP/1.1" 200 30975 "https://www.zanbil.ir/" "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0" "-"`)
	})
}
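
The `noopClient` in this test file guards `close` with `sync.Once` and is returned by pointer, which lines up with the "Fix nocopylocks lint" and "Return pointer for noopclient" commits: a struct holding a `sync.WaitGroup` or `sync.Once` should not be copied after first use. Below is a simplified sketch of the same pattern. `drainClient` is a hypothetical type that carries `int` values instead of `api.Entry`, and unlike the PR's `Stop` it also waits for the drain goroutine to exit.

```go
package main

import (
	"fmt"
	"sync"
)

// drainClient is a hypothetical, simplified analogue of noopClient.
type drainClient struct {
	ch   chan int
	wg   sync.WaitGroup
	once sync.Once
}

// newDrainClient returns a pointer so the WaitGroup and Once inside
// the struct are never copied.
func newDrainClient() *drainClient {
	c := &drainClient{ch: make(chan int)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for range c.ch {
			// discard every entry
		}
	}()
	return c
}

// Stop closes the channel at most once, then waits for the drain
// goroutine. Calling it again is a no-op instead of a double-close panic.
func (c *drainClient) Stop() {
	c.once.Do(func() { close(c.ch) })
	c.wg.Wait()
}

func main() {
	c := newDrainClient()
	c.ch <- 1
	c.ch <- 2
	c.Stop()
	c.Stop() // safe: the close runs only on the first call
	fmt.Println("stopped twice without panic")
}
```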
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
5.202.214.160 - - [26/Jan/2019:19:45:25 +0330] "GET / HTTP/1.1" 200 30975 "https://www.zanbil.ir/" "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0" "-"
Binary file not shown.
Binary file not shown.
Binary file not shown.