[exporterhelper] boundedMemoryQueue may send on closed channel #7388

Closed
atingchen opened this issue Mar 15, 2023 · 4 comments
Labels: bug (Something isn't working)

Comments

@atingchen (Contributor)

Describe the bug
When a goroutine is about to submit a new item to the queue, the q.stopped.Load() check passes (the queue has not been stopped yet), but the goroutine has not yet sent the item to the channel.

func (q *boundedMemoryQueue) Produce(item Request) bool {
	if q.stopped.Load() {
		return false
	}

	if q.size.Load() >= q.capacity {
		return false
	}

	q.size.Add(1)
	select {
	case q.items <- item:
		return true
	default:
		// should not happen, as overflows should have been captured earlier
		q.size.Add(^uint32(0))
		return false
	}
}

At the same time, the service shuts down and invokes Stop(), which closes the channel. The goroutine may then send the item to the closed channel and panic.

func (q *boundedMemoryQueue) Stop() {
	q.stopped.Store(true) // disable producer
	close(q.items)
	q.stopWG.Wait()
}
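
This is not the collector's code, just a minimal, self-contained Go sketch of the window described above: the producer passes the stopped check, Stop() then closes the channel, and the pending send panics with "send on closed channel". Whether the panic actually fires depends on scheduling, so the loop repeats the scenario many times.

package main

import (
	"sync/atomic"
)

type queue struct {
	stopped atomic.Bool
	items   chan int
}

func (q *queue) Produce(item int) bool {
	if q.stopped.Load() {
		return false
	}
	// The race window: Stop() may run here, after the check above but
	// before the send below.
	select {
	case q.items <- item:
		return true
	default:
		return false
	}
}

func (q *queue) Stop() {
	q.stopped.Store(true)
	close(q.items) // closes while a Produce may be inside its race window
}

func main() {
	// Repeat many times; hitting the panic is timing-dependent.
	for i := 0; i < 100000; i++ {
		q := &queue{items: make(chan int, 1)}
		done := make(chan struct{})
		go func(v int) {
			q.Produce(v) // may panic: send on closed channel
			close(done)
		}(i)
		q.Stop()
		<-done
	}
}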

What did you expect to see?
Produce and Stop should be safe to call concurrently: Produce must never send on a closed channel.

What did you see instead?
A potential send on the closed q.items channel (a runtime panic) when Produce races with Stop.
What version did you use?
v0.73.0

@atingchen added the bug label Mar 15, 2023
@atingchen changed the title from "[exporter] boundedMemoryQueue may send on closed channel" to "[exporterhelper] boundedMemoryQueue may send on closed channel" Mar 23, 2023
@yutingcaicyt

The collector stops components in topological order, so upstream components are stopped before downstream ones. This ensures each component has a chance to drain to its consumer before the consumer is stopped. Therefore, under normal circumstances, the receiver and processor are guaranteed to have finished transmitting data and shut down before the exporter is closed. By the time the exporter is closing, no new data should enter it.

@atingchen (Contributor, Author)

> The collector stops components in topological order, so upstream components are stopped before downstream ones. […] By the time the exporter is closing, no new data should enter it.

Thanks for your reply @yutingcaicyt. boundedMemoryQueue is also used as the retry queue. Even after the upstream components are closed, data may still re-enter the queue because of the retry strategy.

When queuedRetrySender shuts down, it first closes retryStopCh and then stops the retry queue. onTemporaryFailure can be called between these two steps, which leaves a small chance of putting the data back into the retry queue:

		select {
		case <-req.Context().Done():
			return fmt.Errorf("Request is cancelled or timed out %w", err)
		case <-rs.stopCh:
			return rs.onTemporaryFailure(rs.logger, req, fmt.Errorf("interrupted due to shutdown %w", err))
		case <-time.After(backoffDelay):
		}
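
A hypothetical model of that ordering (retryStopCh, queueItems and the payload string are illustrative stand-ins, not the exporterhelper types): shutdown closes the retry stop channel first, an in-flight retry then re-enqueues through the onTemporaryFailure path, and only afterwards is the memory queue's channel closed. If the close wins the race, the re-enqueue becomes a send on a closed channel.

package main

import "time"

func main() {
	retryStopCh := make(chan struct{})
	queueItems := make(chan string, 16) // stands in for the memory queue's channel

	go func() {
		// A request waiting out its backoff delay.
		select {
		case <-retryStopCh:
			// onTemporaryFailure path: put the request back into the
			// retry queue. This send races with close(queueItems) below.
			queueItems <- "interrupted request"
		case <-time.After(time.Second):
		}
	}()

	// Shutdown step 1: stop in-flight retries.
	close(retryStopCh)
	// Shutdown step 2: stop the memory queue by closing its channel.
	// Because nothing waits for the retry goroutine, the re-enqueue above
	// can race with this close and panic with "send on closed channel".
	close(queueItems)

	time.Sleep(100 * time.Millisecond) // give the racing goroutine a chance to run
}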

@dmitryax (Member)

dmitryax commented Dec 8, 2023

The re-enqueuing only applies to the persistent queue, which doesn't have a channel that gets closed. The memory queue doesn't get data re-enqueued.

bogdandrutu pushed a commit that referenced this issue Dec 12, 2023
This change unblocks adding the `enqueue_on_failure` option to the queue sender by removing the requeue-on-shutdown behavior. If we don't remove requeue on shutdown, it's possible to run into the situation described in #7388. After the recent refactoring, the chance of running into it is pretty small, but it's still possible.

The only reason to requeue on shutdown is to make sure there is no data loss with the persistent queue enabled. The persistent queue captures all the in-flight requests in persistent storage anyway, so there is no reason to requeue an in-flight request. The only downside is that it can cause duplicate data to be sent after a collector restart if a request partially failed during shutdown.

Another option would be to rework the memory queue to never close the channel but still ensure draining.
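
For context, here is a hedged sketch of that alternative (illustrative names such as memQueue and StartConsumers; not the change that actually landed): producers are still gated by an atomic flag, but Stop never closes the items channel. Consumers are told to stop through a separate channel and drain what is already buffered, so a racing Produce can at worst leave an item behind, never panic.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// memQueue is an illustrative bounded in-memory queue whose channel is never
// closed; Stop signals consumers through a separate channel and waits for
// them to drain what is already buffered.
type memQueue struct {
	stopped atomic.Bool
	items   chan int
	stopCh  chan struct{}
	wg      sync.WaitGroup
}

func newMemQueue(capacity int) *memQueue {
	return &memQueue{
		items:  make(chan int, capacity),
		stopCh: make(chan struct{}),
	}
}

func (q *memQueue) Produce(item int) bool {
	if q.stopped.Load() {
		return false
	}
	select {
	case q.items <- item: // safe even if Stop ran in between: the channel stays open
		return true
	default: // queue full
		return false
	}
}

// StartConsumers launches n workers that consume until Stop is called and the
// buffered items have been drained.
func (q *memQueue) StartConsumers(n int, consume func(int)) {
	for i := 0; i < n; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for {
				select {
				case item := <-q.items:
					consume(item)
				case <-q.stopCh:
					// Drain whatever is still buffered, then exit. An item
					// produced by a racing Produce after this drain is simply
					// left behind and dropped; nothing can panic.
					for {
						select {
						case item := <-q.items:
							consume(item)
						default:
							return
						}
					}
				}
			}
		}()
	}
}

func (q *memQueue) Stop() {
	q.stopped.Store(true) // reject new producers
	close(q.stopCh)       // signal consumers; q.items itself is never closed
	q.wg.Wait()
}

func main() {
	q := newMemQueue(8)
	q.StartConsumers(2, func(item int) { fmt.Println("consumed", item) })
	for i := 0; i < 5; i++ {
		q.Produce(i)
	}
	q.Stop()
}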
sokoide pushed a commit to sokoide/opentelemetry-collector that referenced this issue Dec 18, 2023
@atoulme (Contributor)

atoulme commented Dec 19, 2023

I believe this issue is fixed: we have changed the implementation of the queues quite a bit, as @dmitryax points out above. The code is completely different now. Previously, a test was flaky because of this problem, but it no longer is.

Closing as resolved. Feel free to comment or create new issues to follow up.

@atoulme atoulme closed this as completed Dec 19, 2023