Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if cache of add_kubernetes_metadata processors is already closed before closing #32150

Conversation

MichaelKatsoulis
Copy link
Contributor

@MichaelKatsoulis MichaelKatsoulis commented Jun 29, 2022

What does this PR do?

This PR checks if the add_kubernetes_metadata processors cache is already closed before closing.

Why is it important?

In case the cache channel is already closed then the code panics leading to filebeat restarting

{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"input.filestream","log.origin":{"file.name":"filestream/input.go","file.line":143},"message":"Closing reader of filestream","service.name":"filebeat","id":"8DDE18B49C8C362C","source":"filestream::.global::native::10637969-66305","path":"/var/log/containers/filebeat-prod-kxqkf_kube-system_filebeat-9f7ebdc5dbef23f6785804c9b07d0cdf301382e3cf5e7bd1c8e1c82a94250012.log","state-id":"native::10637969-66305","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":158},"message":"client: closing acker","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":163},"message":"client: done closing acker","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":165},"message":"client: unlink from queue","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":187},"message":"client: cancelled 0 events","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":167},"message":"client: done unlink","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":170},"message":"client: closing processors","service.name":"filebeat","ecs.version":"1.6.0"}
panic: close of closed channel

goroutine 214 [running]:
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*cache).stop(...)
  /go/src/github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata/cache.go:97
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*kubernetesAnnotator).Close(0xc000a72700)
  /go/src/github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata/kubernetes.go:311 +0x4f
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x1)
  /go/src/github.com/elastic/beats/libbeat/publisher/processing/processors.go:94 +0x14b
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x0)
  /go/src/github.com/elastic/beats/libbeat/publisher/processing/processors.go:94 +0x14b
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close.func1()
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:171 +0x2d9
sync.(*Once).doSlow(0x0, 0xc00142dd78)
  /usr/local/go/src/sync/once.go:68 +0xd2
sync.(*Once).Do(...)
  /usr/local/go/src/sync/once.go:59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close(0xc00186c000)
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:152 +0x59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).runSignalPropagation(0xc001392fb8)
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/pipeline.go:394 +0x22b
created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).registerSignalPropagation.func1
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/pipeline.go:347 +0x9b

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

@MichaelKatsoulis MichaelKatsoulis requested a review from a team as a code owner June 29, 2022 11:06
@MichaelKatsoulis MichaelKatsoulis requested review from rdner and leehinman and removed request for a team June 29, 2022 11:06
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jun 29, 2022
@elasticmachine
Copy link
Collaborator

elasticmachine commented Jun 29, 2022

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2022-06-29T14:35:18.592+0000

  • Duration: 81 min 30 sec

Test stats 🧪

Test Results
Failed 0
Passed 22404
Skipped 1937
Total 24341

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

Copy link
Member

@rdner rdner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a test to make sure there is no regression in the future and I think the bug fix can be achieved easier.

@@ -94,5 +94,18 @@ func (c *cache) cleanup() {
}

func (c *cache) stop() {
close(c.done)
if !isClosed(c.done) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would not it be enough to write:

_, open := <-c.done
if open {
  close(c.done)
}

Copy link
Member

@rdner rdner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After taking a closer look I think either fix would not work, because the c.done channel will remain blocked forever (since nothing is writing into it).

In the main branch I can't see anything sending to c.done and it only gets closed here which should not be a problem if the channel is properly initialised via newCache

func newCache(cleanupTimeout time.Duration) *cache {
c := &cache{
timeout: cleanupTimeout,
deleted: make(map[string]time.Time),
metadata: make(map[string]mapstr.M),
done: make(chan struct{}),
}
go c.cleanup()
return c
}

Is it possible that something is using type cache without its initialiser?

@rdner
Copy link
Member

rdner commented Jun 29, 2022

My bet is on this function called twice for some reason:

func (k *kubernetesAnnotator) Close() error {
if k.watcher != nil {
k.watcher.Stop()
}
if k.cache != nil {
k.cache.stop()
}
return nil
}

First it closes the cache properly and the second time it's trying to close a closed channel. Simply modifying it to this should do the trick:

func (k *kubernetesAnnotator) Close() error {
	if k.watcher != nil {
		k.watcher.Stop()
	}
	if k.cache != nil {
		k.cache.stop()
		k.cache = nil
	}
	return nil
}

@MichaelKatsoulis
Copy link
Contributor Author

MichaelKatsoulis commented Jun 29, 2022

My bet is on this function called twice for some reason:

@rdner thanks a lot for taking such a close look. I was also thinking the same. That for some reason the function is called twice by different thread probably.
I just though that my fix would work as I tested it.
But setting the cache to nil whenever cache stop is called will do the trick as well.

@MichaelKatsoulis
Copy link
Contributor Author

MichaelKatsoulis commented Jun 29, 2022

After taking a closer look I think either fix would not work, because the c.done channel will remain blocked forever (since nothing is writing into it).

That is why I am using a select with a default clause which ensures the select never blocks, so either the channel is closed or not

@rdner
Copy link
Member

rdner commented Jun 29, 2022

@MichaelKatsoulis

That is why I am using a select with a default clause which ensures the select never blocks, so either the channel is closed or not

You're right but it only works because nothing is writing to c.done (which is the case here). This channel only ever gets closed which I have not seen initially.

func isClosed(ch <-chan struct{}) bool {
 	select {
 	case <-ch:
 		// reading from a closed channel returns 
 		// a default value of the channel type – this is expected
 		// BUT if the channel was not closed and something wrote into it
 		// we would end up here too.
 		return true 
 	default:
 	}

 	return false
 }

I still think we should just prevent stop from being called twice :)

