Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libbeat: schema.Apply() returns plain error or nil #7167

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions libbeat/common/schema/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@ func (err *Error) IsType(errorType ErrorType) bool {
func (err *Error) Error() string {
return fmt.Sprintf("Missing field: %s, Error: %s", err.key, err.message)
}

type KeyNotFoundError struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type KeyNotFoundError should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type KeyNotFoundError should have comment or be unexported

Key string
Err error
}

func (err *KeyNotFoundError) Error() string {
msg := fmt.Sprintf("Key `%s` not found", err.Key)
if err.Err != nil {
msg += ": " + err.Err.Error()
}
return msg
}
37 changes: 11 additions & 26 deletions libbeat/common/schema/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,35 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

type Errors []Error
type Errors []*Error

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type Errors should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type Errors should have comment or be unexported


func NewErrors() *Errors {
return &Errors{}
}

func (errs *Errors) AddError(err *Error) {
*errs = append(*errs, *err)
}

func (errs *Errors) AddErrors(errors *Errors) {
if errors == nil {
return
}
*errs = append(*errs, *errors...)
}

func (errs *Errors) HasRequiredErrors() bool {
for _, err := range *errs {
func (errs Errors) HasRequiredErrors() bool {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Errors.HasRequiredErrors should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Errors.HasRequiredErrors should have comment or be unexported

for _, err := range errs {
if err.IsType(RequiredType) {
return true
}
}
return false
}

func (errs *Errors) Error() string {
error := "Required fields are missing: "
for _, err := range *errs {
func (errs Errors) Error() string {
var required []string
for _, err := range errs {
if err.IsType(RequiredType) {
error = error + "," + err.key
required = append(required, err.key)
}
}
return error
return "Required fields are missing: " + strings.Join(required, ", ")
}

// Log logs all missing required and optional fields to the debug log.
func (errs *Errors) Log() {
if len(*errs) == 0 {
func (errs Errors) Log() {
if len(errs) == 0 {
return
}
var optional, required []string

for _, err := range *errs {
for _, err := range errs {
if err.IsType(RequiredType) {
required = append(required, err.key)
} else {
Expand Down
5 changes: 2 additions & 3 deletions libbeat/common/schema/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
)

func TestErrors(t *testing.T) {
errs := NewErrors()
err := NewError("test", "Hello World")
errs.AddError(err)
var errs Errors
errs = append(errs, NewError("test", "Hello World"))

assert.True(t, errs.HasRequiredErrors())
}
21 changes: 9 additions & 12 deletions libbeat/common/schema/mapstriface/mapstriface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type ConvMap struct {
}

// Map drills down in the data dictionary by using the key
func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) *schema.Errors {
func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) schema.Errors {
subData, ok := data[convMap.Key].(map[string]interface{})
if !ok {
err := schema.NewError(convMap.Key, "Error accessing sub-dictionary")
Expand All @@ -80,10 +80,7 @@ func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]inte
logp.Err("Error accessing sub-dictionary `%s`", convMap.Key)
}

errors := schema.NewErrors()
errors.AddError(err)

return errors
return schema.Errors{err}
}

subEvent := common.MapStr{}
Expand All @@ -107,7 +104,7 @@ func Dict(key string, s schema.Schema, opts ...DictSchemaOption) ConvMap {
func toStrFromNum(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
return false, fmt.Errorf("Key %s not found", key)
return false, &schema.KeyNotFoundError{Key: key}
}
switch emptyIface.(type) {
case int, int32, int64, uint, uint32, uint64, float32, float64:
Expand All @@ -127,7 +124,7 @@ func StrFromNum(key string, opts ...schema.SchemaOption) schema.Conv {
func toStr(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, err := common.MapStr(data).GetValue(key)
if err != nil {
return "", fmt.Errorf("Key %s not found: %s", key, err.Error())
return "", &schema.KeyNotFoundError{Key: key}
}
str, ok := emptyIface.(string)
if !ok {
Expand All @@ -144,7 +141,7 @@ func Str(key string, opts ...schema.SchemaOption) schema.Conv {
func toIfc(key string, data map[string]interface{}) (interface{}, error) {
intf, err := common.MapStr(data).GetValue(key)
if err != nil {
return "", fmt.Errorf("Key %s not found: %s", key, err.Error())
return "", &schema.KeyNotFoundError{Key: key, Err: err}
}
return intf, nil
}
Expand All @@ -157,7 +154,7 @@ func Ifc(key string, opts ...schema.SchemaOption) schema.Conv {
func toBool(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
return false, fmt.Errorf("Key %s not found", key)
return false, &schema.KeyNotFoundError{Key: key}
}
boolean, ok := emptyIface.(bool)
if !ok {
Expand All @@ -174,7 +171,7 @@ func Bool(key string, opts ...schema.SchemaOption) schema.Conv {
func toInteger(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
return 0, fmt.Errorf("Key %s not found", key)
return 0, &schema.KeyNotFoundError{Key: key}
}
switch emptyIface.(type) {
case int64:
Expand Down Expand Up @@ -208,7 +205,7 @@ func Float(key string, opts ...schema.SchemaOption) schema.Conv {
func toFloat(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
return 0, fmt.Errorf("key %s not found", key)
return 0, &schema.KeyNotFoundError{Key: key}
}
switch emptyIface.(type) {
case float64:
Expand Down Expand Up @@ -242,7 +239,7 @@ func Int(key string, opts ...schema.SchemaOption) schema.Conv {
func toTime(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
return common.Time(time.Unix(0, 0)), fmt.Errorf("Key %s not found", key)
return common.Time(time.Unix(0, 0)), &schema.KeyNotFoundError{Key: key}
}

switch emptyIface.(type) {
Expand Down
49 changes: 49 additions & 0 deletions libbeat/common/schema/mapstriface/mapstriface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,52 @@ func TestConversions(t *testing.T) {
output, _ := schema.Apply(input)
assert.Equal(t, output, expected)
}

func TestOptionalField(t *testing.T) {
cases := []struct {
Description string
Input map[string]interface{}
Schema s.Schema
Expected common.MapStr
ExpectError bool
}{
{
"missing optional field",
map[string]interface{}{
"testString": "hello",
"testInt": 42,
},
s.Schema{
"test_string": Str("testString"),
"test_int": Int("testInt"),
"test_opt": Bool("testOptionalInt", s.Optional),
},
common.MapStr{
"test_string": "hello",
"test_int": int64(42),
},
false,
},
{
"wrong format in optional field",
map[string]interface{}{
"testInt": "hello",
},
s.Schema{
"test_int": Int("testInt", s.Optional),
},
common.MapStr{},
true,
},
}

for _, c := range cases {
output, err := c.Schema.Apply(c.Input)
if c.ExpectError {
assert.Error(t, err, c.Description)
} else {
assert.NoError(t, err, c.Description)
assert.Equal(t, c.Expected, output, c.Description)
}
}
}
2 changes: 1 addition & 1 deletion libbeat/common/schema/mapstrstr/mapstrstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func Str(key string, opts ...schema.SchemaOption) schema.Conv {
func getString(key string, data map[string]interface{}) (string, error) {
val, exists := data[key]
if !exists {
return "", fmt.Errorf("Key `%s` not found", key)
return "", &schema.KeyNotFoundError{Key: key}
}

str, ok := val.(string)
Expand Down
38 changes: 18 additions & 20 deletions libbeat/common/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Schema map[string]Mapper
type Mapper interface {
// Map applies the Mapper conversion on the data and adds the result
// to the event on the key.
Map(key string, event common.MapStr, data map[string]interface{}) *Errors
Map(key string, event common.MapStr, data map[string]interface{}) Errors

HasKey(key string) bool
}
Expand All @@ -25,26 +25,21 @@ type Conv struct {
Optional bool // Whether to log errors if the key is not found
}

// Convertor function type
// Converter function type
type Converter func(key string, data map[string]interface{}) (interface{}, error)

// Map applies the conversion on the data and adds the result
// to the event on the key.
func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{}) *Errors {
func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{}) Errors {
value, err := conv.Func(conv.Key, data)
if err != nil {
err := NewError(key, err.Error())
if conv.Optional {
err.SetType(OptionalType)
if _, keyNotFound := err.(*KeyNotFoundError); keyNotFound && conv.Optional {
return nil
}
return Errors{NewError(key, err.Error())}

errs := NewErrors()
errs.AddError(err)
return errs

} else {
event[key] = value
}
event[key] = value
return nil
}

Expand All @@ -55,7 +50,7 @@ func (conv Conv) HasKey(key string) bool {
// implements Mapper interface for structure
type Object map[string]Mapper

func (o Object) Map(key string, event common.MapStr, data map[string]interface{}) *Errors {
func (o Object) Map(key string, event common.MapStr, data map[string]interface{}) Errors {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Object.Map should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Object.Map should have comment or be unexported

subEvent := common.MapStr{}
errs := applySchemaToEvent(subEvent, data, o)
event[key] = subEvent
Expand All @@ -68,14 +63,17 @@ func (o Object) HasKey(key string) bool {

// ApplyTo adds the fields extracted from data, converted using the schema, to the
// event map.
func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}) (common.MapStr, *Errors) {
func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}) (common.MapStr, error) {
errors := applySchemaToEvent(event, data, s)
errors.Log()
return event, errors
if len(errors) > 0 {
errors.Log()
return event, errors
}
return event, nil
}

// Apply converts the fields extracted from data, using the schema, into a new map and reports back the errors.
func (s Schema) Apply(data map[string]interface{}) (common.MapStr, *Errors) {
func (s Schema) Apply(data map[string]interface{}) (common.MapStr, error) {
return s.ApplyTo(common.MapStr{}, data)
}

Expand All @@ -93,11 +91,11 @@ func hasKey(key string, mappers map[string]Mapper) bool {
return false
}

func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) *Errors {
errs := NewErrors()
func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) Errors {
var errs Errors
for key, mapper := range conversions {
errors := mapper.Map(key, event, data)
errs.AddErrors(errors)
errs = append(errs, errors...)
}
return errs
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/apache/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (
}
)

func applySchema(event common.MapStr, fullEvent map[string]interface{}) *s.Errors {
func applySchema(event common.MapStr, fullEvent map[string]interface{}) error {
applicableSchema := schema
if _, found := fullEvent["ServerUptimeSeconds"]; !found {
applicableSchema = schemaOld
Expand All @@ -86,7 +86,7 @@ func applySchema(event common.MapStr, fullEvent map[string]interface{}) *s.Error
}

// Map body to MapStr
func eventMapping(scanner *bufio.Scanner, hostname string) (common.MapStr, *s.Errors) {
func eventMapping(scanner *bufio.Scanner, hostname string) (common.MapStr, error) {
var (
totalS int
totalR int
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/apache/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,6 @@ func TestStatusOutputs(t *testing.T) {
scanner := bufio.NewScanner(f)

_, errors := eventMapping(scanner, "localhost")
assert.False(t, errors.HasRequiredErrors(), "error mapping "+filename)
assert.NoError(t, errors)
}
}
11 changes: 6 additions & 5 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package index
import (
"encoding/json"

"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
Expand Down Expand Up @@ -32,19 +34,18 @@ var (
}
)

func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []error {

func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error {
var indicesStruct struct {
Indices map[string]map[string]interface{} `json:"indices"`
}

err := json.Unmarshal(content, &indicesStruct)
if err != nil {
r.Error(err)
return []error{err}
return err
}

var errs []error
var errs multierror.Errors
for name, index := range indicesStruct.Indices {
event := mb.Event{}
event.MetricSetFields, err = schema.Apply(index)
Expand All @@ -60,5 +61,5 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []e
event.ModuleFields.Put("cluster.id", info.ClusterID)
r.Event(event)
}
return errs
return errs.Err()
}
Loading