From fce2cfea8e16acfabcb4242e129a9d347ff4fa56 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Tue, 16 Jul 2024 01:51:55 -0700 Subject: [PATCH] [extension/healthcheckv2] Add support for streaming Watch RPC to gRPC service (#34049) **Description:** The PR is the fifth in a series to decompose #30673 into more manageable pieces for review. This PR builds on #34028 and completes the gRPC service by adding support for the streaming Watch RPC. For reference, the gRPC service is an implementation of [grpc_health_v1 service](https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto). **Link to tracking Issue:** #26661 **Testing:** Units / manual **Documentation:** Comments, etc. --- .chloggen/healthcheckv2-grpc-watch.yaml | 27 + .../internal/grpc/grpc.go | 78 +- .../internal/grpc/grpc_test.go | 856 ++++++++++++++++++ .../internal/status/aggregator.go | 2 + 4 files changed, 962 insertions(+), 1 deletion(-) create mode 100644 .chloggen/healthcheckv2-grpc-watch.yaml diff --git a/.chloggen/healthcheckv2-grpc-watch.yaml b/.chloggen/healthcheckv2-grpc-watch.yaml new file mode 100644 index 000000000000..6f75df448d91 --- /dev/null +++ b/.chloggen/healthcheckv2-grpc-watch.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: 'healthcheckv2extension' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for streaming Watch RPC to healthcheckv2 gRPC service. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26661] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/extension/healthcheckv2extension/internal/grpc/grpc.go b/extension/healthcheckv2extension/internal/grpc/grpc.go index e89a8996c49f..82baaec08416 100644 --- a/extension/healthcheckv2extension/internal/grpc/grpc.go +++ b/extension/healthcheckv2extension/internal/grpc/grpc.go @@ -16,7 +16,10 @@ import ( ) var ( - errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.") + errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.") + errShuttingDown = grpcstatus.Error(codes.Canceled, "Server shutting down.") + errStreamSend = grpcstatus.Error(codes.Canceled, "Error sending; stream terminated.") + errStreamEnded = grpcstatus.Error(codes.Canceled, "Stream has ended.") statusToServingStatusMap = map[component.Status]healthpb.HealthCheckResponse_ServingStatus{ component.StatusNone: healthpb.HealthCheckResponse_NOT_SERVING, @@ -44,6 +47,79 @@ func (s *Server) Check( }, nil } +func (s *Server) Watch(req *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error { + sub, unsub := s.aggregator.Subscribe(status.Scope(req.Service), status.Concise) + defer unsub() + + var lastServingStatus healthpb.HealthCheckResponse_ServingStatus = -1 + var failureTimer *time.Timer + failureCh := make(chan struct{}) + + for { + select { + case st, ok := <-sub: + if !ok { + return errShuttingDown + } + var sst healthpb.HealthCheckResponse_ServingStatus + + switch { + case st == nil: + sst = healthpb.HealthCheckResponse_SERVICE_UNKNOWN + case s.componentHealthConfig.IncludeRecoverable && + s.componentHealthConfig.RecoveryDuration > 0 && + st.Status() == component.StatusRecoverableError: + if failureTimer == nil { + failureTimer = time.AfterFunc( + s.componentHealthConfig.RecoveryDuration, + func() { failureCh <- struct{}{} }, + ) + } + sst = lastServingStatus + if lastServingStatus == -1 { + sst = healthpb.HealthCheckResponse_SERVING + } + default: + if failureTimer != nil { + if !failureTimer.Stop() { + <-failureTimer.C + } + failureTimer = nil + } + sst = s.toServingStatus(st.Event) + } + + if lastServingStatus == sst { + continue + } + + lastServingStatus = sst + + err := stream.Send(&healthpb.HealthCheckResponse{Status: sst}) + if err != nil { + return errStreamSend + } + case <-failureCh: + failureTimer.Stop() + failureTimer = nil + if lastServingStatus == healthpb.HealthCheckResponse_NOT_SERVING { + continue + } + lastServingStatus = healthpb.HealthCheckResponse_NOT_SERVING + err := stream.Send( + &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_NOT_SERVING, + }, + ) + if err != nil { + return errStreamSend + } + case <-stream.Context().Done(): + return errStreamEnded + } + } +} + func (s *Server) toServingStatus( ev status.Event, ) healthpb.HealthCheckResponse_ServingStatus { diff --git a/extension/healthcheckv2extension/internal/grpc/grpc_test.go b/extension/healthcheckv2extension/internal/grpc/grpc_test.go index 866acd446ff0..95f52d4918bf 100644 --- a/extension/healthcheckv2extension/internal/grpc/grpc_test.go +++ b/extension/healthcheckv2extension/internal/grpc/grpc_test.go @@ -5,6 +5,7 @@ package grpc import ( "context" + "sync" "testing" "time" @@ -741,3 +742,858 @@ func TestCheck(t *testing.T) { } } + +func TestWatch(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + config := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + var server *Server + traces := testhelpers.NewPipelineMetadata("traces") + 
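+	// The steps below drive the status aggregator directly and assert on what
+	// a grpc_health_v1 Watch client observes. For reference, a minimal consumer
+	// of the Watch RPC (a sketch only; error handling elided, cc is a client
+	// connection as created in the test body below) looks roughly like:
+	//
+	//	client := healthpb.NewHealthClient(cc)
+	//	stream, _ := client.Watch(context.Background(), &healthpb.HealthCheckRequest{Service: traces.PipelineID.String()})
+	//	for {
+	//		resp, err := stream.Recv() // blocks until the serving status changes
+	//		if err != nil {
+	//			break // stream canceled or server shutting down
+	//		}
+	//		_ = resp.Status // e.g. SERVING, NOT_SERVING, or SERVICE_UNKNOWN
+	//	}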
metrics := testhelpers.NewPipelineMetadata("metrics") + + // statusUnchanged is a sentinel value to signal that a step does not result + // in a status change. This is important, because checking for a status + // change is blocking. + var statusUnchanged healthpb.HealthCheckResponse_ServingStatus = -1 + + type teststep struct { + step func() + service string + expectedStatus healthpb.HealthCheckResponse_ServingStatus + } + + tests := []struct { + name string + config *Config + componentHealthSettings *common.ComponentHealthConfig + teststeps []teststep + }{ + { + name: "exclude recoverable and permanent errors", + config: config, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // errors will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: statusUnchanged, + }, + { + step: func() { + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: statusUnchanged, + }, + { + step: func() { + // This will be the last status change for traces (stopping changes to NOT_SERVING) + // Stopped results in the same serving status, and repeat statuses are not streamed. + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // This will be the last status change for metrics (stopping changes to NOT_SERVING) + // Stopped results in the same serving status, and repeat statuses are not streamed. 
+ testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStopping, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + { + name: "include recoverable and exclude permanent errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: false, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // metrics and overall status will recover and resume SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // permanent error will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: statusUnchanged, + }, + }, + }, + { + name: "exclude permanent errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: false, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: 
traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // permanent error will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: statusUnchanged, + }, + }, + }, + { + name: "include recoverable 0s recovery duration", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: false, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // metrics and overall status will recover and resume SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // This will be the last status change for traces 
(stopping changes to NOT_SERVING)
+						// Stopped results in the same serving status, and repeat statuses are not streamed.
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							traces.InstanceIDs(),
+							component.StatusStopping,
+						)
+					},
+					service:        traces.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					service:        "",
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					step: func() {
+						// This will be the last status change for metrics (stopping changes to NOT_SERVING)
+						// Stopped results in the same serving status, and repeat statuses are not streamed.
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							metrics.InstanceIDs(),
+							component.StatusStopping,
+						)
+					},
+					service:        metrics.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+			},
+		},
+		{
+			name:   "include permanent and exclude recoverable errors",
+			config: config,
+			componentHealthSettings: &common.ComponentHealthConfig{
+				IncludePermanent:   true,
+				IncludeRecoverable: false,
+				RecoveryDuration:   2 * time.Millisecond,
+			},
+			teststeps: []teststep{
+				{
+					service:        "",
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					service:        traces.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN,
+				},
+				{
+					service:        metrics.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN,
+				},
+				{
+					step: func() {
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							traces.InstanceIDs(),
+							component.StatusStarting,
+						)
+					},
+					service:        traces.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					step: func() {
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							metrics.InstanceIDs(),
+							component.StatusStarting,
+						)
+					},
+					service:        metrics.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					step: func() {
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							traces.InstanceIDs(),
+							component.StatusOK,
+						)
+					},
+					service:        traces.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_SERVING,
+				},
+				{
+					step: func() {
+						testhelpers.SeedAggregator(
+							server.aggregator,
+							metrics.InstanceIDs(),
+							component.StatusOK,
+						)
+					},
+					service:        metrics.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_SERVING,
+				},
+				{
+					service:        "",
+					expectedStatus: healthpb.HealthCheckResponse_SERVING,
+				},
+				{
+					step: func() {
+						// recoverable will be ignored
+						server.aggregator.RecordStatus(
+							metrics.ExporterID,
+							component.NewRecoverableErrorEvent(assert.AnError),
+						)
+					},
+					service:        metrics.PipelineID.String(),
+					expectedStatus: statusUnchanged,
+				},
+				{
+					step: func() {
+						// metrics and overall status will be NOT_SERVING
+						server.aggregator.RecordStatus(
+							metrics.ExporterID,
+							component.NewPermanentErrorEvent(assert.AnError),
+						)
+					},
+					service:        metrics.PipelineID.String(),
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					service:        "",
+					expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING,
+				},
+				{
+					step: func() {
+						// This will be the last status change for traces (stopping changes to NOT_SERVING)
+						// Stopped results in the same serving status, and repeat statuses are not streamed.
+ testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStopping, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + { + name: "exclude recoverable errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: true, + IncludeRecoverable: false, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // recoverable will be ignored + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: statusUnchanged, + }, + }, + }, + { + name: "include recoverable and permanent errors", + config: config, + componentHealthSettings: &common.ComponentHealthConfig{ + IncludePermanent: true, + IncludeRecoverable: true, + RecoveryDuration: 2 * time.Millisecond, + }, + teststeps: []teststep{ + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVICE_UNKNOWN, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusStarting, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusStarting, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + traces.InstanceIDs(), + component.StatusOK, + ) + }, + service: traces.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + testhelpers.SeedAggregator( + server.aggregator, + metrics.InstanceIDs(), + component.StatusOK, + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, 
+ { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + step: func() { + // metrics and overall status will recover and resume SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewStatusEvent(component.StatusOK), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_SERVING, + }, + { + step: func() { + // metrics and overall status will be NOT_SERVING + server.aggregator.RecordStatus( + metrics.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + }, + service: metrics.PipelineID.String(), + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + { + service: "", + expectedStatus: healthpb.HealthCheckResponse_NOT_SERVING, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + server = NewServer( + config, + tc.componentHealthSettings, + componenttest.NewNopTelemetrySettings(), + status.NewAggregator(testhelpers.ErrPriority(tc.componentHealthSettings)), + ) + require.NoError(t, server.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, server.Shutdown(context.Background())) }) + + cc, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer func() { + assert.NoError(t, cc.Close()) + }() + + client := healthpb.NewHealthClient(cc) + watchers := make(map[string]healthpb.Health_WatchClient) + + for _, ts := range tc.teststeps { + if ts.step != nil { + ts.step() + } + + if statusUnchanged == ts.expectedStatus { + continue + } + + watcher, ok := watchers[ts.service] + if !ok { + watcher, err = client.Watch( + context.Background(), + &healthpb.HealthCheckRequest{Service: ts.service}, + ) + require.NoError(t, err) + watchers[ts.service] = watcher + } + + var resp *healthpb.HealthCheckResponse + // Note Recv blocks until there is a new item in the stream + resp, err = watcher.Recv() + require.NoError(t, err) + assert.Equal(t, ts.expectedStatus, resp.Status) + } + + wg := sync.WaitGroup{} + wg.Add(len(watchers)) + + for svc, watcher := range watchers { + svc := svc + watcher := watcher + go func() { + resp, err := watcher.Recv() + // Ensure there are not any unread messages + assert.Nil(t, resp, "%s: had unread messages", svc) + // Ensure watchers receive the cancelation when streams are closed by the server + assert.Equal(t, grpcstatus.Error(codes.Canceled, "Server shutting down."), err) + wg.Done() + }() + } + + // closing the aggregator will gracefully terminate streams of status events + server.aggregator.Close() + wg.Wait() + }) + } +} diff --git a/extension/healthcheckv2extension/internal/status/aggregator.go b/extension/healthcheckv2extension/internal/status/aggregator.go index 7946eba5c218..50630cc37b76 100644 --- a/extension/healthcheckv2extension/internal/status/aggregator.go +++ b/extension/healthcheckv2extension/internal/status/aggregator.go @@ -193,6 +193,8 @@ func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) (<-chan *Aggreg el := 
subList.PushBack(sub)
 	unsubFunc := func() {
+		a.mu.Lock()
+		defer a.mu.Unlock()
 		subList.Remove(el)
 		if subList.Front() == nil {
 			delete(a.subscriptions, key)