// isClosed checks if a given chan is already closed
func isClosed(ch <-chan struct{}) bool {
select {
case <-ch:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MichaelKatsoulis and @rdner I agree that we should make sure to never try to close a channel twice.
I am not a huge fan of this isClosed function, even if it seems to work, it is not trivial to understand.
Just to clarify, since done channel is private, nothing writes into it except for stop(c.done), and only used for stopping the cleanup function... I would suggest to

func (c *cache) cleanup() {
	if timeout <= 0 {
		return
	}

	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
                         c.done = nil
			return
		case now := <-ticker.C:
			c.Lock()
			for k, t := range c.deleted {
				if now.After(t) {
					delete(c.deleted, k)
					delete(c.metadata, k)
				}
			}
			c.Unlock()
		}
	}
}

func (c *cache) stop() {
        if c.done != nil {
		close(c.done)
        }
}

Copy link
Member

@rdner rdner Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dangerous because if you run cleanup twice it will block forever on the c.done=nil (nil channel). Since we already have a problem of running stop twice I think it's quite possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, but cleanup() is only called once from the newCache constructor while k.cache.stop() is called from a public method Stop() that can be called multiple times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, k.cache = nil is a better solution that isClosed. So I approved the PR

@rdner
Copy link
Member

rdner commented Jun 29, 2022

@MichaelKatsoulis let's make sure it's tested before merging and the panic is not reproducible anymore.

@MichaelKatsoulis
Copy link
Contributor Author

@MichaelKatsoulis let's make sure it's tested before merging and the panic is not reproducible anymore.

Yes of course

@MichaelKatsoulis
Copy link
Contributor Author

/test

@MichaelKatsoulis
Copy link
Contributor Author

MichaelKatsoulis commented Jun 30, 2022

Neither of the solutions work unfortunately. The problem is deeper and is connected to how filestream input handles the processors.

I have noticed that in case filebeat is configured like this (the processor is part of the input):

filebeat.yml: |-
    filebeat.inputs:
    - type: filestream
      id: my-filestream-id
      parsers:
        - container: ~
      prospector.scanner.symlinks: true
      paths:
        - /var/log/containers/*.log
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

then each time a filestream reader closes ( In case of kubernetes jobs that stop after their work is finished, the file remains idle and I guess at some point the reader closes) then it stops the connected processors which means:

  • it stops the watcher for new pods
  • it sends a done signal to the cache channel, which prevents its cleanup

Next time the same happens (filestream reader stops) the same process takes place. But the watcher is already stoped (nothing happens) and the cache done channel is closed (That is the error).

If the first time that the cache is closed we set its value to nil then this will lead to errors because there are still other clients (from different filestream readers) that try to access the cache.

If we check first wether the cache channel is closed or not before closing then we avoid that error. But it is still doesn't make sense as the watcher is stopped and the cache doesn't not get updated. So all the next clients of the cache will get invalid data.

This problem occurs because multiple clients are using the same processor. And each one tries to close it whenever they finish.

That scenario does not occur when in filebeat configuration we set the processor part outside the input like this

filebeat.yml: |-
    filebeat.inputs:
    - type: filestream
      id: my-filestream-id
      parsers:
        - container: ~
      prospector.scanner.symlinks: true
      paths:
        - /var/log/containers/*.log

    processors:
      - add_cloud_metadata:
      - add_host_metadata:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
          - logs_path:
              logs_path: "/var/log/containers/"

In that case the processors only stop once.
So we must either find a way to handle the closing of the processors by multiple clients or don't allow the add_kubernetes_metadata processor to be part of filestream input.

None of this happen with container input by the way.

cc @elastic/obs-cloudnative-monitoring

@MichaelKatsoulis MichaelKatsoulis added Team:Cloudnative-Monitoring Label for the Cloud Native Monitoring team labels Jun 30, 2022
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jun 30, 2022
@ChrsMark
Copy link
Member

ChrsMark commented Jun 30, 2022

I guess the way forward here is summarised at So we must either find a way to handle the closing of the processors by multiple clients or don't allow the add_kubernetes_metadata processor to be part of filestream input., right?

2 questions/points here:

  1. Do we have an estimation of what effort the handle the closing of the processors by multiple clients approach requires?
  2. Do we know what would be the impact of don't allow the add_kubernetes_metadata processor to be part of filestream input? If users already have it like this in their configurations then if we disalbe this option their configuration will break, making it a breaking change. With that in mind, maybe investing in "no1" approach is the only option here?

@MichaelKatsoulis
Copy link
Contributor Author

MichaelKatsoulis commented Jun 30, 2022

  1. Do we have an estimation of what effort the handle the closing of the processors by multiple clients approach requires?

The clients are part of a pipeline where clients publish event into. It looks like one client is created per file(not 100% sure).
All these clients share the same processor add_kubernetes_metadata id. When a filestream reader gets closed, Close is called. We need to check if the length of clients list contains clients before closing the processors. Problem is that the client's Close method is called by other places as well like https://github.com/elastic/beats/blob/main/filebeat/beater/channels.go#L145 and this confuses me still.

@rdner as a member of data plane team are you more familiar with these bits of code?

  1. Do we know what would be the impact of don't allow the add_kubernetes_metadata processor to be part of filestream input? If users already have it like this in their configurations then if we disalbe this option their configuration will break, making it a breaking change. With that in mind, maybe investing in "no1" approach is the only option here?

Yes this will be a breaking change. But the update needed is only the filebeat.yml

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Team:Cloudnative-Monitoring Label for the Cloud Native Monitoring team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants