Skip to content

Commit

Permalink
Merge pull request #11 from grafana/v2
Browse files Browse the repository at this point in the history
Add V2 support.
  • Loading branch information
mattdurham authored Jan 13, 2025
2 parents ad8e4e5 + df44760 commit e5fe545
Show file tree
Hide file tree
Showing 22 changed files with 853 additions and 32 deletions.
19 changes: 9 additions & 10 deletions e2e_benchmark_test.go → e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
"time"
)

func TestV2E2E(b *testing.T) {
// BenchmarkV2E2E-20 2504 451484 ns/op
dir := b.TempDir()
func TestV2E2E(t *testing.T) {
dir := t.TempDir()
totalSeries := atomic.NewInt32(0)
mut := sync.Mutex{}
set := make(map[float64]struct{})
Expand All @@ -33,13 +32,13 @@ func TestV2E2E(b *testing.T) {
defer mut.Unlock()
defer r.Body.Close()
data, err := io.ReadAll(r.Body)
require.NoError(b, err)
require.NoError(t, err)
data, err = snappy.Decode(nil, data)
require.NoError(b, err)
require.NoError(t, err)

var req prompb.WriteRequest
err = req.Unmarshal(data)
require.NoError(b, err)
require.NoError(t, err)

for _, x := range req.GetTimeseries() {
totalSeries.Add(int32(len(x.Samples)))
Expand All @@ -61,7 +60,7 @@ func TestV2E2E(b *testing.T) {
}
q, err := prom.NewQueue("test", cc, dir, 10000, 1*time.Second, 1*time.Hour, prometheus.NewRegistry(), "alloy", log.NewLogfmtLogger(os.Stderr))

require.NoError(b, err)
require.NoError(t, err)
go q.Start()
defer q.Stop()

Expand Down Expand Up @@ -89,14 +88,14 @@ func TestV2E2E(b *testing.T) {
app := q.Appender(context.Background())
for _, m := range metrics {
_, err = app.Append(0, m.Labels, m.TS, float64(index))
require.NoError(b, err)
require.NoError(t, err)
index++
}
app.Commit()
}
require.Eventually(b, func() bool {
require.Eventually(t, func() bool {
return totalSeries.Load() == int32(metricCount*sends)
}, 50*time.Second, 50*time.Millisecond)
}, 50*time.Second, 1*time.Second)
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23
toolchain go1.23.1

require (
github.com/deneonet/benc v1.1.2
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3
github.com/go-kit/log v0.2.1
github.com/gogo/protobuf v1.3.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deneonet/benc v1.1.2 h1:JNJSnA53zVLjt4Bz1HwxG4tQg475LP+kd8rgUuV4tc4=
github.com/deneonet/benc v1.1.2/go.mod h1:HbL4lzHT0jkmlYa36bZw0a0Nhj4NsXG7bd/bXRxJYy4=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
Expand Down
8 changes: 7 additions & 1 deletion implementations/prometheus/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package prometheus

import (
"context"
v2 "github.com/grafana/walqueue/types/v2"
"sync"
"time"

Expand Down Expand Up @@ -171,9 +172,13 @@ func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string,
return
}
var items []types.Datum
var s types.Unmarshaller
switch types.FileFormat(version) {
case types.AlloyFileVersionV1:
s := v1.GetSerializer()
s = v1.GetSerializer()
items, err = s.Unmarshal(meta, uncompressedBuf)
case types.AlloyFileVersionV2:
s = v2.NewFormat()
items, err = s.Unmarshal(meta, uncompressedBuf)
default:
level.Error(q.logger).Log("msg", "invalid version found for deserialization", "version", version)
Expand All @@ -182,6 +187,7 @@ func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string,
if err != nil {
level.Error(q.logger).Log("msg", "error deserializing", "err", err, "format", version)
}
level.Debug(q.logger).Log("found file format %s to unmarshal", version)

for _, series := range items {
// Check that the TTL.
Expand Down
3 changes: 1 addition & 2 deletions implementations/prometheus/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestQueue_Appender(t *testing.T) {
tt.testFunc(t, ctx, app)
require.Eventually(t, func() bool {
return recordsFound.Load() == tt.metricCount
}, 10*time.Second, 100*time.Millisecond)
//require.True(t, v2.OutStandingTimeSeriesBinary.Load() == 0)
}, 10*time.Second, 1*time.Second)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion network/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (m metric) Type() types.Type {
}

func (m metric) FileFormat() types.FileFormat {
return types.AlloyFileVersionV1
return types.AlloyFileVersionV2
}

func (m *metric) Free() {
Expand Down
8 changes: 4 additions & 4 deletions serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/walqueue/types"
v1 "github.com/grafana/walqueue/types/v1"
v2 "github.com/grafana/walqueue/types/v2"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/vladopajic/go-actor/actor"
Expand Down Expand Up @@ -46,8 +46,8 @@ func NewSerializer(cfg types.SerializerConfig, q types.FileStorage, stats func(s
flushTestTimer: time.NewTicker(1 * time.Second),
lastFlush: time.Now(),
stats: stats,
fileFormat: types.AlloyFileVersionV1,
ser: v1.GetSerializer(),
fileFormat: types.AlloyFileVersionV2,
ser: v2.NewFormat(),
}

return s, nil
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error {

var out []byte
err = s.ser.Marshal(func(meta map[string]string, buf []byte) error {
meta["version"] = string(types.AlloyFileVersionV1)
meta["version"] = string(types.AlloyFileVersionV2)
meta["compression"] = "snappy"
// TODO: reusing a buffer here likely increases performance.
out = snappy.Encode(buf)
Expand Down
4 changes: 2 additions & 2 deletions serialization/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package serialization
import (
"context"
"fmt"
v1 "github.com/grafana/walqueue/types/v1"
v2 "github.com/grafana/walqueue/types/v2"
"math/rand"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -95,7 +95,7 @@ func (f *fqq) Stop() {

func (f *fqq) Store(ctx context.Context, meta map[string]string, value []byte) error {
f.buf, _ = snappy.Decode(nil, value)
sg := v1.GetSerializer()
sg := v2.NewFormat()
items, err := sg.Unmarshal(meta, f.buf)
require.NoError(f.t, err)
f.total.Add(int64(len(items)))
Expand Down
17 changes: 13 additions & 4 deletions types/benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/snappy"
"github.com/grafana/walqueue/types"
v1 "github.com/grafana/walqueue/types/v1"
v2 "github.com/grafana/walqueue/types/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)
Expand All @@ -18,6 +19,7 @@ func BenchmarkDeserializeAndSerialize(b *testing.B) {
go test -bench="BenchmarkDeserializeAndSerialize" -benchmem -benchtime "5s"
cpu: 13th Gen Intel(R) Core(TM) i5-13500
BenchmarkDeserializeAndSerialize/v1-20 100 273894233 ns/op 8491 compressed_KB 106723 uncompressed_KB 455318762 B/op 1904711 allocs/op
BenchmarkDeserializeAndSerialize/v2-20 798 7529099 ns/op 228 compressed_KB 2949 uncompressed_KB 9792104 B/op 14 allocs/op
*/
lbls := make(labels.Labels, 0)
for i := 0; i < 10; i++ {
Expand All @@ -28,22 +30,29 @@ func BenchmarkDeserializeAndSerialize(b *testing.B) {
}
b.ResetTimer()
type test struct {
s types.PrometheusMarshaller
m types.PrometheusMarshaller
u types.Unmarshaller
name string
}
tests := []test{
{
// The issue the large size in V1 is the fact I messed up and used string keys (the default) instead of
// tuple/index based.
name: "v1",
s: v1.GetSerializer(),
m: v1.GetSerializer(),
u: v1.GetSerializer(),
},
{
name: "v2",
m: v2.NewFormat(),
u: v2.NewFormat(),
},
}

for _, tt := range tests {
b.Run(tt.name, func(t *testing.B) {
for n := 0; n < t.N; n++ {
s := tt.s
s := tt.m

for i := 0; i < 10_000; i++ {
aErr := s.AddPrometheusMetric(time.Now().UnixMilli(), rand.Float64(), lbls, nil, nil, nil)
Expand All @@ -64,7 +73,7 @@ func BenchmarkDeserializeAndSerialize(b *testing.B) {

uncompressed, err := snappy.Decode(nil, compressed)
require.NoError(t, err)
items, _ := s.Unmarshal(kv, uncompressed)
items, _ := tt.u.Unmarshal(kv, uncompressed)
for _, item := range items {
item.Free()
}
Expand Down
15 changes: 10 additions & 5 deletions types/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

// Marshaller provides the ability to read and write for a given schema defined by the FileFormat.;
// Marshaller provides the ability to write for a given schema defined by the FileFormat.
// These are NOT threadsafe.
type Marshaller interface {
// Unmarshal is called to create a list of datums.
// Metadata will be passed via the map.
// The buffer passed in is SAFE for reuse/unsafe strings.
Unmarshal(map[string]string, []byte) (items []Datum, err error)

// Marshal handler passes in the buffer to be written. The buffer is only valid for the lifecycle of the function call.
// Metadata is passed via the map and should be encoded into the underlying storage. The same keys and values should be returned
// on Deserialize.
Expand All @@ -25,3 +22,11 @@ type PrometheusMarshaller interface {
AddPrometheusMetric(ts int64, value float64, lbls labels.Labels, h *histogram.Histogram, fh *histogram.FloatHistogram, externalLabels map[string]string) error
AddPrometheusMetadata(name string, unit string, help string, pType string) error
}

// Unmarshaller allows reading of a given FileFormat.
type Unmarshaller interface {
// Unmarshal is called to create a list of datums.
// Metadata will be passed via the map.
// The buffer passed in is SAFE for reuse/unsafe strings.
Unmarshal(map[string]string, []byte) (items []Datum, err error)
}
1 change: 1 addition & 0 deletions types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
type FileFormat string

const AlloyFileVersionV1 = FileFormat("alloy.metrics.queue.v1")
const AlloyFileVersionV2 = FileFormat("alloy.metrics.queue.v2")

type Type string

Expand Down
2 changes: 1 addition & 1 deletion types/v1/serialization_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.uber.org/atomic"
)

func GetSerializer() types.PrometheusMarshaller {
func GetSerializer() *Serialization {
return &Serialization{
sg: &SeriesGroup{
Series: make([]*TimeSeriesBinary, 0),
Expand Down
6 changes: 4 additions & 2 deletions types/v1/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/grafana/walqueue/types"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -54,13 +55,14 @@ func TestLabels(t *testing.T) {
}

func TestBackwardsCompatability(t *testing.T) {
buf, err := os.ReadFile("v1.bin")
buf, err := os.ReadFile(filepath.Join("testdata", "v1.bin"))
require.NoError(t, err)
sg := GetSerializer()
metrics, err := sg.Unmarshal(nil, buf)
require.NoError(t, err)
require.Len(t, metrics, 1_000)
for _, m := range metrics {
require.True(t, m.FileFormat() == types.AlloyFileVersionV1)
pm := prompb.TimeSeries{}
err = pm.Unmarshal(m.Bytes())
require.NoError(t, err)
Expand Down
File renamed without changes.
9 changes: 9 additions & 0 deletions types/v2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# V2 File Format

The V2 file format uses a simple binary format based on efficiency and not on backwards compatibility or future compatibility. If there is a need for changes it should be created a separate version.

The library `benc` was chosen because it was the lowest level library that offered buffer reuse, unsafe string usage and still had reasonable checks for out of bounds, and issues. This makes it more performant that `mus` and `msgp` format for example. The source of narrowing down serialization libaries came from https://alecthomas.github.io/go_serialization_benchmarks/.

Tested with mus,gencode,benc,bepod,mmsgp, and fastape. Benc hit the sweetspot on usage, nice api with reasonable checks.

The underlying format stores a type byte `metric` or `metadata`, specific metadata for each time and then the raw byte array of the protobuf bytes. This means it does not need to fully deserialize the data.
Loading

0 comments on commit e5fe545

Please sign in to comment.