diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b08891a10ff..17103d1b541 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834] - Fixed typo in log message. {pull}17897[17897] - Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972] +- Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index e74800ae127..15f9384b7cf 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -455,17 +455,10 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C gzipReader.Close() } - // Check if expand_event_list_from_field is given with document content-type = "application/json" - if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" { - err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file") - p.logger.Error(err) - return err - } - - // Decode JSON documents when expand_event_list_from_field is given in config - if p.config.ExpandEventListFromField != "" { + // Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config + if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" { decoder := json.NewDecoder(reader) - err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx) + err := p.decodeJSON(decoder, objectHash, info, s3Ctx) if err != nil { err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name) p.logger.Error(err) @@ -512,33 +505,20 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C return nil } -func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { +func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { offset := 0 for { - var jsonFields map[string][]interface{} + var jsonFields interface{} err := decoder.Decode(&jsonFields) if jsonFields == nil { return nil } if err == io.EOF { - // create event for last line - // get logs from expand_event_list_from_field - textValues, ok := jsonFields[p.config.ExpandEventListFromField] - if !ok { - err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField) - p.logger.Error(err) + offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx) + if err != nil { return err } - - for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) - if err != nil { - err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name) - p.logger.Error(err) - return err - } - } } else if err != nil { // decode json failed, skip this log file err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name) @@ -546,25 +526,46 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 return nil } - textValues, ok := jsonFields[p.config.ExpandEventListFromField] - if !ok { - err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField) - p.logger.Error(err) + offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx) + if err != nil { return err } + } +} - for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) - if err != nil { - err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField) +func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { + switch f := jsonFields.(type) { + case map[string][]interface{}: + if p.config.ExpandEventListFromField != "" { + textValues, ok := f[p.config.ExpandEventListFromField] + if !ok { + err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField) p.logger.Error(err) - return err + return offset, err + } + for _, v := range textValues { + offset, err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) + if err != nil { + err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name) + p.logger.Error(err) + return offset, err + } } + return offset, nil + } + case map[string]interface{}: + offset, err := p.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx) + if err != nil { + err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name) + p.logger.Error(err) + return offset, err } + return offset, nil } + return offset, nil } -func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { +func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { vJSON, err := json.Marshal(jsonFields) logOriginal := string(vJSON) log := trimLogDelimiter(logOriginal) @@ -575,9 +576,9 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH if err != nil { err = errors.Wrap(err, "forwardEvent failed") p.logger.Error(err) - return err + return offset, err } - return nil + return offset, nil } func (p *s3Input) forwardEvent(event beat.Event) error {