Skip to content

Commit

Permalink
Merge pull request #2166 from influxdb/dont_panic_on_missing_field_ma…
Browse files Browse the repository at this point in the history
…pping

Don't panic if presented with a field of unknown type
  • Loading branch information
otoolep committed Apr 4, 2015
2 parents 48f491f + 501b4ce commit 8a0f67f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#2165](https://github.com/influxdb/influxdb/pull/2165): Monitoring database and retention policy are not configurable.
- [#2167](https://github.com/influxdb/influxdb/pull/2167): Add broker log recovery.
- [#2050](https://github.com/influxdb/influxdb/issues/2050): Refactor `results` to `response`. Breaking Go Client change.
- [#2166](https://github.com/influxdb/influxdb/pull/2166): Don't panic if presented with a field of unknown type.

## v0.9.0-rc19 [2015-04-01]

Expand Down
25 changes: 17 additions & 8 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,12 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
}
field, ok := f.fieldsByID[b[0]]
if !ok {
panic(fmt.Sprintf("field ID %d has no mapping", b[0]))
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return 0, ErrFieldUnmappedID
}

var value interface{}
Expand Down Expand Up @@ -875,9 +880,9 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
}

// DecodeFields decodes a byte slice into a set of field ids and values.
func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) {
if len(b) == 0 {
return nil
return nil, nil
}

// Create a map to hold the decoded data.
Expand All @@ -893,7 +898,8 @@ func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {
fieldID := b[0]
field := f.fieldsByID[fieldID]
if field == nil {
panic(fmt.Sprintf("field ID %d has no mapping", fieldID))
// See note in DecodeByID() regarding field-mapping failures.
return nil, ErrFieldUnmappedID
}

var value interface{}
Expand Down Expand Up @@ -923,20 +929,23 @@ func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {

}

return values
return values, nil
}

// DecodeFieldsWithNames decodes a byte slice into a set of field names and values
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) map[string]interface{} {
fields := f.DecodeFields(b)
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error) {
fields, err := f.DecodeFields(b)
if err != nil {
return nil, err
}
m := make(map[string]interface{})
for id, v := range fields {
field := f.fieldsByID[id]
if field != nil {
m[field.Name] = v
}
}
return m
return m, nil
}

// FieldByName returns the field by its name. It will return a nil if not found
Expand Down
4 changes: 4 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ var (
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors.New("field not found")

// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// there is no mapping for.
ErrFieldUnmappedID = errors.New("field ID not mapped")

// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
ErrSeriesNotFound = errors.New("series not found")

Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,8 +2015,8 @@ func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[str

// Decode into a raw value map.
codec := NewFieldCodec(mm)
rawFields := codec.DecodeFields(data)
if rawFields == nil {
rawFields, err := codec.DecodeFields(data)
if err != nil || rawFields == nil {
return nil, nil
}

Expand Down
19 changes: 10 additions & 9 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,14 @@ func (l *LocalMapper) Next() (seriesID uint32, timestamp int64, value interface{
var value interface{}
var err error
if l.isRaw && len(l.selectFields) > 1 {
fieldsWithNames := l.decoder.DecodeFieldsWithNames(l.valueBuffer[min])
value = fieldsWithNames
if fieldsWithNames, err := l.decoder.DecodeFieldsWithNames(l.valueBuffer[min]); err == nil {
value = fieldsWithNames

// if there's a where clause, make sure we don't need to filter this value
if l.filters[min] != nil {
if !matchesWhere(l.filters[min], fieldsWithNames) {
value = nil
// if there's a where clause, make sure we don't need to filter this value
if l.filters[min] != nil {
if !matchesWhere(l.filters[min], fieldsWithNames) {
value = nil
}
}
}
} else {
Expand All @@ -409,8 +410,8 @@ func (l *LocalMapper) Next() (seriesID uint32, timestamp int64, value interface{
value = nil
}
} else { // decode everything
fieldsWithNames := l.decoder.DecodeFieldsWithNames(l.valueBuffer[min])
if !matchesWhere(l.filters[min], fieldsWithNames) {
fieldsWithNames, err := l.decoder.DecodeFieldsWithNames(l.valueBuffer[min])
if err != nil || !matchesWhere(l.filters[min], fieldsWithNames) {
value = nil
}
}
Expand Down Expand Up @@ -462,5 +463,5 @@ func splitIdent(s string) (db, rp, m string, err error) {
type fieldDecoder interface {
DecodeByID(fieldID uint8, b []byte) (interface{}, error)
FieldByName(name string) *Field
DecodeFieldsWithNames(b []byte) map[string]interface{}
DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
}

0 comments on commit 8a0f67f

Please sign in to comment.