Memory leak when ES is in CrashLoopBackOff and fluentd is retrying to push buffered logs #1051

Open
mchtech opened this issue Oct 9, 2024 · 0 comments

mchtech commented Oct 9, 2024

Problem

After upgrading fluentd from 1.15.2 to 1.17.0, fluentd (with the elasticsearch output) shows a memory reclaim issue in a complex fault scenario:
memory is not released while ES is in CrashLoopBackOff and fluentd keeps retrying to push the buffered logs.

Steps to replicate

  • environment preparation
    • start an Elasticsearch cluster, an nginx proxy in front of it, and fluentd with the ES output (fluentd sends to the nginx IP, nginx reverse-proxies to the ES cluster)
  • fault simulation (a toggle sketch for steps "a"/"c" follows this list)
    • a. configure nginx to return 502 and reload nginx (or stop nginx)
    • b. wait for fluentd to generate at least 2 buffer files (buffer.b..., buffer.qaaa..., buffer.qbbb...)
    • c. quickly revert step "a" and tail the nginx log
    • d. when fluentd starts retrying the buffered logs (HTTP 200 on _bulk appears in the nginx log), redo step "a"
    • e. fluentd starts emitting a flood of error logs (one per log entry) and the memory of fluentd (v1.17.0) keeps increasing until OOM or a container restart (liveness probe: healthcheck failed), while the old fluentd version works fine
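A minimal sketch for toggling the 502 fault in steps "a" and "c" (the config path is an assumption; point it at wherever the server block from "Configs" is stored):

#!/bin/sh
# toggle_502.sh -- enable/disable the 502 fault and reload nginx
# CONF is an assumed path to the file holding the server block shown below.
CONF=/etc/nginx/conf.d/es-proxy.conf

case "$1" in
  on)  sed -i 's|--- *\(ngx.exit(502);\)|\1|'       "$CONF" ;;  # uncomment ngx.exit(502): requests now get 502
  off) sed -i 's|^\( *\)\(ngx.exit(502);\)|\1---\2|' "$CONF" ;;  # re-comment it: requests are proxied to ES again
  *)   echo "usage: $0 on|off" >&2; exit 1 ;;
esac

nginx -t && nginx -s reload   # validate the edit, then reload without dropping the listener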

Configs

main nginx (openresty) config
server {
    listen 80;
    server_name _;

    set $xforwaredfor $remote_addr;
    proxy_set_header Host $http_host;
    proxy_set_header X-Forwarded-For $xforwaredfor;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection $connection_upgrade;

    proxy_http_version 1.1;
    proxy_connect_timeout 5s;
    proxy_send_timeout 30s;
    proxy_read_timeout 30s;

    proxy_buffers 64 8k;
    client_max_body_size 100m;

    location / {
        access_by_lua_block {
            ---ngx.sleep(math.random(14.5,16));
            ---ngx.exit(502);
        }
        proxy_pass http://127.0.0.1:9200;
    }

    location /_bulk {
        access_by_lua_block {
            ---ngx.sleep(math.random(14.5,18))
            ---ngx.exit(502);
        }
        proxy_pass http://127.0.0.1:9200;
    }
}
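Before moving from step "a" to step "c" it helps to confirm what the proxy is actually returning; a quick check with the credentials from the fluentd config (assuming the ngx.exit(502) in location / is the line being toggled):

# expect 502 while the fault is on, and 200 (the ES banner) once step "a" is reverted
curl -s -o /dev/null -w '%{http_code}\n' -u elastic:xxxxxx http://nginx:80/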

fluentd config
<ROOT>
  <label @FLUENT_LOG>
    <match **>
      @type null
      @id ignore_fluent_logs
    </match>
  </label>
  <source>
    @type gc_stat
    tag "gc.stat"
  </source>
  <match gc.stat>
    @type stdout
  </match>
  <source>
    @type tail
    @id fluentd_test_log
    @label @SOURCE
    path "/path/to/request.log"
    pos_file "/path/to/request.log.pos"
    tag "fluentd_test_log"
    read_from_head true
    follow_inodes true
    emit_unmatched_lines true
    <parse>
      @type "none"
      unmatched_lines true
    </parse>
  </source>
  <label @SOURCE>
    <filter **>
      @type record_transformer
      <record>
        _cluster xx
        _tag ${tag}
        _hostname fluentd-test-xxxx
      </record>
    </filter>
    <filter fluentd_test_log>
      @type grep
      <exclude>
        key "message"
        pattern /api1/
      </exclude>
    </filter>
    <filter fluentd_test_log>
      @type grep
      <exclude>
        key "message"
        pattern /api2/
      </exclude>
    </filter>
    <match **>
      @type relabel
      @label @DISPATCH
    </match>
  </label>
  <label @DISPATCH>
    <filter **>
      @type prometheus
      <metric>
        name fluentd_input_status_num_records_total
        type counter
        desc The total number of incoming records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </filter>
    <match fluentd_test_log>
      @type relabel
      @label @OUTPUT__fluentd_test_log
    </match>
  </label>
  <label @OUTPUT__fluentd_test_log>
    <filter **>
      @type elasticsearch_genid
      hash_id_key "_hash"
    </filter>
    <match **>
      @type copy
      <store>
        @type "elasticsearch"
        hosts "nginx:80"
        path ""
        user "elastic"
        password xxxxxx
        index_name "fluentd-test"
        include_timestamp true
        request_timeout 15s
        bulk_message_request_threshold 20MB
        write_operation "create"
        id_key "_hash"
        remove_keys "_hash"
        <buffer>
          @type "file"
          path "/var/log/fluent/buf/elasticsearch/fluentd_test_log"
          path_suffix ".buf"
          total_limit_size 64GB
          flush_interval 5s
          flush_at_shutdown true
          retry_forever false
          retry_timeout 8h
          retry_max_times 486
          retry_max_interval 60s
        </buffer>
      </store>
    </match>
  </label>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_monitor
    @id in_prometheus_monitor
  </source>
  <source>
    @type prometheus_output_monitor
    @id in_prometheus_output_monitor
  </source>
</ROOT>
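Since the config above already exposes Prometheus metrics on port 24231, buffer growth, retries and process memory can be sampled while reproducing the issue. A rough loop (metric names are the standard fluent-plugin-prometheus output-monitor ones; reading /proc/1/status assumes fluentd runs as PID 1 in the container):

# sample output-plugin metrics, on-disk buffer size and RSS every 10 seconds
while true; do
  date
  curl -s http://localhost:24231/metrics \
    | grep -E 'fluentd_output_status_(buffer_total_bytes|retry_count|num_errors)'
  du -sh /var/log/fluent/buf/elasticsearch/fluentd_test_log   # file buffer from the <buffer> section above
  grep VmRSS /proc/1/status                                   # fluentd memory (assumes PID 1)
  sleep 10
done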
error logs
# flood output (v1.17.0 + v1.15.2)
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.084289025 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.084289025+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.085269889 +0800 record={"message"=>"2024-10-09 19:46:42.085119371+08:00", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.085269889+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.086607166 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.086607166+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.087835513 +0800 ^C2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.089250369 +0800 record={"message"=>"a long log text", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.089250369+08:00"}
2024-10-09 20:10:41 +0800 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"10.x.x.x\", :port=>80, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\", :path=>\"\"}): Connection refused - connect(2) for 10.x.x.x:80 (Errno::ECONNREFUSED)" location="/fluentd/vendor/bundle/ruby/3.2.0/gems/fluent-plugin-elasticsearch-5.2.5/lib/fluent/plugin/out_elasticsearch.rb:1146:in `rescue in send_bulk'" tag="fluentd_test_log" time=2024-10-09 19:46:42.090581442 +0800 record={"message"=>"2024-10-09 19:46:42.090401205+08:00", "_cluster"=>"yyy", "_tag"=>"fluentd_test_log", "_hostname"=>"fluentd-test-6ddddf5c6-gdxmw", "_node_ip"=>"10.y.y.y", "_pool_name"=>"xxx_fluentd-test-yyy", "@timestamp"=>"2024-10-09T19:46:42.090581442+08:00"}
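A crude way to gauge how fast the flood grows, assuming fluentd's own log is written to a file such as /var/log/fluent/fluentd.log:

# cumulative count of the flooded warning, sampled every 10s; the growth per sample is the flood rate
watch -n 10 "grep -c 'dump an error event' /var/log/fluent/fluentd.log"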
log generator
while true;do date --rfc-3339=ns >> request.log; cat data.txt >> request.log; done &
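The content of data.txt is not important as long as the lines are reasonably long (and do not match the api1/api2 grep excludes in the config above); something like this is enough, the 2 KB line length being an arbitrary choice:

# generate 100 filler lines of ~2 KB each for the generator above
yes "$(head -c 2048 /dev/zero | tr '\0' 'x')" | head -n 100 > data.txt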

Expected Behavior or What you need to ask

Why does fluentd 1.17.0 keep allocating memory in this scenario while 1.15.2 does not, and is there a way to avoid the leak?

Using Fluentd and ES plugin versions

  • OS version: almalinux 9.4
  • Bare Metal or within Docker or Kubernetes or others: k8s 1.26
  • Fluentd version: fluentd 1.17.0 & fluentd 1.15.2
  • ES plugin version (can be re-checked with the sketch after this list):
    # 1.15.2
    elasticsearch (7.17.1)
    elasticsearch-api (7.17.1)
    elasticsearch-transport (7.17.1)
    elasticsearch-xpack (7.17.1)
    fluent-plugin-elasticsearch (5.1.5)
    # 1.17.0
    elasticsearch (7.17.11)
    elasticsearch-api (7.17.11)
    elasticsearch-transport (7.17.11)
    elasticsearch-xpack (7.17.11)
    fluent-plugin-elasticsearch (5.2.5)
  • ES version: 7.17.6
  • Pod resource: requests 256Mi, limits 512Mi
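Sketch of the re-check mentioned in the "ES plugin version" item, using the fluent-gem wrapper shipped with the official fluentd images (if the image vendors gems via bundler, as the /fluentd/vendor/bundle path in the error logs suggests, running `bundle list` from the fluentd directory works instead):

# list the ES-related gems bundled with this fluentd build
fluent-gem list | grep -E '^(elasticsearch|fluent-plugin-elasticsearch)'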

monitor

  • simulation env: memory monitoring screenshots (pic1, pic2)
  • prod cluster: memory monitoring screenshot (prod)