Skip to content

Commit

Permalink
Remove Queue.IsPersistent, not needed (#8875)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Nov 14, 2023
1 parent 6a99f87 commit 6a38ea7
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 15 deletions.
4 changes: 0 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,3 @@ func (q *boundedMemoryQueue) Size() int {
func (q *boundedMemoryQueue) Capacity() int {
return cap(q.items)
}

func (q *boundedMemoryQueue) IsPersistent() bool {
return false
}
4 changes: 0 additions & 4 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ func (pq *persistentQueue) Capacity() int {
return int(pq.capacity)
}

func (pq *persistentQueue) IsPersistent() bool {
return true
}

func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
extension, err := getStorageExtension(host.GetExtensions(), storageID)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,4 @@ type Queue interface {
Shutdown(ctx context.Context) error
// Capacity returns the capacity of the queue.
Capacity() int
// IsPersistent returns true if the queue is persistent.
// TODO: Do not expose this method if the interface moves to a public package.
IsPersistent() bool
}
10 changes: 6 additions & 4 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ type queueSender struct {

func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
if isPersistent {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID,
queueRequestMarshaler(marshaler), queueRequestUnmarshaler(unmarshaler), set)
} else {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
}
return &queueSender{
fullName: set.ID.String(),
Expand All @@ -103,7 +105,7 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: queue.IsPersistent(),
requeuingEnabled: isPersistent,
}
}

Expand Down

0 comments on commit 6a38ea7

Please sign in to comment.