Re-start of logstash dies when no data were provided for some aggregate task_id patterns #62

Closed
mrchang7 opened this issue Mar 8, 2017 · 19 comments

Comments

mrchang7 commented Mar 8, 2017

Hi again,

Sorry for the many issues. I am testing aggregate 2.5.1 with Logstash 2.4.1.

Here is a scenario where a restart of Logstash dies immediately after reading the .aggregate map file:

  1. Start Logstash with A.conf, which includes task_id patterns T1 and T2, but provide data only for T1.
  2. Stop Logstash; this generates the .aggregate map file.
  3. Restart Logstash with A.conf; it dies after reading the .aggregate file.

So, I suspect that the .aggregate file does not contain map values for T2, which causes a conflict when Logstash reads the file on restart. I bet you will know exactly what the issue is.

Below is the Logstash error log for your information:

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
NoMethodError: undefined method `length' for nil:NilClass
     remove_expired_maps at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:592
             synchronize at org/jruby/ext/thread/Mutex.java:149
     remove_expired_maps at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:590
                   flush at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:560
              initialize at (eval):248
                    call at org/jruby/RubyProc.java:281
           flush_filters at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:436
                    each at org/jruby/RubyArray.java:1613
           flush_filters at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:435
  flush_filters_to_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:467
             worker_loop at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:227
           start_workers at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:201

Thanks.

mrchang7 changed the title from "Aggregate 2.5.1" to "Re-start of logstash dies when no data are provided for some aggregate task_id patterns" on Mar 8, 2017
mrchang7 changed the title from "Re-start of logstash dies when no data are provided for some aggregate task_id patterns" to "Re-start of logstash dies when no data were provided for some aggregate task_id patterns" on Mar 8, 2017
@fbaligand
Collaborator

Well, that's weird...
I'm surprised, because normally the aggregate plugin handles this case; I have tested this kind of behavior.
Currently, I'm on holiday and away from my computer.
I will try to reproduce your issue this weekend and, if I can, fix it.

@fbaligand
Collaborator

Question: can you reproduce the issue with aggregate plugin version 2.3.1?

@doingitbigdata

Could the issue have to do with line 592?

@logger.debug("Aggregate remove_expired_maps call with '#{@task_id}' pattern and #{@@aggregate_maps[@task_id].length} maps")

What if the task_id does not exist in @@aggregate_maps?
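
For illustration, here is a minimal Ruby sketch (not the plugin source; the variable names are assumptions) of why that call raises when the key is absent:

# @@aggregate_maps is assumed to behave like a plain Hash keyed by task_id pattern.
aggregate_maps = { "%{TaskPattern}" => {} }

task_id = "%{TaskPattern_2}"    # a pattern that never received data
aggregate_maps[task_id].length
# => NoMethodError: undefined method `length' for nil:NilClass

# A nil-safe variant would guard the lookup first:
maps = aggregate_maps[task_id]
maps.length unless maps.nil?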

@fbaligand
Collaborator

Yes, but it should not be nil because of line 404.

@doingitbigdata

doingitbigdata commented Mar 9, 2017

We ran it with --debug logs. The last logs are as follows:

{:timestamp=>"2017-03-09T17:32:00.766000+0000", :message=>"Flushing", :plugin=><LogStash::Filters::Aggregate task_id=>"%{source}", map_action=>"update", end_of_task=>true, timeout=>172800, periodic_flush=>false, push_map_as_event_on_timeout=>false, push_previous_map_as_event=>false>, :level=>:debug, :file=>"(eval)", :line=>"246", :method=>"initialize"}
{:timestamp=>"2017-03-09T17:32:00.767000+0000", :message=>"Aggregate flush call with {}", :level=>:debug, :file=>"logstash/filters/aggregate.rb", :line=>"545", :method=>"flush"}

The "source" task_id is expected to have 0 maps in this case

@mrchang7
Author

mrchang7 commented Mar 9, 2017

2.3.1 aggregate works fine with the same scenario.

@fbaligand
Collaborator

@mrchang7
Thanks for the info, it should help a lot to fix the issue.

@fbaligand
Collaborator

@mrchang7
I tried to reproduce your issue, but I didn't succeed.

Actually, I'm really surprised by this issue, because it should never happen, given line 404.

Could you provide a full Logstash configuration and some sample data so that I can reproduce your issue?

@mrchang7
Author

mrchang7 commented Mar 13, 2017

Yeah, reproduction was not easy, even after reducing it to the simple example below.

Basically, we use Filebeat for data shipping. Three Logstash conf files are placed in a folder, and we pass the folder path when we run Logstash (with options -w 1 -f <folder_path>).

filebeat.yml

filebeat:
  prospectors:
    -
      paths:
        - /etc/filebeat/data.txt      ## content of data does not matter for this test
      input_type: log
      document_type: DataType_A
      fields_under_root: true
      fields:
        TaskPattern: P1
    -
      paths:
        - /etc/filebeat/absent_data.txt    ## path to the file which does not exist on purpose
      input_type: log
      document_type: DataType_B
      fields_under_root: true
      fields:
        TaskPattern_2: P2

  registry_file: /var/lib/filebeat/registry

output:
  logstash:
    hosts: ["kvm1088:5044"]
    bulk_max_size: 512
    worker: 1

input.conf

input { beats { port => 5044 } }

second.conf

filter {
    if [type] == "DataType_A" {
        if [TaskPattern] {
            aggregate {
                aggregate_maps_path => "/etc/logstash/conf.d/.aggregate_maps"
                task_id => "%{TaskPattern}"
                code => "map['TaskPattern'] = event['TaskPattern']"
            }
        }
    }
}
output { stdout { codec => rubydebug } }

first.conf

filter {
    if [type] == "DataType_B" {
        aggregate {
            task_id => "%{TaskPattern_2}"
            code => "map['TaskPattern'] = event['TaskPattern_2']"
        }
    }
}

Please note that "TaskPattern_2" is deliberately not defined/shipped from Filebeat for this test.

FYI, below are the --debug logs from when I restarted Logstash with the generated .aggregate_maps file.

starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"213", :method=>"execute"}
starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"487", :method=>"start_pipeline"}
Settings: User set pipeline workers: 1, Default pipeline workers: 12
log4j java properties setup {:log4j_level=>"DEBUG", :level=>:debug, :file=>"logstash/logging.rb", :line=>"89", :method=>"setup_log4j"}
Beats inputs: Starting input listener {:address=>"0.0.0.0:5044", :level=>:info, :file=>"logstash/inputs/beats.rb", :line=>"160", :method=>"register"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"rubydebug", :path=>"logstash/codecs/rubydebug", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
config LogStash::Codecs::RubyDebug/@metadata = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Outputs::Stdout/@codec = <LogStash::Codecs::RubyDebug metadata=>false> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Outputs::Stdout/@workers = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
Will start workers for output {:worker_count=>1, :class=>"LogStash::Outputs::Stdout", :level=>:debug, :file=>"logstash/output_delegator.rb", :line=>"77", :method=>"register"}
Aggregate register call {:code=>"", :level=>:debug, :file=>"logstash/filters/aggregate.rb", :line=>"352", :method=>"register"}
Aggregate register call {:code=>"map['TaskPattern'] = event['TaskPattern']", :level=>:debug, :file=>"logstash/filters/aggregate.rb", :line=>"352", :method=>"register"}
Aggregate maps loaded from : /etc/logstash/conf.d/.aggregate_maps {:level=>:info, :file=>"logstash/filters/aggregate.rb", :line=>"400", :method=>"register"}
Starting pipeline {:id=>"main", :pipeline_workers=>1, :batch_size=>125, :batch_delay=>5, :max_inflight=>125, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"188", :method=>"start_workers"}
Pipeline main started {:file=>"logstash/agent.rb", :line=>"491", :method=>"start_pipeline"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"458", :method=>"flush"}
Flushing {:plugin=><LogStash::Filters::Aggregate task_id=>"%{TaskPattern_2}", periodic_flush=>false, map_action=>"create_or_update", end_of_task=>false, push_map_as_event_on_timeout=>false, push_previous_map_as_event=>false>, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"initialize"}
Aggregate flush call with {} {:level=>:debug, :file=>"logstash/filters/aggregate.rb", :line=>"545", :method=>"flush"}
NoMethodError: undefined method `length' for nil:NilClass
     remove_expired_maps at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:592
             synchronize at org/jruby/ext/thread/Mutex.java:149
     remove_expired_maps at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:590
                   flush at /opt/logstash/vendor/local_gems/4b05611b/logstash-filter-aggregate-2.5.1/lib/logstash/filters/aggregate.rb:560
              initialize at (eval):19
                    call at org/jruby/RubyProc.java:281
           flush_filters at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:436
                    each at org/jruby/RubyArray.java:1613
           flush_filters at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:435
  flush_filters_to_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:467
             worker_loop at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:227
           start_workers at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:201

Interestingly, if I join the two Logstash conf files into a single file, it does not die. But we have multiple conf files for different data sets and prefer to keep that approach.

Note: strangely, at times even the reproduced case stops dying at some point, which is very weird (e.g., do the Logstash conf file names matter??).

@fbaligand
Collaborator

Wow... very weird indeed...

Just a remark: I guess that in logstash1.conf, you don't have the first line input { beats { port => 5044 } }?

@mrchang7
Author

Yes, you're right. I just commented out that line. This might be a corner case; if the appropriate data are provided, there is no problem.

@fbaligand
Collaborator

Do you mean that when you remove this line, you can no longer reproduce the issue?

@mrchang7
Author

mrchang7 commented Mar 14, 2017

Hey,

We just found that Logstash merges the conf files in alphabetical order:
https://discuss.elastic.co/t/combining-several-logstash-config-files-into-one-how-do-you-do-it/33546

I tried changing the conf file names so that the one with data comes first, and the problem was gone. So it seems this issue arises when a task_id without data is positioned ahead of those with data ("data" here meaning events matching the task_id pattern or value).

Note: I updated the example above accordingly.
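
As a minimal illustration (plain Ruby, not Logstash source), lexically sorting the conf directory puts "first.conf" ahead of "input.conf" and "second.conf", so the aggregate filter without data registers first:

Dir.glob("/etc/logstash/conf.d/*.conf").sort
# => ["/etc/logstash/conf.d/first.conf",
#     "/etc/logstash/conf.d/input.conf",
#     "/etc/logstash/conf.d/second.conf"]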

Also, my tests show that the order matters even within a single file joining all the conf files' content. So you could just try this:

input { beats { port => 5044 } }

filter {
    if [type] == "DataType_B" {
        aggregate {
            task_id => "%{TaskPattern_2}"
            code => "map['TaskPattern'] = event['TaskPattern_2']"
        }
    }

    if [type] == "DataType_A" {
        aggregate {
            aggregate_maps_path => "/etc/logstash/conf.d/.aggregate_maps"
            task_id => "%{TaskPattern}"
            code => "map['TaskPattern'] = event['TaskPattern']"
        }
    }
}

output { stdout { codec => rubydebug } }

@fbaligand
Collaborator

fbaligand commented Mar 15, 2017

Thanks a lot for all the tests and explanations you gave!
Now I understand what happens in this issue!

  • First, the aggregate filter with 'TaskPattern_2' is initialized. It inits @@aggregate_maps['TaskPattern_2'] to {}.
  • But then, when the second aggregate filter is initialized, it loads the .aggregate_maps file and fully replaces the @@aggregate_maps content with the file's content.
  • So the first @@aggregate_maps['TaskPattern_2'] initialization is lost... and it now equals nil.

In conclusion, to solve this issue, when I load the .aggregate_maps file, I have to merge it with the existing @@aggregate_maps.
That way, the issue is solved! A minimal sketch of the idea is shown below.
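
Here is a minimal, self-contained Ruby sketch of that idea; it is not the actual plugin code, the names are illustrative, and the on-disk format is assumed to be a Marshal dump:

require "tempfile"

# Simplified model of the shared plugin state (names assumed): one hash of
# {task_id_pattern => {task_id_value => map}}.
aggregate_maps = { "%{TaskPattern_2}" => {} }  # set when the first filter registers

# Simulate the on-disk .aggregate_maps file, which only contains entries
# for task_id patterns that actually received data:
file = Tempfile.new("aggregate_maps")
file.binmode
file.write(Marshal.dump({ "%{TaskPattern}" => { "P1" => { "TaskPattern" => "P1" } } }))
file.close

loaded = Marshal.load(File.binread(file.path))

# Buggy behavior: wholesale assignment drops the "%{TaskPattern_2}" => {}
# entry, so a later flush sees nil and raises NoMethodError on .length:
#   aggregate_maps = loaded

# Fixed behavior: merging keeps the already-initialized (but empty) entries:
aggregate_maps.merge!(loaded)
aggregate_maps["%{TaskPattern_2}"]  # => {} (still present)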

@fbaligand
Collaborator

Good news: I managed to reproduce your issue!
That will let me make sure the fix is correct.

@mrchang7
Author

Hope the fix is straightforward, without any other issues to handle. Any chance of releasing the fixed version?

@fbaligand fbaligand added the bug label Mar 25, 2017
fbaligand added a commit to fbaligand/logstash-filter-aggregate that referenced this issue Mar 25, 2017
Re-start of Logstash died when no data were provided in 'aggregate_maps_path' file for some aggregate task_id patterns
fbaligand added a commit that referenced this issue Mar 25, 2017
Re-start of Logstash died when no data were provided in 'aggregate_maps_path' file for some aggregate task_id patterns
@fbaligand
Collaborator

Good news for you, @mrchang7!
The release with the fix you expected is done!
You can download and test aggregate plugin version 2.5.2.
I would greatly appreciate your feedback!

@mrchang7
Author

@fbaligand It WORKS without issue now. I tested various combinations with our code, and the restart worked as expected.

I appreciate your swift attention to this issue. Many thanks!

@fbaligand
Collaborator

Great news!
Thanks for your many tests and your feedback!
Given that the issue is now fixed, I'm closing it.
