Skip to content

Commit

Permalink
Merge pull request #2745 from influxdb/pd-shard-metadata-to-protobuf
Browse files Browse the repository at this point in the history
Update metadata storage in the shard to use protobuf for serialization.
  • Loading branch information
pauldix committed Jun 3, 2015
2 parents c36cec7 + 808e50f commit 73a6c7e
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 5 deletions.
123 changes: 123 additions & 0 deletions tsdb/internal/meta.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions tsdb/internal/meta.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package internal;

//========================================================================
//
// Metadata
//
//========================================================================

message Series {
required string Key = 1;
repeated Tag Tags = 2;
}

message Tag {
required string Key = 1;
required string Value = 2;
}

message MeasurementFields {
repeated Field Fields = 1;
}

message Field {
required int32 ID = 1;
required string Name = 2;
required string Type = 3;
}
31 changes: 31 additions & 0 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ import (
"time"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb/internal"

"github.com/gogo/protobuf/proto"
)

//go:generate protoc --gogo_out=. internal/meta.proto

const (
maxStringLength = 64 * 1024
)
Expand Down Expand Up @@ -857,6 +862,32 @@ type Series struct {
measurement *Measurement
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
var pb internal.Series
pb.Key = &s.Key
for k, v := range s.Tags {
key := k
value := v
pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
}
return proto.Marshal(&pb)
}

// UnmarshalBinary decodes the object from a binary format.
func (s *Series) UnmarshalBinary(buf []byte) error {
var pb internal.Series
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
s.Key = pb.GetKey()
s.Tags = make(map[string]string)
for _, t := range pb.Tags {
s.Tags[t.GetKey()] = t.GetValue()
}
return nil
}

// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {
Expand Down
49 changes: 44 additions & 5 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"time"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb/internal"

"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
)

// Shard represents a self-contained time series database. An inverted index of the measurement and tag data is
Expand Down Expand Up @@ -146,15 +148,23 @@ func (s *Shard) WritePoints(points []Point) error {
if len(seriesToCreate) > 0 {
b := tx.Bucket([]byte("series"))
for _, sc := range seriesToCreate {
if err := b.Put([]byte(sc.series.Key), mustMarshalJSON(sc.series)); err != nil {
data, err := sc.series.MarshalBinary()
if err != nil {
return err
}
if err := b.Put([]byte(sc.series.Key), data); err != nil {
return err
}
}
}
if len(measurementFieldsToSave) > 0 {
b := tx.Bucket([]byte("fields"))
for name, m := range measurementFieldsToSave {
if err := b.Put([]byte(name), mustMarshalJSON(m)); err != nil {
data, err := m.MarshalBinary()
if err != nil {
return err
}
if err := b.Put([]byte(name), data); err != nil {
return err
}
}
Expand Down Expand Up @@ -355,7 +365,9 @@ func (s *Shard) loadMetadataIndex() error {
for k, v := c.First(); k != nil; k, v = c.Next() {
m := s.index.createMeasurementIndexIfNotExists(string(k))
mf := &measurementFields{}
mustUnmarshalJSON(v, mf)
if err := mf.UnmarshalBinary(v); err != nil {
return err
}
for name, _ := range mf.Fields {
m.FieldNames[name] = struct{}{}
}
Expand All @@ -367,8 +379,10 @@ func (s *Shard) loadMetadataIndex() error {
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var series *Series
mustUnmarshalJSON(v, &series)
series := &Series{}
if err := series.UnmarshalBinary(v); err != nil {
return err
}
s.index.createSeriesIndexIfNotExists(measurementFromSeriesKey(string(k)), series)
}
return nil
Expand All @@ -380,6 +394,31 @@ type measurementFields struct {
codec *FieldCodec
}

// MarshalBinary encodes the object to a binary format.
func (m *measurementFields) MarshalBinary() ([]byte, error) {
var pb internal.MeasurementFields
for _, f := range m.Fields {
id := int32(f.ID)
name := f.Name
t := string(f.Type)
pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t})
}
return proto.Marshal(&pb)
}

// UnmarshalBinary decodes the object from a binary format.
func (m *measurementFields) UnmarshalBinary(buf []byte) error {
var pb internal.MeasurementFields
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
m.Fields = make(map[string]*field)
for _, f := range pb.Fields {
m.Fields[f.GetName()] = &field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())}
}
return nil
}

// createFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
Expand Down

0 comments on commit 73a6c7e

Please sign in to comment.