Fix race in cluster RPC serialization
Point was accessed from multiple goroutines and there was a race on the internal
cachedFields and cachedName fields.  Accessing these fields is unnecessary work, as it
requires the point to be unmarshaled into Go types and then remarshaled back into protobuf
types.  Instead, just send the line protocol representation already available on the point via
the protobuf.  This avoids accessing the cached fields and eliminates some extra work.

Possible fix for #4069
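
In short, the sender now ships each point's line-protocol text instead of a per-field protobuf encoding. A rough sketch of the encode side, assuming the tsdb import path used by this revision of the repository; the helper name is illustrative and not part of the commit:

package cluster

import "github.com/influxdb/influxdb/tsdb"

// encodePoints serializes points without touching their lazily populated
// caches: p.String() renders the line protocol already available on the
// point, so nothing is unmarshaled into Go types and remarshaled into
// protobuf types, and the internal cachedFields/cachedName are left alone.
func encodePoints(points []tsdb.Point) [][]byte {
	encoded := make([][]byte, 0, len(points))
	for _, p := range points {
		encoded = append(encoded, []byte(p.String()))
	}
	return encoded
}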
jwilder committed Sep 15, 2015
1 parent a232668 commit c4f85f8
Showing 3 changed files with 15 additions and 235 deletions.
135 changes: 2 additions & 133 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default.

28 changes: 2 additions & 26 deletions cluster/internal/data.proto
@@ -2,31 +2,7 @@ package internal;

message WriteShardRequest {
required uint64 ShardID = 1;
repeated Point Points = 2;
}

message Field {
required string Name = 1;
oneof Value {
int32 Int32 = 2;
int64 Int64 = 3;
double Float64 = 4;
bool Bool = 5;
string String = 6;
bytes Bytes = 7;
}
}

message Tag {
required string Key = 1;
required string Value = 2;
}

message Point {
required string Name = 1;
required int64 Time = 2;
repeated Field Fields = 3;
repeated Tag Tags = 4;
repeated bytes Points = 2;
}

message WriteShardResponse {
@@ -46,4 +22,4 @@ message MapShardResponse {
optional bytes Data = 3;
repeated string TagSets = 4;
repeated string Fields = 5;
}
}
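
With the Field, Tag, and Point messages gone, WriteShardRequest carries each point as an opaque bytes entry. A sketch of how the trimmed-down message might be populated, assuming the generated type keeps the usual gogo/protobuf field shapes (ShardID as *uint64, Points as [][]byte); the helper name, import paths, and line-protocol literal are illustrative:

package cluster

import (
	"github.com/gogo/protobuf/proto"

	"github.com/influxdb/influxdb/cluster/internal"
)

// buildWriteShardRequest marshals a request in the new wire format: every
// point travels as a line-protocol blob, so the receiver needs no per-field
// protobuf messages.
func buildWriteShardRequest(shardID uint64, lines []string) ([]byte, error) {
	pb := &internal.WriteShardRequest{ShardID: proto.Uint64(shardID)}
	for _, l := range lines {
		// e.g. l = `cpu,host=server01 value=0.64 1442336850000000000`
		pb.Points = append(pb.Points, []byte(l))
	}
	return proto.Marshal(pb)
}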
87 changes: 11 additions & 76 deletions cluster/rpc.go
@@ -1,6 +1,7 @@
package cluster

import (
"fmt"
"time"

"github.com/gogo/protobuf/proto"
@@ -110,63 +111,16 @@ func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp t
}

func (w *WriteShardRequest) AddPoints(points []tsdb.Point) {
w.pb.Points = append(w.pb.Points, w.marshalPoints(points)...)
for _, p := range points {
w.pb.Points = append(w.pb.Points, []byte(p.String()))
}
}

// MarshalBinary encodes the object to a binary format.
func (w *WriteShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&w.pb)
}

func (w *WriteShardRequest) marshalPoints(points []tsdb.Point) []*internal.Point {
pts := make([]*internal.Point, len(points))
for i, p := range points {
fields := []*internal.Field{}
for k, v := range p.Fields() {
name := k
f := &internal.Field{
Name: &name,
}
switch t := v.(type) {
case int:
f.Int64 = proto.Int64(int64(t))
case int32:
f.Int32 = proto.Int32(t)
case int64:
f.Int64 = proto.Int64(t)
case float64:
f.Float64 = proto.Float64(t)
case bool:
f.Bool = proto.Bool(t)
case string:
f.String_ = proto.String(t)
case []byte:
f.Bytes = t
}
fields = append(fields, f)
}

tags := []*internal.Tag{}
for k, v := range p.Tags() {
key := k
value := v
tags = append(tags, &internal.Tag{
Key: &key,
Value: &value,
})
}
name := p.Name()
pts[i] = &internal.Point{
Name: &name,
Time: proto.Int64(p.Time().UnixNano()),
Fields: fields,
Tags: tags,
}

}
return pts
}

// UnmarshalBinary populates WritePointRequest from a binary format.
func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &w.pb); err != nil {
@@ -178,33 +132,14 @@ func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
func (w *WriteShardRequest) unmarshalPoints() []tsdb.Point {
points := make([]tsdb.Point, len(w.pb.GetPoints()))
for i, p := range w.pb.GetPoints() {
pt := tsdb.NewPoint(
p.GetName(), map[string]string{},
map[string]interface{}{}, time.Unix(0, p.GetTime()))

for _, f := range p.GetFields() {
n := f.GetName()
if f.Int32 != nil {
pt.AddField(n, f.GetInt32())
} else if f.Int64 != nil {
pt.AddField(n, f.GetInt64())
} else if f.Float64 != nil {
pt.AddField(n, f.GetFloat64())
} else if f.Bool != nil {
pt.AddField(n, f.GetBool())
} else if f.String_ != nil {
pt.AddField(n, f.GetString_())
} else {
pt.AddField(n, f.GetBytes())
}
}

tags := tsdb.Tags{}
for _, t := range p.GetTags() {
tags[t.GetKey()] = t.GetValue()
pt, err := tsdb.ParsePoints(p)
if err != nil {
// An error here means that one node parsed the point correctly but sent an
// unparseable version to another node. We could log and drop the point and allow
// anti-entropy to resolve the discrepancy but this shouldn't ever happen.
panic(fmt.Sprintf("failed to parse point: `%v`: %v", string(p), err))
}
pt.SetTags(tags)
points[i] = pt
points[i] = pt[0]
}
return points
}
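
The receive path now leans entirely on the line-protocol parser. A minimal standalone sketch of that dependency for a single entry, returning an error instead of panicking (a deliberate variation on the commit's choice); the helper name and import path are assumptions:

package cluster

import (
	"fmt"

	"github.com/influxdb/influxdb/tsdb"
)

// parseShardPoint mirrors what unmarshalPoints does for one entry: each
// []byte in the request is a complete line-protocol statement, and a
// well-formed one parses back into exactly the point the sender held.
func parseShardPoint(b []byte) (tsdb.Point, error) {
	pts, err := tsdb.ParsePoints(b)
	if err != nil || len(pts) == 0 {
		return nil, fmt.Errorf("failed to parse point: `%v`: %v", string(b), err)
	}
	return pts[0], nil
}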
