From 7051f6da75b99ccdb06a66d566368d6d49c88df1 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 28 Dec 2015 10:46:02 +0100 Subject: [PATCH] Implement multiline tests for elasticsearch log --- filebeat/crawler/prospector.go | 4 +- .../logs/elasticsearch-multiline-log.log | 52 +++++++++++++++++++ filebeat/tests/system/config/filebeat.yml.j2 | 15 +++++- filebeat/tests/system/test_multiline.py | 40 ++++++++++++++ 4 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 filebeat/tests/files/logs/elasticsearch-multiline-log.log create mode 100644 filebeat/tests/system/test_multiline.py diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index dfadc0d3f43..792ae477b8f 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -152,6 +152,8 @@ func (p *Prospector) logRun(spoolChan chan *input.FileEvent) { // Seed last scan time p.lastscan = time.Now() + logp.Debug("prospector", "exclude_files: %s", p.ProspectorConfig.ExcludeFiles) + // Now let's do one quick scan to pick up new files for _, path := range p.ProspectorConfig.Paths { p.scan(path, spoolChan) @@ -242,7 +244,7 @@ func (p *Prospector) isFileExcluded(file string) bool { func (p *Prospector) scan(path string, output chan *input.FileEvent) { logp.Debug("prospector", "scan path %s", path) - logp.Debug("prospector", "exclude_files: %s", p.ProspectorConfig.ExcludeFiles) + // Evaluate the path as a wildcards/shell glob matches, err := filepath.Glob(path) if err != nil { diff --git a/filebeat/tests/files/logs/elasticsearch-multiline-log.log b/filebeat/tests/files/logs/elasticsearch-multiline-log.log new file mode 100644 index 00000000000..c47a22e642b --- /dev/null +++ b/filebeat/tests/files/logs/elasticsearch-multiline-log.log @@ -0,0 +1,52 @@ +[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z] +[2015-12-06 01:44:16,736][INFO ][node ] [Zach] initializing ... +[2015-12-06 01:44:16,804][INFO ][plugins ] [Zach] loaded [], sites [] +[2015-12-06 01:44:16,941][INFO ][env ] [Zach] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [66.3gb], net total_space [232.6gb], spins? [unknown], types [hfs] +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] initialized +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] starting ... +[2015-12-06 01:44:19,356][INFO ][transport ] [Zach] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[fe80::1]:9300}, {[::1]:9300} +[2015-12-06 01:44:19,367][INFO ][discovery ] [Zach] elasticsearch/qfPw9z0HQe6grbJQruTCJQ +[2015-12-06 01:44:22,405][INFO ][cluster.service ] [Zach] new_master {Zach}{qfPw9z0HQe6grbJQruTCJQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received) +[2015-12-06 01:44:22,432][INFO ][http ] [Zach] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[fe80::1]:9200}, {[::1]:9200} +[2015-12-06 01:44:22,432][INFO ][node ] [Zach] started +[2015-12-06 01:44:22,446][INFO ][gateway ] [Zach] recovered [0] indices into cluster_state +[2015-12-06 01:44:52,882][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [process, system] +[2015-12-06 01:44:53,256][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [process] +[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,274][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][0] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vcVA0hoJdODMTz], source[{"@timestamp":"2015-12-06T00:44:52.448Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":1902,"user_p":0,"system":941,"total":2843,"start_time":"Dec03"},"mem":{"size":3616309248,"rss":156405760,"rss_p":0.01,"share":0},"name":"Google Chrome H","pid":40572,"ppid":392,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,279][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,280][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][1] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vbVA0hoJdODMTj], source[{"@timestamp":"2015-12-06T00:44:52.416Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":6643,"user_p":0.01,"system":693,"total":7336,"start_time":"01:44"},"mem":{"size":5182656512,"rss":248872960,"rss_p":0.01,"share":0},"name":"java","pid":48553,"ppid":48547,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,334][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [system] +[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem] diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 69ea994aa65..77469e70e94 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -16,23 +16,36 @@ filebeat: backoff_factor: 1 max_backoff: 0.1s force_close_files: {{force_close_files}} + {% if fields %} fields: {% for k,v in fields.items() %} {{k}}: {{v}} {% endfor %} {% endif %} + fields_under_root: {{"true" if fieldsUnderRoot else "false"}} + {% if include_lines %} include_lines: {{include_lines}} {% endif %} + {% if exclude_lines %} exclude_lines: {{exclude_lines}} {% endif %} + {% if exclude_files %} exclude_files: {{exclude_files}} {% endif %} - + + {% if multiline %} + multiline: + pattern: {{pattern}} + negate: {{negate}} + match: {{match}} + timeout: 1s + {% endif %} + spool_size: idle_timeout: 0.1s registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}} diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py new file mode 100644 index 00000000000..8a4ca010e5b --- /dev/null +++ b/filebeat/tests/system/test_multiline.py @@ -0,0 +1,40 @@ +from filebeat import TestCase +import os +import socket +import shutil + +""" +Tests for the multiline log messages +""" + + +class Test(TestCase): + def test_java_elasticsearch_log(self): + """ + Test that multi lines for java logs works. + It checks that all lines which do not start with [ are append to the last line starting with [ + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after" + ) + + os.mkdir(self.working_dir + "/log/") + shutil.copy2("../files/logs/elasticsearch-multiline-log.log", os.path.abspath(self.working_dir) + "/log/elasticsearch-multiline-log.log") + + proc = self.start_filebeat() + + # wait for the "Skipping file" log message + self.wait_until( + lambda: self.output_has(lines=20), + max_timeout=10) + + proc.kill_and_wait() + + output = self.read_output() + + # Check that output file has the same number of lines as the log file + assert 20 == len(output)