diff --git a/client/influxdb.go b/client/influxdb.go index 9f798cd2add..1ada4c81e13 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -517,7 +517,7 @@ func EpochToTime(epoch int64, precision string) (time.Time, error) { case "n": t = time.Unix(0, epoch) default: - return time.Time{}, fmt.Errorf("Unknowm precision %q", precision) + return time.Time{}, fmt.Errorf("Unknown precision %q", precision) } return t, nil } diff --git a/httpd/handler.go b/httpd/handler.go index 4fc9a4f9b45..7d2fb9d7a97 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -20,7 +20,9 @@ import ( "github.com/bmizerany/pat" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/tsdb" "github.com/influxdb/influxdb/uuid" ) @@ -123,6 +125,10 @@ func NewAPIHandler(s *influxdb.Server, requireAuthentication, loggingEnabled boo "write", // Data-ingest route. "POST", "/write", true, true, h.serveWrite, }, + route{ + "write_points", // Data-ingest route. + "POST", "/write_points", true, true, h.serveWritePoints, + }, route{ // Status "status", "GET", "/status", true, true, h.serveStatus, @@ -538,6 +544,103 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ } } +// serveWritePoints receives incoming series data and writes it to the database. +func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + var writeError = func(result influxdb.Result, statusCode int) { + w.WriteHeader(statusCode) + w.Write([]byte(result.Err.Error())) + return + } + + // Check to see if we have a gzip'd post + var body io.ReadCloser + if r.Header.Get("Content-encoding") == "gzip" { + b, err := gzip.NewReader(r.Body) + if err != nil { + writeError(influxdb.Result{Err: err}, http.StatusBadRequest) + return + } + body = b + defer r.Body.Close() + } else { + body = r.Body + defer r.Body.Close() + } + + b, err := ioutil.ReadAll(body) + if err != nil { + if h.WriteTrace { + h.Logger.Print("write handler failed to read bytes from request body") + } + writeError(influxdb.Result{Err: err}, http.StatusBadRequest) + return + } + if h.WriteTrace { + h.Logger.Printf("write body received by handler: %s", string(b)) + } + + precision := r.FormValue("precision") + if precision == "" { + precision = "n" + } + + points, err := tsdb.ParsePointsWithPrecision(b, time.Now().UTC(), precision) + if err != nil { + if err.Error() == "EOF" { + w.WriteHeader(http.StatusOK) + return + } + writeError(influxdb.Result{Err: err}, http.StatusBadRequest) + return + } + + database := r.FormValue("db") + if database == "" { + writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) + return + } + + if !h.server.DatabaseExists(database) { + writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound) + return + } + + if h.requireAuthentication && user == nil { + writeError(influxdb.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized) + return + } + + if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) { + writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized) + return + } + + retentionPolicy := r.Form.Get("rp") + consistencyVal := r.Form.Get("consistency") + consistency := cluster.ConsistencyLevelOne + switch consistencyVal { + case "all": + consistency = cluster.ConsistencyLevelAll + case "any": + consistency = cluster.ConsistencyLevelAny + case "one": + consistency = cluster.ConsistencyLevelOne + case "quorum": + consistency = cluster.ConsistencyLevelQuorum + } + + if err := h.server.WritePoints(database, retentionPolicy, consistency, points); err != nil { + if influxdb.IsClientError(err) { + writeError(influxdb.Result{Err: err}, http.StatusBadRequest) + } else { + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + } + return + } else { + w.WriteHeader(http.StatusNoContent) + } +} + // serveMetastore returns a copy of the metastore. func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) { // Set headers. diff --git a/server.go b/server.go index 5dd85a007f1..b6336b1e14b 100644 --- a/server.go +++ b/server.go @@ -1848,6 +1848,17 @@ func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]ui return err } +func (s *Server) WritePoints(database, retentionPolicy string, consistency cluster.ConsistencyLevel, points []tsdb.Point) error { + wpr := &cluster.WritePointsRequest{ + Database: database, + RetentionPolicy: retentionPolicy, + ConsistencyLevel: consistency, + Points: points, + } + + return s.pw.Write(wpr) +} + // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []tsdb.Point) (idx uint64, err error) { diff --git a/tsdb/points.go b/tsdb/points.go index 5bb27876c79..89cb73397fe 100644 --- a/tsdb/points.go +++ b/tsdb/points.go @@ -1,8 +1,12 @@ package tsdb import ( + "bytes" + "fmt" "hash/fnv" "sort" + "strconv" + "strings" "time" ) @@ -15,37 +19,451 @@ type Point interface { AddTag(key, value string) SetTags(tags Tags) - Fields() map[string]interface{} + Fields() Fields AddField(name string, value interface{}) Time() time.Time SetTime(t time.Time) - UnixNano() uint64 + UnixNano() int64 HashID() uint64 - Key() string + Key() []byte Data() []byte SetData(buf []byte) + + String() string } // point is the default implementation of Point. type point struct { - name string - tags Tags - time time.Time - fields map[string]interface{} - key string - data []byte + time time.Time + + // text encoding of measurement and tags + // key must always be stored sorted by tags, if the original line was not sorted, + // we need to resort it + key []byte + + // text encoding of field data + fields []byte + + // text encoding of timestamp + ts []byte + + // binary encoded field data + data []byte +} + +var escapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + '"': []byte(`\"`), + ' ': []byte(`\ `), + '=': []byte(`\=`), +} + +var escapeCodesStr = map[string]string{} + +func init() { + for k, v := range escapeCodes { + escapeCodesStr[string(k)] = string(v) + } +} + +func ParsePointsString(buf string) ([]Point, error) { + return ParsePoints([]byte(buf)) +} + +// ParsePoints returns a slice of Points from a text representation of a point +// with each point separated by newlines. +func ParsePoints(buf []byte) ([]Point, error) { + return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") +} + +func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { + points := []Point{} + var ( + pos int + block []byte + ) + for { + pos, block = scanTo(buf, pos, '\n') + pos += 1 + + if len(block) == 0 { + break + } + + pt, err := parsePoint(block, defaultTime, precision) + if err != nil { + return nil, err + } + points = append(points, pt) + + if pos >= len(buf) { + break + } + + } + return points, nil + +} + +func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { + // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + + if err != nil { + return nil, err + } + + pt := &point{ + key: key, + fields: fields, + ts: ts, + } + + if len(ts) == 0 { + pt.time = defaultTime + } + pt.SetPrecision(precision) + return pt, nil +} + +// scanKey scans buf starting at i for the measurement and tag portion of the point. +// It returns the ending position and the byte slice of key within buf. If there +// are tags, they will be sorted if they are not already. +func scanKey(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + + i = start + + // Determiens whether the tags are sort, assume they are + sorted := true + + // indices holds the indexes within buf of the start of each tag. For example, + // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20] + // which indicates that the first tag starts at buf[4], seconds at buf[11], and + // last at buf[20] + indices := make([]int, 100) + + // tracks how many commas we've seen so we know how many values are indices. + // Since indices is an arbitraily large slice, + // we need to know how many values in the buffer are in use. + separators := 0 + + // tracks whether we've see an '=' + hasSeparator := false + + // loop over each byte in buf + for { + // reached the end of buf? + if i >= len(buf) { + if !hasSeparator && separators > 0 { + return i, buf[start:i], fmt.Errorf("missing value") + } + + break + } + + if buf[i] == '=' { + i += 1 + hasSeparator = true + continue + } + + // escaped character + if buf[i] == '\\' { + i += 2 + continue + } + + // At a tag separator (comma), track it's location + if buf[i] == ',' { + if !hasSeparator && separators > 0 { + return i, buf[start:i], fmt.Errorf("missing value") + } + i += 1 + indices[separators] = i + separators += 1 + hasSeparator = false + continue + } + + // reached end of the block? (next block would be fields) + if buf[i] == ' ' { + if !hasSeparator && separators > 0 { + return i, buf[start:i], fmt.Errorf("missing value") + } + indices[separators] = i + 1 + break + } + + i += 1 + } + + // Now we know where the key region is within buf, and the locations of tags, we + // need to deterimine if duplicate tags exist and if the tags are sorted. This iterates + // 1/2 of the list comparing each end with each other, walking towards the center from + // both sides. + for j := 0; j < separators/2; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') + _, right := scanTo(buf[indices[separators-j-1]:indices[separators-j]-1], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort + if bytes.Equal(left, right) { + return i, buf[start:i], fmt.Errorf("duplicate tags") + } + + // If left is greater than right, the tags are not sorted. We must continue + // since their could be duplicate tags still. + if bytes.Compare(left, right) > 0 { + sorted = false + } + } + + // If the tags are not sorted, then sort them. This sort is inline and + // uses the tag indices we created earlier. The actual buffer is not sorted, the + // indices are using the buffer for value comparison. After the indices are sorted, + // the buffer is reconstructed from the sorted indices. + if !sorted && separators > 0 { + // Get the measurement name for later + measurement := buf[start : indices[0]-1] + + // Sort the indices + indices := indices[:separators] + insertionSort(0, separators, buf, indices) + + // Create a new key using the measurement and sorted indices + b := make([]byte, len(buf[start:i])) + pos := copy(b, measurement) + for _, i := range indices { + b[pos] = ',' + pos += 1 + _, v := scanToSpaceOr(buf, i, ',') + pos += copy(b[pos:], v) + } + + return i, b, nil + } + + return i, buf[start:i], nil +} + +func insertionSort(l, r int, buf []byte, indices []int) { + for i := l + 1; i < r; i++ { + for j := i; j > l && less(buf, indices, j, j-1); j-- { + indices[j], indices[j-1] = indices[j-1], indices[j] + } + } +} + +func less(buf []byte, indices []int, i, j int) bool { + // This grabs the tag names for i & j, it ignores the values + _, a := scanTo(buf, indices[i], '=') + _, b := scanTo(buf, indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped character + if buf[i] == '\\' { + i += 2 + continue + } + + // If the value is quoted, scan until we get to the end quote + if buf[i] == '"' { + quoted = !quoted + i += 1 + continue + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i += 1 + } + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It returns +// the ending position and the byte slice of the fields within buf and error if the +// timestamp is not in the correct numeric format +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Timestamps should integers, make sure they are so we don't need to actually + // parse the timestamp until needed + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + + // reached end of block? + if buf[i] == '\n' { + break + } + i += 1 + } + return i, buf[start:i], nil +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags +func skipWhitespace(buf []byte, i int) int { + for { + if i >= len(buf) { + return i + } + + if buf[i] == ' ' || buf[i] == '\t' { + i += 1 + continue + } + break + } + return i +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanTo(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // reached end of block? + if buf[i] == stop { + break + } + i += 1 + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // reached end of block? + if buf[i] == stop || buf[i] == ' ' { + break + } + i += 1 + } + + return i, buf[start:i] +} + +func scanTagValue(buf []byte, i int) (int, []byte) { + start := i + for { + if i >= len(buf) { + break + } + + if buf[i] == '\\' { + i += 2 + continue + } + + if buf[i] == ',' { + break + } + i += 1 + } + return i, buf[start:i] +} + +func escape(in []byte) []byte { + for b, esc := range escapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func escapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, b, esc, -1) + } + return in +} + +func unescape(in []byte) []byte { + for b, esc := range escapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func unescapeString(in string) string { + for b, esc := range escapeCodesStr { + in = strings.Replace(in, esc, b, -1) + } + return in } // NewPoint returns a new point with the given measurement name, tags, fiels and timestamp -func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.Time) Point { +func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point { return &point{ - name: name, - tags: tags, + key: makeKey([]byte(name), tags), time: time, - fields: fields, + fields: fields.MarshalBinary(), } } @@ -57,25 +475,39 @@ func (p *point) SetData(b []byte) { p.data = b } -func (p *point) Key() string { - if p.key == "" { - p.key = p.Name() + "," + string(p.tags.HashKey()) - } +func (p *point) Key() []byte { return p.key } +func (p *point) name() []byte { + _, name := scanTo(p.key, 0, ',') + return name +} + // Name return the measurement name for the point func (p *point) Name() string { - return p.name + return string(unescape(p.name())) } // SetName updates the measurement name for the point func (p *point) SetName(name string) { - p.name = name + p.key = makeKey([]byte(name), p.Tags()) } // Time return the timesteamp for the point func (p *point) Time() time.Time { + if !p.time.IsZero() { + return p.time + } + + if len(p.ts) > 0 { + ts, err := strconv.ParseInt(string(p.ts), 10, 64) + if err != nil { + return p.time + } + p.time = time.Unix(0, ts) + } + return p.time } @@ -86,88 +518,273 @@ func (p *point) SetTime(t time.Time) { // Tags returns the tag set for the point func (p *point) Tags() Tags { - return p.tags + tags := map[string]string{} + + if len(p.key) != 0 { + pos, name := scanTo(p.key, 0, ',') + + // it's an empyt key, so there are no tags + if len(name) == 0 { + return tags + } + + i := pos + 1 + var key, value []byte + for { + if i >= len(p.key) { + break + } + i, key = scanTo(p.key, i, '=') + i, value = scanTagValue(p.key, i+1) + + tags[string(unescape(key))] = string(unescape(value)) + + i += 1 + } + } + return tags +} + +func makeKey(name []byte, tags Tags) []byte { + return append(escape(name), tags.hashKey()...) } // SetTags replaces the tags for the point func (p *point) SetTags(tags Tags) { - p.tags = tags + p.key = makeKey(p.name(), tags) } // AddTag adds or replaces a tag value for a point func (p *point) AddTag(key, value string) { - p.tags[key] = value + tags := p.Tags() + tags[key] = value + p.key = makeKey(p.name(), tags) } // Fields returns the fiels for the point -func (p *point) Fields() map[string]interface{} { - return p.fields +func (p *point) Fields() Fields { + return p.unmarshalBinary() } // AddField adds or replaces a field value for a point func (p *point) AddField(name string, value interface{}) { - p.fields[name] = value + fields := p.Fields() + fields[name] = value + p.fields = fields.MarshalBinary() } -func (p *point) HashID() uint64 { +// SetPrecision will round a time to the specified precision +func (p *point) SetPrecision(precision string) { + switch precision { + case "n": + case "u": + p.SetTime(p.Time().Round(time.Microsecond)) + case "ms": + p.SetTime(p.Time().Round(time.Millisecond)) + case "s": + p.SetTime(p.Time().Round(time.Second)) + case "m": + p.SetTime(p.Time().Round(time.Minute)) + case "h": + p.SetTime(p.Time().Round(time.Hour)) + } +} - // |||| - // cpu|host|servera - encodedTags := p.tags.HashKey() - size := len(p.Name()) + len(encodedTags) - if len(encodedTags) > 0 { - size++ - } - b := make([]byte, 0, size) - b = append(b, p.Name()...) - if len(encodedTags) > 0 { - b = append(b, '|') - } - b = append(b, encodedTags...) - // TODO pick a better hashing that guarantees uniqueness - // TODO create a cash for faster lookup +func (p *point) String() string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), p.UnixNano()) +} + +func (p *point) unmarshalBinary() Fields { + return newFieldsFromBinary(p.fields) +} + +func (p *point) HashID() uint64 { h := fnv.New64a() - h.Write(b) + h.Write(p.key) sum := h.Sum64() return sum } -func (p *point) UnixNano() uint64 { - return uint64(p.time.UnixNano()) +func (p *point) UnixNano() int64 { + return p.Time().UnixNano() } type Tags map[string]string -func (t Tags) HashKey() []byte { +func (t Tags) hashKey() []byte { // Empty maps marshal to empty bytes. if len(t) == 0 { return nil } - // Extract keys and determine final size. - sz := (len(t) * 2) - 1 // separators - keys := make([]string, 0, len(t)) + escaped := Tags{} for k, v := range t { - keys = append(keys, k) + ek := escapeString(k) + ev := escapeString(v) + escaped[ek] = ev + } + + // Extract keys and determine final size. + sz := len(escaped) + (len(escaped) * 2) // separators + keys := make([]string, len(escaped)+1) + i := 0 + for k, v := range escaped { + keys[i] = k + i += 1 sz += len(k) + len(v) } + keys = keys[:i] sort.Strings(keys) - // Generate marshaled bytes. b := make([]byte, sz) buf := b + idx := 0 for _, k := range keys { - copy(buf, k) - buf[len(k)] = '|' - buf = buf[len(k)+1:] + buf[idx] = ',' + idx += 1 + copy(buf[idx:idx+len(k)], k) + idx += len(k) + buf[idx] = '=' + idx += 1 + v := escaped[k] + copy(buf[idx:idx+len(v)], v) + idx += len(v) } - for i, k := range keys { - v := t[k] - copy(buf, v) - if i < len(keys)-1 { - buf[len(v)] = '|' - buf = buf[len(v)+1:] + return b[:idx] +} + +type Fields map[string]interface{} + +func parseNumber(val []byte) (interface{}, error) { + for i := 0; i < len(val); i++ { + if val[i] == '.' { + return strconv.ParseFloat(string(val), 64) + } + if val[i] < '0' && val[i] > '9' { + return string(val), nil } } + return strconv.ParseInt(string(val), 10, 64) +} + +func newFieldsFromBinary(buf []byte) Fields { + fields := Fields{} + var ( + i int + name, valueBuf []byte + value interface{} + err error + ) + for { + if i >= len(buf) { + break + } + + i, name = scanTo(buf, i, '=') + if len(name) == 0 { + continue + } + + i, valueBuf = scanTo(buf, i+1, ',') + if len(valueBuf) == 0 { + fields[string(name)] = nil + continue + } + + // If the first char is a double-quote, then unmarshal as string + if valueBuf[0] == '"' { + value = unescapeString(string(valueBuf[1 : len(valueBuf)-1])) + // Check for numeric characters + } else if (valueBuf[0] >= '0' && valueBuf[0] <= '9') || valueBuf[0] == '-' || valueBuf[0] == '.' { + value, err = parseNumber(valueBuf) + if err != nil { + fmt.Printf("unable to parse number value '%v': %v\n", string(valueBuf), err) + value = float64(0) + } + + // Otherwise parse it as bool + } else { + value, err = strconv.ParseBool(string(valueBuf)) + if err != nil { + fmt.Printf("unable to parse bool value '%v': %v\n", string(valueBuf), err) + value = false + } + } + fields[string(unescape(name))] = value + i += 1 + } + return fields +} + +func (p Fields) MarshalBinary() []byte { + b := []byte{} + keys := make([]string, len(p)) + i := 0 + for k, _ := range p { + keys[i] = k + i += 1 + } + sort.Strings(keys) + + for _, k := range keys { + v := p[k] + b = append(b, []byte(escapeString(k))...) + b = append(b, '=') + switch t := v.(type) { + case int: + b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...) + case int32: + b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...) + case int64: + b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...) + case float64: + // ensure there is a decimal in the encoded for + + val := []byte(strconv.FormatFloat(t, 'f', -1, 64)) + hasDecimal := t-float64(int64(t)) > 0 + b = append(b, val...) + if !hasDecimal { + b = append(b, []byte(".0")...) + } + case bool: + b = append(b, []byte(strconv.FormatBool(t))...) + case []byte: + b = append(b, t...) + case string: + b = append(b, '"') + b = append(b, []byte(t)...) + b = append(b, '"') + case nil: + // skip + default: + panic(fmt.Sprintf("unknown type: %T", v)) + } + b = append(b, ',') + } + if len(b) > 0 { + return b[0 : len(b)-1] + } return b } + +type indexedSlice struct { + indices []int + b []byte +} + +func (s *indexedSlice) Less(i, j int) bool { + _, a := scanTo(s.b, s.indices[i], '=') + _, b := scanTo(s.b, s.indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +func (s *indexedSlice) Swap(i, j int) { + s.indices[i], s.indices[j] = s.indices[j], s.indices[i] +} + +func (s *indexedSlice) Len() int { + return len(s.indices) +} diff --git a/tsdb/points_test.go b/tsdb/points_test.go index fee5f700188..9884c200de4 100644 --- a/tsdb/points_test.go +++ b/tsdb/points_test.go @@ -1,12 +1,18 @@ package tsdb -import "testing" +import ( + "bytes" + "reflect" + "strings" + "testing" + "time" +) var tags = Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"} func TestMarshal(t *testing.T) { - got := tags.HashKey() - if exp := "apple|foo|host|region|orange|bar|serverA|uswest"; string(got) != exp { + got := tags.hashKey() + if exp := ",apple=orange,foo=bar,host=serverA,region=uswest"; string(got) != exp { t.Log("got: ", string(got)) t.Log("exp: ", exp) t.Error("invalid match") @@ -15,6 +21,469 @@ func TestMarshal(t *testing.T) { func BenchmarkMarshal(b *testing.B) { for i := 0; i < b.N; i++ { - tags.HashKey() + tags.hashKey() } } + +func BenchmarkParsePointNoTags(b *testing.B) { + line := `cpu value=1 1000000000` + for i := 0; i < b.N; i++ { + ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + } +} + +func BenchmarkParsePointsTagsSorted2(b *testing.B) { + line := `cpu,host=serverA,region=us-west value=1 1000000000` + for i := 0; i < b.N; i++ { + ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + } +} + +func BenchmarkParsePointsTagsSorted5(b *testing.B) { + line := `cpu,env=prod,host=serverA,region=us-west,target=servers,zone=1c value=1 1000000000` + for i := 0; i < b.N; i++ { + ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + } +} + +func BenchmarkParsePointsTagsSorted10(b *testing.B) { + line := `cpu,env=prod,host=serverA,region=us-west,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5,target=servers,zone=1c value=1 1000000000` + for i := 0; i < b.N; i++ { + ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + } +} + +func BenchmarkParsePointsTagsUnSorted2(b *testing.B) { + line := `cpu,region=us-west,host=serverA value=1 1000000000` + for i := 0; i < b.N; i++ { + pt, _ := ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + pt[0].Key() + } +} + +func BenchmarkParsePointsTagsUnSorted5(b *testing.B) { + line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c value=1 1000000000` + for i := 0; i < b.N; i++ { + pt, _ := ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + pt[0].Key() + } +} + +func BenchmarkParsePointsTagsUnSorted10(b *testing.B) { + line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5 value=1 1000000000` + for i := 0; i < b.N; i++ { + pt, _ := ParsePoints([]byte(line)) + b.SetBytes(int64(len(line))) + pt[0].Key() + } +} + +func test(t *testing.T, line string, point Point) { + pts, err := ParsePointsWithPrecision([]byte(line), time.Unix(0, 0), "n") + if err != nil { + t.Fatalf(`ParsePoints("%s") mismatch. got %v, exp nil`, line, err) + } + + if exp := 1; len(pts) != exp { + t.Fatalf(`ParsePoints("%s") len mismatch. got %d, exp %d`, line, len(pts), exp) + } + + if exp := point.Key(); !bytes.Equal(pts[0].Key(), exp) { + t.Errorf("ParsePoints(\"%s\") key mismatch.\ngot %v\nexp %v", line, string(pts[0].Key()), string(exp)) + } + + if exp := len(point.Tags()); len(pts[0].Tags()) != exp { + t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags(), exp) + } + + for tag, value := range point.Tags() { + if pts[0].Tags()[tag] != value { + t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags()[tag], value) + } + } + + for name, value := range point.Fields() { + if !reflect.DeepEqual(pts[0].Fields()[name], value) { + t.Errorf(`ParsePoints("%s") field '%s' mismatch. got %v, exp %v`, line, name, pts[0].Fields()[name], value) + } + } + + if !pts[0].Time().Equal(point.Time()) { + t.Errorf(`ParsePoints("%s") time mismatch. got %v, exp %v`, line, pts[0].Time(), point.Time()) + } + + if !strings.HasPrefix(pts[0].String(), line) { + t.Errorf("ParsePoints string mismatch.\ngot: %v\nexp: %v", pts[0].String(), line) + } +} + +func TestParsePointNoValue(t *testing.T) { + pts, err := ParsePointsString("") + if err != nil { + t.Errorf(`ParsePoints("%s") mismatch. got %v, exp nil`, "", err) + } + + if exp := 0; len(pts) != exp { + t.Errorf(`ParsePoints("%s") len mismatch. got %v, exp %vr`, "", len(pts), exp) + } +} + +func TestParsePointNoFields(t *testing.T) { + _, err := ParsePointsString("cpu") + if err == nil { + t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, "cpu") + } +} + +func TestParsePointNoTimestamp(t *testing.T) { + test(t, "cpu value=1", NewPoint("cpu", nil, nil, time.Unix(0, 0))) +} + +func TestParsePointMissingQuote(t *testing.T) { + _, err := ParsePointsString(`cpu,host=serverA value="test`) + if err == nil { + t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, "cpu") + } +} + +func TestParsePointMissingTagValue(t *testing.T) { + _, err := ParsePointsString(`cpu,host=serverA,region value=1`) + if err == nil { + t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, "cpu") + } +} + +func TestParsePointUnescape(t *testing.T) { + // commas in measuremnt name + test(t, `cpu\,main,regions=east\,west value=1.0`, + NewPoint( + "cpu,main", // comma in the name + Tags{ + "regions": "east,west", + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // spaces in measurement name + test(t, `cpu\ load,region=east value=1.0`, + NewPoint( + "cpu load", // space in the name + Tags{ + "region": "east", + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // commas in tag names + test(t, `cpu,region\,zone=east value=1.0`, + NewPoint("cpu", + Tags{ + "region,zone": "east", // comma in the tag name + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // spaces in tag names + test(t, `cpu,region\ zone=east value=1.0`, + NewPoint("cpu", + Tags{ + "region zone": "east", // comma in the tag name + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // commas in tag values + test(t, `cpu,regions=east\,west value=1.0`, + NewPoint("cpu", + Tags{ + "regions": "east,west", // comma in the tag value + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // spaces in tag values + test(t, `cpu,regions=east\ west value=1.0`, + NewPoint("cpu", + Tags{ + "regions": "east west", // comma in the tag value + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) + + // commas in field names + test(t, `cpu,regions=east value\,ms=1.0`, + NewPoint("cpu", + Tags{ + "regions": "east", + }, + Fields{ + "value,ms": 1.0, // comma in the field name + }, + time.Unix(0, 0))) + + // spaces in field names + test(t, `cpu,regions=east value\ ms=1.0`, + NewPoint("cpu", + Tags{ + "regions": "east", + }, + Fields{ + "value ms": 1.0, // comma in the field name + }, + time.Unix(0, 0))) + + // commas in field values + test(t, `cpu,regions=east value="1,0"`, + NewPoint("cpu", + Tags{ + "regions": "east", + }, + Fields{ + "value": "1,0", // comma in the field value + }, + time.Unix(0, 0))) + + // random character escaped + test(t, `cpu,regions=eas\t value=1.0`, + NewPoint( + "cpu", + Tags{ + "regions": "eas\\t", + }, + Fields{ + "value": 1.0, + }, + time.Unix(0, 0))) +} + +func TestParsePointWithTags(t *testing.T) { + test(t, + "cpu,host=serverA,region=us-east value=1.0 1000000000", + NewPoint("cpu", + Tags{"host": "serverA", "region": "us-east"}, + Fields{"value": 1.0}, time.Unix(1, 0))) +} + +func TestParsPointWithDuplicateTags(t *testing.T) { + _, err := ParsePoints([]byte(`cpu,host=serverA,host=serverB value=1 1000000000`)) + if err == nil { + t.Fatalf(`ParsePoint() expected error. got nil`) + } +} + +func TestParsePointWithStringField(t *testing.T) { + test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`, + NewPoint("cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "value": 1.0, + "str": "foo", + "str2": "bar", + }, + time.Unix(1, 0)), + ) + + test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`, + NewPoint("cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "str": `foo " bar`, + }, + time.Unix(1, 0)), + ) + +} + +func TestParsePointWithStringWithSpaces(t *testing.T) { + test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`, + NewPoint( + "cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "value": 1.0, + "str": "foo bar", // spaces in string value + }, + time.Unix(1, 0)), + ) +} + +func TestParsePointWithStringWithEquals(t *testing.T) { + test(t, `cpu,host=serverA,region=us-east str="foo=bar",value=1.0, 1000000000`, + NewPoint( + "cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "value": 1.0, + "str": "foo=bar", // spaces in string value + }, + time.Unix(1, 0)), + ) +} + +func TestParsePointWithBoolField(t *testing.T) { + test(t, `cpu,host=serverA,region=us-east bool=true,boolTrue=t,false=false,falseVal=f 1000000000`, + NewPoint( + "cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "bool": true, + "boolTrue": true, + "false": false, + "falseVal": false, + }, + time.Unix(1, 0)), + ) +} + +func TestParsePointUnicodeString(t *testing.T) { + test(t, `cpu,host=serverA,region=us-east value="wè" 1000000000`, + NewPoint( + "cpu", + Tags{ + "host": "serverA", + "region": "us-east", + }, + Fields{ + "value": "wè", + }, + time.Unix(1, 0)), + ) +} + +func TestParsePointIntsFloats(t *testing.T) { + pts, err := ParsePoints([]byte(`cpu,host=serverA,region=us-east int=10,float=11.0,float2=12.1 1000000000`)) + if err != nil { + t.Fatalf(`ParsePoints() failed. got %s`, err) + } + + if exp := 1; len(pts) != exp { + t.Errorf("ParsePoint() len mismatch: got %v, exp %v", len(pts), exp) + } + pt := pts[0] + + if _, ok := pt.Fields()["int"].(int64); !ok { + t.Errorf("ParsePoint() int field mismatch: got %T, exp %T", pt.Fields()["int"], int64(10)) + } + + if _, ok := pt.Fields()["float"].(float64); !ok { + t.Errorf("ParsePoint() float field mismatch: got %T, exp %T", pt.Fields()["float64"], float64(11.0)) + } + + if _, ok := pt.Fields()["float2"].(float64); !ok { + t.Errorf("ParsePoint() float field mismatch: got %T, exp %T", pt.Fields()["float64"], float64(12.1)) + } + +} + +func TestParsePointKeyUnsorted(t *testing.T) { + pts, err := ParsePoints([]byte("cpu,last=1,first=2 value=1")) + if err != nil { + t.Fatalf(`ParsePoints() failed. got %s`, err) + } + + if exp := 1; len(pts) != exp { + t.Errorf("ParsePoint() len mismatch: got %v, exp %v", len(pts), exp) + } + pt := pts[0] + + if exp := "cpu,first=2,last=1"; string(pt.Key()) != exp { + t.Errorf("ParsePoint key not sorted. got %v, exp %v", pt.Key(), exp) + } +} + +func TestParsePointToString(t *testing.T) { + line := `cpu,host=serverA,region=us-east bool=false,float=11.0,float2=12.123,int=10,str="string val" 1000000000` + pts, err := ParsePoints([]byte(line)) + if err != nil { + t.Fatalf(`ParsePoints() failed. got %s`, err) + } + if exp := 1; len(pts) != exp { + t.Errorf("ParsePoint() len mismatch: got %v, exp %v", len(pts), exp) + } + pt := pts[0] + + got := pt.String() + if line != got { + t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line) + } + + pt = NewPoint("cpu", Tags{"host": "serverA", "region": "us-east"}, + Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"}, + time.Unix(1, 0)) + + got = pt.String() + if line != got { + t.Errorf("NewPoint() to string mismatch:\n got %v\n exp %v", got, line) + } +} + +func TestParsePointsWithPrecision(t *testing.T) { + line := `cpu,host=serverA,region=us-east value=1.0 20000000000` + pts, err := ParsePointsWithPrecision([]byte(line), time.Now().UTC(), "m") + if err != nil { + t.Fatalf(`ParsePoints() failed. got %s`, err) + } + if exp := 1; len(pts) != exp { + t.Errorf("ParsePoint() len mismatch: got %v, exp %v", len(pts), exp) + } + pt := pts[0] + + got := pt.String() + if exp := "cpu,host=serverA,region=us-east value=1.0 0"; got != exp { + t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, exp) + } +} + +func TestNewPointEscaped(t *testing.T) { + // commas + pt := NewPoint("cpu,main", Tags{"tag,bar": "value"}, Fields{"name,bar": 1.0}, time.Unix(0, 0)) + if exp := `cpu\,main,tag\,bar=value name\,bar=1.0 0`; pt.String() != exp { + t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) + } + + // spaces + pt = NewPoint("cpu main", Tags{"tag bar": "value"}, Fields{"name bar": 1.0}, time.Unix(0, 0)) + if exp := `cpu\ main,tag\ bar=value name\ bar=1.0 0`; pt.String() != exp { + t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) + } + + // equals + pt = NewPoint("cpu=main", Tags{"tag=bar": "value=foo"}, Fields{"name=bar": 1.0}, time.Unix(0, 0)) + if exp := `cpu\=main,tag\=bar=value\=foo name\=bar=1.0 0`; pt.String() != exp { + t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) + } + +} diff --git a/tsdb/shard.go b/tsdb/shard.go index 2409572a0d2..a2318b42d0b 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -146,11 +146,11 @@ func (s *Shard) WritePoints(points []Point) error { // save the raw point data b := tx.Bucket([]byte("values")) for _, p := range points { - bp, err := b.CreateBucketIfNotExists([]byte(p.Key())) + bp, err := b.CreateBucketIfNotExists(p.Key()) if err != nil { return err } - if err := bp.Put(u64tob(p.UnixNano()), p.Data()); err != nil { + if err := bp.Put(u64tob(uint64(p.UnixNano())), p.Data()); err != nil { return err } } @@ -217,8 +217,8 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*seriesCreate, []*fie for _, p := range points { // see if the series should be added to the index - if ss := s.index.series[p.Key()]; ss == nil { - series := &Series{Key: p.Key(), Tags: p.Tags()} + if ss := s.index.series[string(p.Key())]; ss == nil { + series := &Series{Key: string(p.Key()), Tags: p.Tags()} seriesToCreate = append(seriesToCreate, &seriesCreate{p.Name(), series}) } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index d11fbc8c0a6..f4283e377d4 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -44,9 +44,9 @@ func TestShardWriteAndIndex(t *testing.T) { if len(index.series) != 1 { t.Fatalf("series wasn't in index") } - seriesTags := index.series[pt.Key()].Tags + seriesTags := index.series[string(pt.Key())].Tags if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { - t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[pt.Key()].Tags) + t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[string(pt.Key())].Tags) } if !reflect.DeepEqual(index.measurements["cpu"].tagKeys(), []string{"host"}) { t.Fatalf("tag key wasn't saved to measurement index")