diff --git a/Makefile b/Makefile
index 3badbe3a5d..be8ec31531 100644
--- a/Makefile
+++ b/Makefile
@@ -141,7 +141,7 @@ lint: alloylint
 # final command runs tests for all other submodules.
 test:
 	$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
-	$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
+	$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization ./internal/component/prometheus/remote/queue/network
 	$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;
 
 test-packages:
diff --git a/docs/sources/reference/components/prometheus/prometheus.remote.queue.md b/docs/sources/reference/components/prometheus/prometheus.remote.queue.md
new file mode 100644
index 0000000000..db93f19597
--- /dev/null
+++ b/docs/sources/reference/components/prometheus/prometheus.remote.queue.md
@@ -0,0 +1,234 @@
+---
+canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.remote.queue/
+description: Learn about prometheus.remote.queue
+title: prometheus.remote.queue
+---
+
+# prometheus.remote.queue
+
+`prometheus.remote.queue` collects metrics sent from other components into a
+Write-Ahead Log (WAL) and forwards them over the network to a series of
+user-supplied endpoints. Metrics are sent over the network using the
+[Prometheus Remote Write protocol][remote_write-spec].
+
+You can specify multiple `prometheus.remote.queue` components by giving them different labels.
+
+[remote_write-spec]: https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM/edit
+
+## Usage
+
+```alloy
+prometheus.remote.queue "LABEL" {
+  endpoint {
+    url = REMOTE_WRITE_URL
+
+    ...
+  }
+
+  ...
+}
+```
+
+## Arguments
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`ttl` | `duration` | How long the timestamp of a signal is valid for, before the signal is discarded. | `2h` | no
+`max_signals_to_batch` | `uint` | The maximum number of signals before they are batched to disk. | `10,000` | no
+`batch_frequency` | `duration` | How often to batch signals to disk if `max_signals_to_batch` is not reached. | `5s` | no
+
+
+## Blocks
+
+The following blocks are supported inside the definition of
+`prometheus.remote.queue`:
+
+Hierarchy | Block | Description | Required
+--------- | ----- | ----------- | --------
+endpoint | [endpoint][] | Location to send metrics to. | no
+endpoint > basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the endpoint. | no
+
+The `>` symbol indicates deeper levels of nesting. For example, `endpoint >
+basic_auth` refers to a `basic_auth` block defined inside an
+`endpoint` block.
+
+[endpoint]: #endpoint-block
+[basic_auth]: #basic_auth-block
+
+### endpoint block
+
+The `endpoint` block describes a single location to send metrics to. Multiple
+`endpoint` blocks can be provided to send metrics to multiple locations.
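+
+For example, a single component could write to two locations by defining two `endpoint` blocks. The following sketch is illustrative only; the URLs and labels are placeholders:
+
+```alloy
+prometheus.remote.queue "default" {
+  endpoint "primary" {
+    url = "http://localhost:9009/api/v1/push"
+  }
+
+  endpoint "secondary" {
+    url = "http://localhost:9010/api/v1/push"
+  }
+}
+```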
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`url` | `string` | Full URL to send metrics to. | | yes
+`name` | `string` | Optional name to identify the endpoint in metrics. | | no
+`write_timeout` | `duration` | Timeout for requests made to the URL. | `"30s"` | no
+`retry_backoff` | `duration` | How long to wait between retries. | `1s` | no
+`max_retry_backoff_attempts` | `uint` | Maximum number of retries before dropping the batch. | `0` | no
+`batch_count` | `uint` | How many series to write at a time. | `1,000` | no
+`flush_frequency` | `duration` | How long to wait before sending if `batch_count` is not reached. | `1s` | no
+`queue_count` | `uint` | How many concurrent queues to write with. | `4` | no
+`external_labels` | `map(string)` | Labels to add to metrics sent over the network. | | no
+
+### basic_auth block
+
+{{< docs/shared lookup="reference/components/basic-auth-block.md" source="alloy" version="<ALLOY_VERSION>" >}}
+
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+Name | Type | Description
+---- | ---- | -----------
+`receiver` | `MetricsReceiver` | A value that other components can use to send metrics to.
+
+## Component health
+
+`prometheus.remote.queue` is only reported as unhealthy if given an invalid
+configuration. In those cases, exported fields are kept at their last healthy
+values.
+
+## Debug information
+
+`prometheus.remote.queue` does not expose any component-specific debug
+information.
+
+## Debug metrics
+
+The following metrics are provided for backward compatibility with `prometheus.remote_write`.
+They generally behave the same, but there are likely edge cases where they differ.
+
+* `prometheus_remote_write_wal_storage_created_series_total` (counter): Total number of created
+  series appended to the WAL.
+* `prometheus_remote_write_wal_storage_removed_series_total` (counter): Total number of series
+  removed from the WAL.
+* `prometheus_remote_write_wal_samples_appended_total` (counter): Total number of samples
+  appended to the WAL.
+* `prometheus_remote_write_wal_exemplars_appended_total` (counter): Total number of exemplars
+  appended to the WAL.
+* `prometheus_remote_storage_samples_total` (counter): Total number of samples
+  sent to remote storage.
+* `prometheus_remote_storage_exemplars_total` (counter): Total number of
+  exemplars sent to remote storage.
+* `prometheus_remote_storage_metadata_total` (counter): Total number of
+  metadata entries sent to remote storage.
+* `prometheus_remote_storage_samples_failed_total` (counter): Total number of
+  samples that failed to send to remote storage due to non-recoverable errors.
+* `prometheus_remote_storage_exemplars_failed_total` (counter): Total number of
+  exemplars that failed to send to remote storage due to non-recoverable errors.
+* `prometheus_remote_storage_metadata_failed_total` (counter): Total number of
+  metadata entries that failed to send to remote storage due to
+  non-recoverable errors.
+* `prometheus_remote_storage_samples_retries_total` (counter): Total number of
+  samples that failed to send to remote storage but were retried due to
+  recoverable errors.
+* `prometheus_remote_storage_exemplars_retried_total` (counter): Total number of
+  exemplars that failed to send to remote storage but were retried due to
+  recoverable errors.
+* `prometheus_remote_storage_metadata_retried_total` (counter): Total number of
+  metadata entries that failed to send to remote storage but were retried due
+  to recoverable errors.
+* `prometheus_remote_storage_samples_dropped_total` (counter): Total number of
+  samples which were dropped after being read from the WAL before being sent to
+  remote_write because of an unknown reference ID.
+* `prometheus_remote_storage_exemplars_dropped_total` (counter): Total number
+  of exemplars that were dropped after being read from the WAL before being
+  sent to remote_write because of an unknown reference ID.
+* `prometheus_remote_storage_enqueue_retries_total` (counter): Total number of
+  times enqueue has failed because a shard's queue was full.
+* `prometheus_remote_storage_sent_batch_duration_seconds` (histogram): Duration
+  of send calls to remote storage.
+* `prometheus_remote_storage_queue_highest_sent_timestamp_seconds` (gauge):
+  Unix timestamp of the latest WAL sample successfully sent by a queue.
+* `prometheus_remote_storage_samples_pending` (gauge): The number of samples
+  pending in shards to be sent to remote storage.
+* `prometheus_remote_storage_exemplars_pending` (gauge): The number of
+  exemplars pending in shards to be sent to remote storage.
+* `prometheus_remote_storage_samples_in_total` (counter): Samples read into
+  remote storage.
+* `prometheus_remote_storage_exemplars_in_total` (counter): Exemplars read into
+  remote storage.
+
+TODO document new metrics.
+
+## Examples
+
+The following examples show you how to create `prometheus.remote.queue` components that send metrics to different destinations.
+
+### Send metrics to a local Mimir instance
+
+You can create a `prometheus.remote.queue` component that sends your metrics to a local Mimir instance:
+
+```alloy
+prometheus.remote.queue "staging" {
+  // Send metrics to a locally running Mimir.
+  endpoint "mimir" {
+    url = "http://mimir:9009/api/v1/push"
+
+    basic_auth {
+      username = "example-user"
+      password = "example-password"
+    }
+  }
+}
+
+// Configure a prometheus.scrape component to send metrics to
+// the prometheus.remote.queue component.
+prometheus.scrape "demo" {
+  targets = [
+    // Collect metrics from the default HTTP listen address.
+    {"__address__" = "127.0.0.1:12345"},
+  ]
+  forward_to = [prometheus.remote.queue.staging.receiver]
+}
+```
+
+## TODO Metadata settings
+
+## Technical details
+
+`prometheus.remote.queue` uses [snappy][] for compression.
+`prometheus.remote.queue` sends native histograms by default.
+Any labels that start with `__` are removed before sending to the endpoint.
+
+### Data retention
+
+Data is written to disk in blocks utilizing [snappy][] compression. These blocks are read on startup and resent if they are still within the TTL.
+Any data that has not yet been written to disk, or that is in the network queues, is lost if {{< param "PRODUCT_NAME" >}} is restarted.
+
+### Retries
+
+Network errors are retried, as are `429` and `5xx` responses. Any other non-2xx status code is not retried.
+
+### Memory
+
+`prometheus.remote.queue` is meant to be memory efficient. You can control the amount of memory used by adjusting `max_signals_to_batch`, `queue_count`, and `batch_count`.
+A higher `max_signals_to_batch` allows for more efficient disk compression. A higher `queue_count` allows more concurrent writes, and a higher `batch_count` allows more data to be sent at one time.
+This can allow greater throughput, at the cost of more memory on both {{< param "PRODUCT_NAME" >}} and the endpoint. The defaults are suitable for most common usage.
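+
+The following sketch shows how these arguments could be raised for a higher-throughput setup. The values are illustrative rather than recommendations, and the URL is a placeholder:
+
+```alloy
+prometheus.remote.queue "high_throughput" {
+  // Larger on-disk batches compress better but hold more signals in memory.
+  max_signals_to_batch = 50000
+
+  endpoint "mimir" {
+    url = "http://mimir:9009/api/v1/push"
+
+    // More concurrent queues and larger request batches increase throughput,
+    // and also increase memory use on both ends.
+    queue_count = 8
+    batch_count = 2000
+  }
+}
+```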
+
+## Compatible components
+
+`prometheus.remote.queue` has exports that can be consumed by the following components:
+
+- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers)
+
+{{< admonition type="note" >}}
+Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
+Refer to the linked documentation for more details.
+{{< /admonition >}}
+
+
+
+[snappy]: https://en.wikipedia.org/wiki/Snappy_(compression)
+[WAL block]: #wal-block
+[Stop]: ../../../../set-up/run/
+[run]: ../../../cli/run/
diff --git a/internal/component/prometheus/remote/queue/component.go b/internal/component/prometheus/remote/queue/component.go
new file mode 100644
index 0000000000..e82ca88a23
--- /dev/null
+++ b/internal/component/prometheus/remote/queue/component.go
@@ -0,0 +1,171 @@
+package queue
+
+import (
+	"context"
+	"path/filepath"
+	"reflect"
+	"sync"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/alloy/internal/component"
+	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/filequeue"
+	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/network"
+	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/serialization"
+	"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
+	"github.com/grafana/alloy/internal/featuregate"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/storage"
+)
+
+func init() {
+	component.Register(component.Registration{
+		Name:      "prometheus.remote.queue",
+		Args:      Arguments{},
+		Exports:   Exports{},
+		Stability: featuregate.StabilityExperimental,
+		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+			return NewComponent(opts, args.(Arguments))
+		},
+	})
+}
+
+func NewComponent(opts component.Options, args Arguments) (*Queue, error) {
+	s := &Queue{
+		opts:      opts,
+		args:      args,
+		log:       opts.Logger,
+		endpoints: map[string]*endpoint{},
+	}
+	s.opts.OnStateChange(Exports{Receiver: s})
+	err := s.createEndpoints()
+	if err != nil {
+		return nil, err
+	}
+	return s, nil
+}
+
+// Queue is a queue based WAL used to send data to a remote_write endpoint. Queue supports replaying
+// and TTLs.
+type Queue struct {
+	mut       sync.RWMutex
+	args      Arguments
+	opts      component.Options
+	log       log.Logger
+	endpoints map[string]*endpoint
+}
+
+// Run starts the component, blocking until ctx is canceled or the component
+// suffers a fatal error. Run is guaranteed to be called exactly once per
+// Component.
+func (s *Queue) Run(ctx context.Context) error {
+	for _, ep := range s.endpoints {
+		ep.Start()
+	}
+	defer func() {
+		s.mut.Lock()
+		defer s.mut.Unlock()
+
+		for _, ep := range s.endpoints {
+			ep.Stop()
+		}
+	}()
+
+	<-ctx.Done()
+	return nil
+}
+
+// Update provides a new Config to the component. The type of newConfig will
+// always match the struct type which the component registers.
+//
+// Update will be called concurrently with Run. The component must be able to
+// gracefully handle updating its config while still running.
+//
+// An error may be returned if the provided config is invalid.
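+//
+// The current implementation stops every endpoint and recreates it whenever
+// the configuration changes; see the TODO in the body about only cycling the
+// endpoints whose configuration actually changed.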
+func (s *Queue) Update(args component.Arguments) error { + s.mut.Lock() + defer s.mut.Unlock() + + newArgs := args.(Arguments) + sync.OnceFunc(func() { + s.opts.OnStateChange(Exports{Receiver: s}) + }) + // If they are the same do nothing. + if reflect.DeepEqual(newArgs, s.args) { + return nil + } + s.args = newArgs + // TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start.. + if len(s.endpoints) > 0 { + for _, ep := range s.endpoints { + ep.Stop() + } + s.endpoints = map[string]*endpoint{} + } + err := s.createEndpoints() + if err != nil { + return err + } + for _, ep := range s.endpoints { + ep.Start() + } + return nil +} + +func (s *Queue) createEndpoints() error { + for _, ep := range s.args.Connections { + reg := prometheus.WrapRegistererWith(prometheus.Labels{"endpoint": ep.Name}, s.opts.Registerer) + stats := types.NewStats("alloy", "queue_series", reg) + stats.BackwardsCompatibility(reg) + meta := types.NewStats("alloy", "queue_metadata", reg) + cfg := types.ConnectionConfig{ + URL: ep.URL, + BatchCount: ep.BatchCount, + FlushFrequency: ep.FlushFrequency, + Timeout: ep.Timeout, + UserAgent: "alloy", + ExternalLabels: s.args.ExternalLabels, + Connections: ep.QueueCount, + } + if ep.BasicAuth != nil { + cfg.BasicAuth = &types.BasicAuth{ + Username: ep.BasicAuth.Username, + Password: string(ep.BasicAuth.Password), + } + } + client, err := network.New(cfg, s.log, stats.UpdateNetwork, meta.UpdateNetwork) + if err != nil { + return err + } + end := NewEndpoint(client, nil, stats, meta, s.args.TTL, s.opts.Logger) + fq, err := filequeue.NewQueue(filepath.Join(s.opts.DataPath, ep.Name, "wal"), func(ctx context.Context, dh types.DataHandle) { + _ = end.incoming.Send(ctx, dh) + }, s.opts.Logger) + if err != nil { + return err + } + serial, err := serialization.NewSerializer(types.SerializerConfig{ + MaxSignalsInBatch: uint32(s.args.MaxSignalsToBatch), + FlushFrequency: s.args.BatchFrequency, + }, fq, stats.UpdateFileQueue, s.opts.Logger) + if err != nil { + return err + } + end.serializer = serial + s.endpoints[ep.Name] = end + } + return nil +} + +// Appender returns a new appender for the storage. The implementation +// can choose whether or not to use the context, for deadlines or to check +// for errors. +func (c *Queue) Appender(ctx context.Context) storage.Appender { + c.mut.RLock() + defer c.mut.RUnlock() + + children := make([]storage.Appender, 0) + for _, ep := range c.endpoints { + children = append(children, serialization.NewAppender(ctx, c.args.TTL, ep.serializer, c.opts.Logger)) + } + return &fanout{children: children} +} diff --git a/internal/component/prometheus/remote/queue/e2e_bench_test.go b/internal/component/prometheus/remote/queue/e2e_bench_test.go new file mode 100644 index 0000000000..1de030d39c --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_bench_test.go @@ -0,0 +1,125 @@ +package queue + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func BenchmarkE2E(b *testing.B) { + // Around 120k ops if you look at profile roughly 20k are actual implementation with the rest being benchmark + // setup. 
+ type e2eTest struct { + name string + maker func(index int, app storage.Appender) + tester func(samples []prompb.TimeSeries) + } + tests := []e2eTest{ + { + // This should be ~1200 allocs an op + name: "normal", + maker: func(index int, app storage.Appender) { + ts, v, lbls := makeSeries(index) + _, _ = app.Append(0, lbls, ts, v) + }, + tester: func(samples []prompb.TimeSeries) { + b.Helper() + for _, s := range samples { + require.True(b, len(s.Samples) == 1) + } + }, + }, + } + for _, test := range tests { + b.Run(test.name, func(t *testing.B) { + runBenchmark(t, test.maker, test.tester) + }) + } +} + +func runBenchmark(t *testing.B, add func(index int, appendable storage.Appender), _ func(samples []prompb.TimeSeries)) { + t.ReportAllocs() + l := log.NewNopLogger() + done := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + expCh := make(chan Exports, 1) + c, err := newComponentBenchmark(t, l, srv.URL, expCh) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. + exp := <-expCh + + index := 0 + app := exp.Receiver.Appender(ctx) + + for i := 0; i < t.N; i++ { + index++ + add(index, app) + } + require.NoError(t, app.Commit()) + + tm := time.NewTimer(10 * time.Second) + select { + case <-done: + case <-tm.C: + } + cancel() +} + +func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Exports) (*Queue, error) { + return NewComponent(component.Options{ + ID: "test", + Logger: l, + DataPath: t.TempDir(), + OnStateChange: func(e component.Exports) { + exp <- e.(Exports) + }, + Registerer: fakeRegistry{}, + Tracer: nil, + }, Arguments{ + TTL: 2 * time.Hour, + MaxSignalsToBatch: 100_000, + BatchFrequency: 1 * time.Second, + Connections: []ConnectionConfig{{ + Name: "test", + URL: url, + Timeout: 10 * time.Second, + RetryBackoff: 1 * time.Second, + MaxRetryBackoffAttempts: 0, + BatchCount: 50, + FlushFrequency: 1 * time.Second, + QueueCount: 1, + }}, + ExternalLabels: nil, + }) +} + +var _ prometheus.Registerer = (*fakeRegistry)(nil) + +type fakeRegistry struct{} + +func (f fakeRegistry) Register(collector prometheus.Collector) error { + return nil +} + +func (f fakeRegistry) MustRegister(collector ...prometheus.Collector) { +} + +func (f fakeRegistry) Unregister(collector prometheus.Collector) bool { + return true +} diff --git a/internal/component/prometheus/remote/queue/e2e_stats_test.go b/internal/component/prometheus/remote/queue/e2e_stats_test.go new file mode 100644 index 0000000000..1fe7601184 --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_stats_test.go @@ -0,0 +1,568 @@ +package queue + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +const remoteSamples = "prometheus_remote_storage_samples_total" +const remoteHistograms = "prometheus_remote_storage_histograms_total" +const remoteMetadata = "prometheus_remote_storage_metadata_total" + +const sentBytes = "prometheus_remote_storage_sent_bytes_total" +const sentMetadataBytes = "prometheus_remote_storage_metadata_bytes_total" + +const outTimestamp = "prometheus_remote_storage_queue_highest_sent_timestamp_seconds" +const inTimestamp = 
"prometheus_remote_storage_highest_timestamp_in_seconds" + +const failedSample = "prometheus_remote_storage_samples_failed_total" +const failedHistogram = "prometheus_remote_storage_histograms_failed_total" +const failedMetadata = "prometheus_remote_storage_metadata_failed_total" + +const retriedSamples = "prometheus_remote_storage_samples_retried_total" +const retriedHistogram = "prometheus_remote_storage_histograms_retried_total" +const retriedMetadata = "prometheus_remote_storage_metadata_retried_total" + +const prometheusDuration = "prometheus_remote_storage_queue_duration_seconds" + +const filequeueIncoming = "alloy_queue_series_filequeue_incoming" +const alloySent = "alloy_queue_series_network_sent" +const alloyFileQueueIncoming = "alloy_queue_series_filequeue_incoming_timestamp_seconds" +const alloyNetworkDuration = "alloy_queue_series_network_duration_seconds" +const alloyFailures = "alloy_queue_series_network_failed" +const alloyRetries = "alloy_queue_series_network_retried" +const alloy429 = "alloy_queue_series_network_retried_429" + +// TestMetrics is the large end to end testing for the queue based wal. +func TestMetrics(t *testing.T) { + // Check assumes you are checking for any value that is not 0. + // The test at the end will see if there are any values that were not 0. + tests := []statsTest{ + // Sample Tests + { + name: "sample success", + returnStatusCode: http.StatusOK, + dtype: Sample, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: remoteSamples, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "sample failure", + returnStatusCode: http.StatusBadRequest, + dtype: Sample, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: filequeueIncoming, + value: 10, + }, + { + name: failedSample, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "sample retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Sample, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: retriedSamples, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. 
+ valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + // histograms + { + name: "histogram success", + returnStatusCode: http.StatusOK, + dtype: Histogram, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: remoteHistograms, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "histogram failure", + returnStatusCode: http.StatusBadRequest, + dtype: Histogram, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: filequeueIncoming, + value: 10, + }, + { + name: failedHistogram, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "histogram retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Histogram, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: retriedHistogram, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + // exemplar, note that once it hits the appender exemplars are treated the same as series. 
+ { + name: "exemplar success", + returnStatusCode: http.StatusOK, + dtype: Exemplar, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: remoteSamples, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "exemplar failure", + returnStatusCode: http.StatusBadRequest, + dtype: Exemplar, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: filequeueIncoming, + value: 10, + }, + { + name: failedSample, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "exemplar retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Exemplar, + checks: []check{ + { + name: filequeueIncoming, + value: 10, + }, + { + name: retriedSamples, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyFileQueueIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runE2eStats(t, test) + }) + } + +} + +func greaterThenZero(v float64) bool { + return v > 0 +} + +func isReasonableTimeStamp(v float64) bool { + if v < 0 { + return false + } + unixTime := time.Unix(int64(v), 0) + + return time.Since(unixTime) < 10*time.Second +} + +type dataType int + +const ( + Sample dataType = iota + Histogram + Exemplar + Metadata +) + +type check struct { + name string + value float64 + valueFunc func(v float64) bool +} +type statsTest struct { + name string + returnStatusCode int + // Only check for non zero values, once all checks are ran it will automatically ensure all remaining metrics are 0. + checks []check + dtype dataType +} + +func runE2eStats(t *testing.T, test statsTest) { + l := util.TestAlloyLogger(t) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(test.returnStatusCode) + })) + expCh := make(chan Exports, 1) + + reg := prometheus.NewRegistry() + c, err := newComponent(t, l, srv.URL, expCh, reg) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. 
+ exp := <-expCh + + index := 0 + + go func() { + app := exp.Receiver.Appender(ctx) + for j := 0; j < 10; j++ { + index++ + switch test.dtype { + case Sample: + ts, v, lbls := makeSeries(index) + _, errApp := app.Append(0, lbls, ts, v) + require.NoError(t, errApp) + case Histogram: + ts, lbls, h := makeHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) + require.NoError(t, errApp) + case Exemplar: + ex := makeExemplar(index) + _, errApp := app.AppendExemplar(0, nil, ex) + require.NoError(t, errApp) + default: + require.True(t, false) + } + } + require.NoError(t, app.Commit()) + }() + tm := time.NewTimer(8 * time.Second) + <-tm.C + cancel() + + require.Eventually(t, func() bool { + dtos, gatherErr := reg.Gather() + require.NoError(t, gatherErr) + for _, d := range dtos { + if getValue(d) > 0 { + return true + } + } + return false + }, 10*time.Second, 1*time.Second) + metrics := make(map[string]float64) + dtos, err := reg.Gather() + require.NoError(t, err) + for _, d := range dtos { + metrics[*d.Name] = getValue(d) + } + + // Check for the metrics that matter. + for _, valChk := range test.checks { + if valChk.valueFunc != nil { + metrics = checkValueCondition(t, valChk.name, valChk.valueFunc, metrics) + } else { + metrics = checkValue(t, valChk.name, valChk.value, metrics) + } + } + // all other metrics should be zero. + for k, v := range metrics { + require.Zerof(t, v, "%s should be zero", k) + } +} + +func getValue(d *dto.MetricFamily) float64 { + switch *d.Type { + case dto.MetricType_COUNTER: + return d.Metric[0].Counter.GetValue() + case dto.MetricType_GAUGE: + return d.Metric[0].Gauge.GetValue() + case dto.MetricType_SUMMARY: + return d.Metric[0].Summary.GetSampleSum() + case dto.MetricType_UNTYPED: + return d.Metric[0].Untyped.GetValue() + case dto.MetricType_HISTOGRAM: + return d.Metric[0].Histogram.GetSampleSum() + case dto.MetricType_GAUGE_HISTOGRAM: + return d.Metric[0].Histogram.GetSampleSum() + default: + panic("unknown type " + d.Type.String()) + } +} + +func checkValue(t *testing.T, name string, value float64, metrics map[string]float64) map[string]float64 { + v, ok := metrics[name] + require.Truef(t, ok, "invalid metric name %s", name) + require.Equalf(t, value, v, "%s should be %f", name, value) + delete(metrics, name) + return metrics +} + +func checkValueCondition(t *testing.T, name string, chk func(float64) bool, metrics map[string]float64) map[string]float64 { + v, ok := metrics[name] + require.True(t, ok) + require.True(t, chk(v)) + delete(metrics, name) + return metrics +} diff --git a/internal/component/prometheus/remote/queue/e2e_test.go b/internal/component/prometheus/remote/queue/e2e_test.go new file mode 100644 index 0000000000..58e5597d61 --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_test.go @@ -0,0 +1,363 @@ +package queue + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/snappy" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/prompb" + 
"github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestE2E(t *testing.T) { + type e2eTest struct { + name string + maker func(index int, app storage.Appender) (float64, labels.Labels) + tester func(samples []prompb.TimeSeries) + testMeta func(samples []prompb.MetricMetadata) + } + tests := []e2eTest{ + { + name: "normal", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, v, lbls := makeSeries(index) + _, errApp := app.Append(0, lbls, ts, v) + require.NoError(t, errApp) + return v, lbls + }, + tester: func(samples []prompb.TimeSeries) { + t.Helper() + for _, s := range samples { + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value > 0) + require.True(t, len(s.Labels) == 1) + require.Truef(t, s.Labels[0].Name == fmt.Sprintf("name_%d", int(s.Samples[0].Value)), "%d name %s", int(s.Samples[0].Value), s.Labels[0].Name) + require.True(t, s.Labels[0].Value == fmt.Sprintf("value_%d", int(s.Samples[0].Value))) + } + }, + }, + { + name: "metadata", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + meta, lbls := makeMetadata(index) + _, errApp := app.UpdateMetadata(0, lbls, meta) + require.NoError(t, errApp) + return 0, lbls + }, + testMeta: func(samples []prompb.MetricMetadata) { + for _, s := range samples { + require.True(t, s.GetUnit() == "seconds") + require.True(t, s.Help == "metadata help") + require.True(t, s.Unit == "seconds") + require.True(t, s.Type == prompb.MetricMetadata_COUNTER) + require.True(t, strings.HasPrefix(s.MetricFamilyName, "name_")) + } + }, + }, + + { + name: "histogram", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, lbls, h := makeHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) + require.NoError(t, errApp) + return h.Sum, lbls + }, + tester: func(samples []prompb.TimeSeries) { + t.Helper() + for _, s := range samples { + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value == 0) + require.True(t, len(s.Labels) == 1) + histSame(t, hist(int(s.Histograms[0].Sum)), s.Histograms[0]) + } + }, + }, + { + name: "float histogram", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, lbls, h := makeFloatHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, nil, h) + require.NoError(t, errApp) + return h.Sum, lbls + }, + tester: func(samples []prompb.TimeSeries) { + t.Helper() + for _, s := range samples { + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value == 0) + require.True(t, len(s.Labels) == 1) + histFloatSame(t, histFloat(int(s.Histograms[0].Sum)), s.Histograms[0]) + } + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runTest(t, test.maker, test.tester, test.testMeta) + }) + } +} + +const ( + iterations = 100 + items = 10_000 +) + +func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples []prompb.TimeSeries), metaTest func(meta []prompb.MetricMetadata)) { + l := util.TestAlloyLogger(t) + done := make(chan struct{}) + var series atomic.Int32 + var meta atomic.Int32 + samples := make([]prompb.TimeSeries, 0) + metaSamples := make([]prompb.MetricMetadata, 0) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + newSamples, 
newMetadata := handlePost(t, w, r) + series.Add(int32(len(newSamples))) + meta.Add(int32(len(newMetadata))) + samples = append(samples, newSamples...) + metaSamples = append(metaSamples, newMetadata...) + if series.Load() == iterations*items { + done <- struct{}{} + } + if meta.Load() == iterations*items { + done <- struct{}{} + } + })) + expCh := make(chan Exports, 1) + c, err := newComponent(t, l, srv.URL, expCh, prometheus.NewRegistry()) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. + exp := <-expCh + + index := 0 + results := make(map[float64]labels.Labels) + mut := sync.Mutex{} + + for i := 0; i < iterations; i++ { + go func() { + app := exp.Receiver.Appender(ctx) + for j := 0; j < items; j++ { + index++ + v, lbl := add(index, app) + mut.Lock() + results[v] = lbl + mut.Unlock() + } + require.NoError(t, app.Commit()) + }() + } + // This is a weird use case to handle eventually. + tm := time.NewTimer(15 * time.Second) + select { + case <-done: + case <-tm.C: + } + cancel() + for _, s := range samples { + if len(s.Histograms) == 1 { + lbls, ok := results[s.Histograms[0].Sum] + require.True(t, ok) + for i, sLbl := range s.Labels { + require.True(t, lbls[i].Name == sLbl.Name) + require.True(t, lbls[i].Value == sLbl.Value) + } + } else { + lbls, ok := results[s.Samples[0].Value] + require.True(t, ok) + for i, sLbl := range s.Labels { + require.True(t, lbls[i].Name == sLbl.Name) + require.True(t, lbls[i].Value == sLbl.Value) + } + } + } + if test != nil { + test(samples) + } else { + metaTest(metaSamples) + } + require.Truef(t, types.OutStandingTimeSeriesBinary.Load() == 0, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load()) +} + +func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) { + defer r.Body.Close() + data, err := io.ReadAll(r.Body) + require.NoError(t, err) + + data, err = snappy.Decode(nil, data) + require.NoError(t, err) + + var req prompb.WriteRequest + err = req.Unmarshal(data) + require.NoError(t, err) + return req.GetTimeseries(), req.Metadata +} + +func makeSeries(index int) (int64, float64, labels.Labels) { + return time.Now().UTC().Unix(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) +} + +func makeMetadata(index int) (metadata.Metadata, labels.Labels) { + return metadata.Metadata{ + Type: "counter", + Unit: "seconds", + Help: "metadata help", + }, labels.FromStrings("__name__", fmt.Sprintf("name_%d", index)) +} + +func makeHistogram(index int) (int64, labels.Labels, *histogram.Histogram) { + return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) +} + +func makeExemplar(index int) exemplar.Exemplar { + return exemplar.Exemplar{ + Labels: labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), + Ts: time.Now().Unix(), + HasTs: true, + Value: float64(index), + } +} + +func hist(i int) *histogram.Histogram { + return &histogram.Histogram{ + CounterResetHint: 1, + Schema: 2, + ZeroThreshold: 3, + ZeroCount: 4, + Count: 5, + Sum: float64(i), + PositiveSpans: []histogram.Span{ + { + Offset: 1, + Length: 2, + }, + }, + NegativeSpans: []histogram.Span{ + { + Offset: 3, + Length: 4, + }, + }, + PositiveBuckets: []int64{1, 2, 3}, + NegativeBuckets: []int64{1, 2, 3}, + } +} + +func 
histSame(t *testing.T, h *histogram.Histogram, pb prompb.Histogram) { + require.True(t, h.Sum == pb.Sum) + require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountInt).ZeroCountInt) + require.True(t, h.Schema == pb.Schema) + require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountInt).CountInt) + require.True(t, h.ZeroThreshold == pb.ZeroThreshold) + require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) + require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveDeltas)) + require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeDeltas)) + histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) + histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) +} + +func histSpanSame(t *testing.T, h []histogram.Span, pb []prompb.BucketSpan) { + require.True(t, len(h) == len(pb)) + for i := range h { + require.True(t, h[i].Length == pb[i].Length) + require.True(t, h[i].Offset == pb[i].Offset) + } +} + +func makeFloatHistogram(index int) (int64, labels.Labels, *histogram.FloatHistogram) { + return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) +} + +func histFloat(i int) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + CounterResetHint: 1, + Schema: 2, + ZeroThreshold: 3, + ZeroCount: 4, + Count: 5, + Sum: float64(i), + PositiveSpans: []histogram.Span{ + { + Offset: 1, + Length: 2, + }, + }, + NegativeSpans: []histogram.Span{ + { + Offset: 3, + Length: 4, + }, + }, + PositiveBuckets: []float64{1.1, 2.2, 3.3}, + NegativeBuckets: []float64{1.2, 2.3, 3.4}, + } +} + +func histFloatSame(t *testing.T, h *histogram.FloatHistogram, pb prompb.Histogram) { + require.True(t, h.Sum == pb.Sum) + require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountFloat).ZeroCountFloat) + require.True(t, h.Schema == pb.Schema) + require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountFloat).CountFloat) + require.True(t, h.ZeroThreshold == pb.ZeroThreshold) + require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) + require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveCounts)) + require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeCounts)) + histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) + histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) +} + +func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, reg prometheus.Registerer) (*Queue, error) { + return NewComponent(component.Options{ + ID: "test", + Logger: l, + DataPath: t.TempDir(), + OnStateChange: func(e component.Exports) { + exp <- e.(Exports) + }, + Registerer: reg, + Tracer: nil, + }, Arguments{ + TTL: 2 * time.Hour, + MaxSignalsToBatch: 10_000, + BatchFrequency: 1 * time.Second, + Connections: []ConnectionConfig{{ + Name: "test", + URL: url, + Timeout: 20 * time.Second, + RetryBackoff: 5 * time.Second, + MaxRetryBackoffAttempts: 1, + BatchCount: 50, + FlushFrequency: 1 * time.Second, + QueueCount: 1, + }}, + ExternalLabels: nil, + }) +} diff --git a/internal/component/prometheus/remote/queue/endpoint.go b/internal/component/prometheus/remote/queue/endpoint.go new file mode 100644 index 0000000000..3f80528aff --- /dev/null +++ b/internal/component/prometheus/remote/queue/endpoint.go @@ -0,0 +1,137 @@ +package queue + +import ( + "context" + "strconv" + "time" + + snappy "github.com/eapache/go-xerial-snappy" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + 
"github.com/vladopajic/go-actor/actor" +) + +var _ actor.Worker = (*endpoint)(nil) + +// endpoint handles communication between the serializer, filequeue and network. +type endpoint struct { + network types.NetworkClient + serializer types.Serializer + stat *types.PrometheusStats + metaStats *types.PrometheusStats + log log.Logger + ttl time.Duration + incoming actor.Mailbox[types.DataHandle] + buf []byte + self actor.Actor +} + +func NewEndpoint(client types.NetworkClient, serializer types.Serializer, stats, metatStats *types.PrometheusStats, ttl time.Duration, logger log.Logger) *endpoint { + return &endpoint{ + network: client, + serializer: serializer, + stat: stats, + metaStats: metatStats, + log: logger, + ttl: ttl, + incoming: actor.NewMailbox[types.DataHandle](actor.OptCapacity(1)), + buf: make([]byte, 0, 1024), + } +} + +func (ep *endpoint) Start() { + ep.self = actor.Combine(actor.New(ep), ep.incoming).Build() + ep.self.Start() + ep.serializer.Start() + ep.network.Start() +} + +func (ep *endpoint) Stop() { + ep.serializer.Stop() + ep.network.Stop() + ep.network.Stop() + ep.self.Stop() +} + +func (ep *endpoint) DoWork(ctx actor.Context) actor.WorkerStatus { + select { + case <-ctx.Done(): + return actor.WorkerEnd + case file, ok := <-ep.incoming.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + meta, buf, err := file.Pop() + if err != nil { + level.Error(ep.log).Log("msg", "unable to get file contents", "name", file.Name, "err", err) + return actor.WorkerContinue + } + ep.deserializeAndSend(ctx, meta, buf) + return actor.WorkerContinue + } +} + +func (ep *endpoint) deserializeAndSend(ctx context.Context, meta map[string]string, buf []byte) { + var err error + ep.buf, err = snappy.DecodeInto(ep.buf, buf) + if err != nil { + level.Debug(ep.log).Log("msg", "error snappy decoding", "err", err) + return + } + // The version of each file is in the metadata. Right now there is only one version + // supported but in the future the ability to support more. Along with different + // compression. + version, ok := meta["version"] + if !ok { + level.Error(ep.log).Log("msg", "version not found for deserialization") + return + } + if version != "alloy.metrics.queue.v1" { + level.Error(ep.log).Log("msg", "invalid version found for deserialization", "version", version) + return + } + // Grab the amounts of each type and we can go ahead and alloc the space. + seriesCount, _ := strconv.Atoi(meta["series_count"]) + metaCount, _ := strconv.Atoi(meta["meta_count"]) + stringsCount, _ := strconv.Atoi(meta["strings_count"]) + sg := &types.SeriesGroup{ + Series: make([]*types.TimeSeriesBinary, seriesCount), + Metadata: make([]*types.TimeSeriesBinary, metaCount), + Strings: make([]string, stringsCount), + } + // Prefill our series with items from the pool to limit allocs. + for i := 0; i < seriesCount; i++ { + sg.Series[i] = types.GetTimeSeriesFromPool() + } + for i := 0; i < metaCount; i++ { + sg.Metadata[i] = types.GetTimeSeriesFromPool() + } + sg, ep.buf, err = types.DeserializeToSeriesGroup(sg, ep.buf) + if err != nil { + level.Debug(ep.log).Log("msg", "error deserializing", "err", err) + return + } + + for _, series := range sg.Series { + // One last chance to check the TTL. Writing to the filequeue will check it but + // in a situation where the network is down and writing backs up we dont want to send + // data that will get rejected. + seriesAge := time.Since(time.Unix(series.TS, 0)) + if seriesAge > ep.ttl { + // TODO @mattdurham add metric here for ttl expired. 
+ continue + } + sendErr := ep.network.SendSeries(ctx, series) + if sendErr != nil { + level.Error(ep.log).Log("msg", "error sending to write client", "err", sendErr) + } + } + + for _, md := range sg.Metadata { + sendErr := ep.network.SendMetadata(ctx, md) + if sendErr != nil { + level.Error(ep.log).Log("msg", "error sending metadata to write client", "err", sendErr) + } + } +} diff --git a/internal/component/prometheus/remote/queue/fanout.go b/internal/component/prometheus/remote/queue/fanout.go new file mode 100644 index 0000000000..09a7fb97ed --- /dev/null +++ b/internal/component/prometheus/remote/queue/fanout.go @@ -0,0 +1,85 @@ +package queue + +import ( + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +var _ storage.Appender = (*fanout)(nil) + +type fanout struct { + children []storage.Appender +} + +func (f fanout) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.Append(ref, l, t, v) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) Commit() error { + for _, child := range f.children { + err := child.Commit() + if err != nil { + return err + } + } + return nil +} + +func (f fanout) Rollback() error { + for _, child := range f.children { + err := child.Rollback() + if err != nil { + return err + } + } + return nil +} + +func (f fanout) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendExemplar(ref, l, e) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendHistogram(ref, l, t, h, fh) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.UpdateMetadata(ref, l, m) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendCTZeroSample(ref, l, t, ct) + if err != nil { + return ref, err + } + } + return ref, nil +} diff --git a/internal/component/prometheus/remote/queue/network/loop.go b/internal/component/prometheus/remote/queue/network/loop.go index cbe94f2e41..72fb17ac39 100644 --- a/internal/component/prometheus/remote/queue/network/loop.go +++ b/internal/component/prometheus/remote/queue/network/loop.go @@ -195,7 +195,9 @@ func (l *loop) send(ctx context.Context, retryCount int) sendResult { httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", l.cfg.UserAgent) httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - httpReq.SetBasicAuth(l.cfg.Username, l.cfg.Password) + if l.cfg.BasicAuth != nil { + httpReq.SetBasicAuth(l.cfg.BasicAuth.Username, l.cfg.BasicAuth.Password) + } if retryCount > 0 { httpReq.Header.Set("Retry-Attempt", strconv.Itoa(retryCount)) diff --git 
a/internal/component/prometheus/remote/queue/network/manager.go b/internal/component/prometheus/remote/queue/network/manager.go index 19e4f5b961..6281536838 100644 --- a/internal/component/prometheus/remote/queue/network/manager.go +++ b/internal/component/prometheus/remote/queue/network/manager.go @@ -42,7 +42,7 @@ func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStat } // start kicks off a number of concurrent connections. - for i := uint64(0); i < s.cfg.Connections; i++ { + for i := uint(0); i < s.cfg.Connections; i++ { l := newLoop(cc, false, logger, seriesStats) l.self = actor.New(l) s.loops = append(s.loops, l) @@ -120,8 +120,7 @@ func (s *manager) updateConfig(cc types.ConnectionConfig) { level.Debug(s.logger).Log("msg", "dropping all series in loops and creating queue due to config change") s.stopLoops() s.loops = make([]*loop, 0, s.cfg.Connections) - var i uint64 - for ; i < s.cfg.Connections; i++ { + for i := uint(0); i < s.cfg.Connections; i++ { l := newLoop(cc, false, s.logger, s.stats) l.self = actor.New(l) @@ -158,7 +157,7 @@ func (s *manager) startLoops() { // Queue adds anything thats not metadata to the queue. func (s *manager) queue(ctx context.Context, ts *types.TimeSeriesBinary) { // Based on a hash which is the label hash add to the queue. - queueNum := ts.Hash % s.cfg.Connections + queueNum := ts.Hash % uint64(s.cfg.Connections) // This will block if the queue is full. err := s.loops[queueNum].seriesMbx.Send(ctx, ts) if err != nil { diff --git a/internal/component/prometheus/remote/queue/types.go b/internal/component/prometheus/remote/queue/types.go new file mode 100644 index 0000000000..7e424a62d9 --- /dev/null +++ b/internal/component/prometheus/remote/queue/types.go @@ -0,0 +1,116 @@ +package queue + +import ( + "fmt" + "time" + + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/prometheus/prometheus/storage" +) + +func defaultArgs() Arguments { + return Arguments{ + TTL: 2 * time.Hour, + MaxSignalsToBatch: 10_000, + BatchFrequency: 5 * time.Second, + } +} + +type Arguments struct { + // TTL is how old a series can be. + TTL time.Duration `alloy:"ttl,attr,optional"` + // The batch size to persist to the file queue. + MaxSignalsToBatch int `alloy:"max_signals_to_batch,attr,optional"` + // How often to flush to the file queue if BatchSize isn't met. + // TODO @mattdurham this may need to go into a specific block for the serializer. 
+ BatchFrequency time.Duration `alloy:"batch_frequency,attr,optional"` + Connections []ConnectionConfig `alloy:"endpoint,block"` +} + +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +// SetToDefault sets the default +func (rc *Arguments) SetToDefault() { + *rc = defaultArgs() +} + +func defaultCC() ConnectionConfig { + return ConnectionConfig{ + Timeout: 30 * time.Second, + RetryBackoff: 1 * time.Second, + MaxRetryBackoffAttempts: 0, + BatchCount: 1_000, + FlushFrequency: 1 * time.Second, + QueueCount: 4, + } +} + +func (cc *ConnectionConfig) SetToDefault() { + *cc = defaultCC() +} + +func (r *Arguments) Validate() error { + for _, conn := range r.Connections { + if conn.BatchCount <= 0 { + return fmt.Errorf("batch_count must be greater than 0") + } + if conn.FlushFrequency < 1*time.Second { + return fmt.Errorf("flush_frequency must be greater or equal to 1s, the internal timers resolution is 1s") + } + } + + return nil +} + +// ConnectionConfig is the alloy specific version of ConnectionConfig. This looks odd, the idea +// +// is that once this code is tested that the bulk of the underlying code will be used elsewhere. +// this means we need a very generic interface for that code, and a specific alloy implementation here. +type ConnectionConfig struct { + Name string `alloy:",label"` + URL string `alloy:"url,attr"` + BasicAuth *BasicAuth `alloy:"basic_auth,block,optional"` + Timeout time.Duration `alloy:"write_timeout,attr,optional"` + // How long to wait between retries. + RetryBackoff time.Duration `alloy:"retry_backoff,attr,optional"` + // Maximum number of retries. + MaxRetryBackoffAttempts uint `alloy:"max_retry_backoff_attempts,attr,optional"` + // How many series to write at a time. + BatchCount int `alloy:"batch_count,attr,optional"` + // How long to wait before sending regardless of batch count. + FlushFrequency time.Duration `alloy:"flush_frequency,attr,optional"` + // How many concurrent queues to have. + QueueCount uint `alloy:"queue_count,attr,optional"` + + ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` +} + +func (cc ConnectionConfig) ToNativeType() types.ConnectionConfig { + tcc := types.ConnectionConfig{ + URL: cc.URL, + // TODO @mattdurham generate this with build information. 
+ UserAgent: "alloy", + Timeout: cc.Timeout, + RetryBackoff: cc.RetryBackoff, + MaxRetryBackoffAttempts: cc.MaxRetryBackoffAttempts, + BatchCount: cc.BatchCount, + FlushFrequency: cc.FlushFrequency, + ExternalLabels: cc.ExternalLabels, + Connections: cc.QueueCount, + } + if cc.BasicAuth != nil { + tcc.BasicAuth = &types.BasicAuth{ + Username: cc.BasicAuth.Username, + Password: string(cc.BasicAuth.Password), + } + } + return tcc +} + +type BasicAuth struct { + Username string `alloy:"username,attr,optional"` + Password alloytypes.Secret `alloy:"password,attr,optional"` +} diff --git a/internal/component/prometheus/remote/queue/types/network.go b/internal/component/prometheus/remote/queue/types/network.go index 024f8a3122..e83a816f87 100644 --- a/internal/component/prometheus/remote/queue/types/network.go +++ b/internal/component/prometheus/remote/queue/types/network.go @@ -15,16 +15,20 @@ type NetworkClient interface { } type ConnectionConfig struct { URL string - Username string - Password string + BasicAuth *BasicAuth UserAgent string Timeout time.Duration RetryBackoff time.Duration - MaxRetryBackoffAttempts time.Duration + MaxRetryBackoffAttempts uint BatchCount int FlushFrequency time.Duration ExternalLabels map[string]string - Connections uint64 + Connections uint +} + +type BasicAuth struct { + Username string + Password string } func (cc ConnectionConfig) Equals(bb ConnectionConfig) bool { diff --git a/internal/component/prometheus/remote/queue/types/stats.go b/internal/component/prometheus/remote/queue/types/stats.go index 4107d8089f..f68600076e 100644 --- a/internal/component/prometheus/remote/queue/types/stats.go +++ b/internal/component/prometheus/remote/queue/types/stats.go @@ -24,10 +24,10 @@ type PrometheusStats struct { NetworkErrors prometheus.Counter NetworkNewestOutTimeStampSeconds prometheus.Gauge - // Filequeue Stats - FilequeueInSeries prometheus.Counter - FilequeueNewestInTimeStampSeconds prometheus.Gauge - FilequeueErrors prometheus.Counter + // Serializer Stats + SerializerInSeries prometheus.Counter + SerializerNewestInTimeStampSeconds prometheus.Gauge + SerializerErrors prometheus.Counter // Backwards compatibility metrics SamplesTotal prometheus.Counter @@ -55,20 +55,20 @@ type PrometheusStats struct { func NewStats(namespace, subsystem string, registry prometheus.Registerer) *PrometheusStats { s := &PrometheusStats{ - FilequeueInSeries: prometheus.NewCounter(prometheus.CounterOpts{ + SerializerInSeries: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_incoming", + Name: "serializer_incoming_signals", }), - FilequeueNewestInTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + SerializerNewestInTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_incoming_timestamp_seconds", + Name: "serializer_incoming_timestamp_seconds", }), - FilequeueErrors: prometheus.NewGauge(prometheus.GaugeOpts{ + SerializerErrors: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_errors", + Name: "serializer_errors", }), NetworkNewestOutTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -175,9 +175,9 @@ func NewStats(namespace, subsystem string, registry prometheus.Registerer) *Prom s.NetworkSeriesSent, s.NetworkErrors, s.NetworkNewestOutTimeStampSeconds, - s.FilequeueInSeries, - s.FilequeueErrors, - s.FilequeueNewestInTimeStampSeconds, + s.SerializerInSeries, + 
s.SerializerErrors, + s.SerializerNewestInTimeStampSeconds, ) return s } @@ -231,10 +231,10 @@ func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { } func (s *PrometheusStats) UpdateFileQueue(stats SerializerStats) { - s.FilequeueInSeries.Add(float64(stats.SeriesStored)) - s.FilequeueErrors.Add(float64(stats.Errors)) + s.SerializerInSeries.Add(float64(stats.SeriesStored)) + s.SerializerErrors.Add(float64(stats.Errors)) if stats.NewestTimestamp != 0 { - s.FilequeueNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) + s.SerializerNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) s.RemoteStorageInTimestamp.Set(float64(stats.NewestTimestamp)) } } diff --git a/internal/component/prometheus/remote/queue/types/storage_test.go b/internal/component/prometheus/remote/queue/types/storage_test.go new file mode 100644 index 0000000000..f994427792 --- /dev/null +++ b/internal/component/prometheus/remote/queue/types/storage_test.go @@ -0,0 +1,26 @@ +package types + +import ( + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestStorage(t *testing.T) { + ts := GetTimeSeriesFromPool() + ts.Labels = labels.FromStrings("one", "two") + ts.LabelsValues = make([]uint32, 1) + ts.LabelsNames = make([]uint32, 1) + ts.LabelsValues[0] = 1 + ts.LabelsNames[0] = 2 + + PutTimeSeriesIntoPool(ts) + ts = GetTimeSeriesFromPool() + defer PutTimeSeriesIntoPool(ts) + require.Len(t, ts.Labels, 0) + require.True(t, cap(ts.LabelsValues) == 1) + require.True(t, cap(ts.LabelsNames) == 1) + require.Len(t, ts.LabelsValues, 0) + require.Len(t, ts.LabelsNames, 0) +}