Skip to content

Commit

Permalink
chore: upgrade influx_tools to new protobuf library (#22727)
Browse files Browse the repository at this point in the history
  • Loading branch information
serenibyss authored Oct 21, 2021
1 parent 834632d commit 159776d
Show file tree
Hide file tree
Showing 17 changed files with 1,020 additions and 2,599 deletions.
2,460 changes: 0 additions & 2,460 deletions cmd/influx_tools/internal/format/binary/binary.pb.go

This file was deleted.

8 changes: 1 addition & 7 deletions cmd/influx_tools/internal/format/binary/common.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package binary

// TODO(dstrand1): re-enable codegen for binary.proto when influx_tools is ported to new protobuf lib
// go:generate sh -c "protoc -I$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf) -I. --gogofaster_out=Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types:. binary.proto"
//go:generate protoc --go_out=. tools_binary.proto
//go:generate stringer -type=MessageType

import "errors"
Expand Down Expand Up @@ -30,11 +29,6 @@ const (
SeriesFooterType
)

type message interface {
Size() int
MarshalTo(dAtA []byte) (int, error)
}

/*
Stream format
Expand Down
27 changes: 14 additions & 13 deletions cmd/influx_tools/internal/format/binary/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"google.golang.org/protobuf/proto"
)

type Reader struct {
Expand Down Expand Up @@ -65,7 +66,7 @@ func (r *Reader) ReadHeader() (*Header, error) {
return nil, fmt.Errorf("expected header type, got %v", t)
}
h := &Header{}
err = h.Unmarshal(lv)
err = proto.Unmarshal(lv, h)
*r.state = readBucket

return h, err
Expand Down Expand Up @@ -93,7 +94,7 @@ func (r *Reader) NextBucket() (*BucketHeader, error) {
}

bh := &BucketHeader{}
err = bh.Unmarshal(lv)
err = proto.Unmarshal(lv, bh)
if err != nil {
return nil, err
}
Expand All @@ -119,7 +120,7 @@ func (r *Reader) NextSeries() (*SeriesHeader, error) {
return nil, fmt.Errorf("expected series header type, got %v", t)
}
sh := &SeriesHeader{}
err = sh.Unmarshal(lv)
err = proto.Unmarshal(lv, sh)
if err != nil {
return nil, err
}
Expand All @@ -128,15 +129,15 @@ func (r *Reader) NextSeries() (*SeriesHeader, error) {

var pointsType MessageType
switch sh.FieldType {
case FloatFieldType:
case FieldType_FloatFieldType:
pointsType = FloatPointsType
case IntegerFieldType:
case FieldType_IntegerFieldType:
pointsType = IntegerPointsType
case UnsignedFieldType:
case FieldType_UnsignedFieldType:
pointsType = UnsignedPointsType
case BooleanFieldType:
case FieldType_BooleanFieldType:
pointsType = BooleanPointsType
case StringFieldType:
case FieldType_StringFieldType:
pointsType = StringPointsType
default:
return nil, fmt.Errorf("unsupported series field type %v", sh.FieldType)
Expand Down Expand Up @@ -212,7 +213,7 @@ func (pr *PointsReader) marshalValues(lv []byte) error {

func (pr *PointsReader) marshalFloats(lv []byte) error {
fp := &FloatPoints{}
err := fp.Unmarshal(lv)
err := proto.Unmarshal(lv, fp)
if err != nil {
return err
}
Expand All @@ -226,7 +227,7 @@ func (pr *PointsReader) marshalFloats(lv []byte) error {

func (pr *PointsReader) marshalIntegers(lv []byte) error {
ip := &IntegerPoints{}
err := ip.Unmarshal(lv)
err := proto.Unmarshal(lv, ip)
if err != nil {
return err
}
Expand All @@ -240,7 +241,7 @@ func (pr *PointsReader) marshalIntegers(lv []byte) error {

func (pr *PointsReader) marshalUnsigned(lv []byte) error {
up := &UnsignedPoints{}
err := up.Unmarshal(lv)
err := proto.Unmarshal(lv, up)
if err != nil {
return err
}
Expand All @@ -254,7 +255,7 @@ func (pr *PointsReader) marshalUnsigned(lv []byte) error {

func (pr *PointsReader) marshalBooleans(lv []byte) error {
bp := &BooleanPoints{}
err := bp.Unmarshal(lv)
err := proto.Unmarshal(lv, bp)
if err != nil {
return err
}
Expand All @@ -268,7 +269,7 @@ func (pr *PointsReader) marshalBooleans(lv []byte) error {

func (pr *PointsReader) marshalStrings(lv []byte) error {
sp := &StringPoints{}
err := sp.Unmarshal(lv)
err := proto.Unmarshal(lv, sp)
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions cmd/influx_tools/internal/format/binary/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestReader_OneBucketOneIntegerSeries(t *testing.T) {
seriesName: []byte("series"),
seriesField: []byte("field"),
seriesTags: models.NewTags(map[string]string{"k": "v"}),
fieldType: binary.IntegerFieldType,
fieldType: binary.FieldType_IntegerFieldType,
ts: ts,
vs: vs,
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestReader_OneBucketOneFloatSeries(t *testing.T) {
seriesName: []byte("series"),
seriesField: []byte("field"),
seriesTags: models.NewTags(map[string]string{"k": "v"}),
fieldType: binary.FloatFieldType,
fieldType: binary.FieldType_FloatFieldType,
ts: ts,
vs: vs,
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestReader_OneBucketOneUnsignedSeries(t *testing.T) {
seriesName: []byte("series"),
seriesField: []byte("field"),
seriesTags: models.NewTags(map[string]string{"k": "v"}),
fieldType: binary.UnsignedFieldType,
fieldType: binary.FieldType_UnsignedFieldType,
ts: ts,
vs: vs,
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestReader_OneBucketOneBooleanSeries(t *testing.T) {
seriesName: []byte("series"),
seriesField: []byte("field"),
seriesTags: models.NewTags(map[string]string{"k": "v"}),
fieldType: binary.BooleanFieldType,
fieldType: binary.FieldType_BooleanFieldType,
ts: ts,
vs: vs,
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestReader_OneBucketOneStringSeries(t *testing.T) {
seriesName: []byte("series"),
seriesField: []byte("field"),
seriesTags: models.NewTags(map[string]string{"k": "v"}),
fieldType: binary.StringFieldType,
fieldType: binary.FieldType_StringFieldType,
ts: ts,
vs: vs,
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func verifySingleSeries(t *testing.T, buf bytes.Buffer, s *oneSeriesData) {
r := binary.NewReader(&buf)
h, err := r.ReadHeader()
assertNoError(t, err)
assertEqual(t, h, &binary.Header{Database: s.db, RetentionPolicy: s.rp, ShardDuration: s.sd})
assertEqual(t, h, &binary.Header{Database: s.db, RetentionPolicy: s.rp, ShardDuration: int64(s.sd)})

bh, err := r.NextBucket()
assertNoError(t, err)
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestReader_OneBucketMixedSeries(t *testing.T) {
r := binary.NewReader(&buf)
h, err := r.ReadHeader()
assertNoError(t, err)
assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24})
assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: int64(time.Hour * 24)})

bh, err := r.NextBucket()
assertNoError(t, err)
Expand All @@ -273,7 +273,7 @@ func TestReader_OneBucketMixedSeries(t *testing.T) {

seriesKey := make([]byte, 0)
seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags1)
assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.IntegerFieldType, SeriesKey: seriesKey, Field: seriesField})
assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.FieldType_IntegerFieldType, SeriesKey: seriesKey, Field: seriesField})

for i := 0; i < len(t1s); i++ {
next, err := r.Points().Next()
Expand All @@ -293,7 +293,7 @@ func TestReader_OneBucketMixedSeries(t *testing.T) {
assertNoError(t, err)

seriesKey = models.AppendMakeKey(seriesKey[:0], seriesName, seriesTags2)
assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.FloatFieldType, SeriesKey: seriesKey, Field: seriesField})
assertEqual(t, sh, &binary.SeriesHeader{FieldType: binary.FieldType_FloatFieldType, SeriesKey: seriesKey, Field: seriesField})

for i := 0; i < len(t2s); i++ {
next, err := r.Points().Next()
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestReader_EmptyBucket(t *testing.T) {
r := binary.NewReader(&buf)
h, err := r.ReadHeader()
assertNoError(t, err)
assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: time.Hour * 24})
assertEqual(t, h, &binary.Header{Database: db, RetentionPolicy: rp, ShardDuration: int64(time.Hour * 24)})

bh, err := r.NextBucket()
assertNoError(t, err)
Expand Down
Loading

0 comments on commit 159776d

Please sign in to comment.