Skip to content

Commit

Permalink
[extension/healthcheckv2] Add support for streaming Watch RPC to gRPC…
Browse files Browse the repository at this point in the history
… service (#34049)

**Description:** <Describe what has changed.>
The PR is the fifth in a series to decompose #30673 into more manageable
pieces for review. This PR builds on #34028 and completes the gRPC
service by adding support for the streaming Watch RPC. For reference,
the
gRPC service is an implementation of [grpc_health_v1
service](https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto).

**Link to tracking Issue:** #26661

**Testing:** Units / manual

**Documentation:** Comments, etc.
  • Loading branch information
mwear authored Jul 16, 2024
1 parent 12d071d commit fce2cfe
Show file tree
Hide file tree
Showing 4 changed files with 962 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/healthcheckv2-grpc-watch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'healthcheckv2extension'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for streaming Watch RPC to healthcheckv2 gRPC service.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
78 changes: 77 additions & 1 deletion extension/healthcheckv2extension/internal/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
)

var (
errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.")
errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.")
errShuttingDown = grpcstatus.Error(codes.Canceled, "Server shutting down.")
errStreamSend = grpcstatus.Error(codes.Canceled, "Error sending; stream terminated.")
errStreamEnded = grpcstatus.Error(codes.Canceled, "Stream has ended.")

statusToServingStatusMap = map[component.Status]healthpb.HealthCheckResponse_ServingStatus{
component.StatusNone: healthpb.HealthCheckResponse_NOT_SERVING,
Expand Down Expand Up @@ -44,6 +47,79 @@ func (s *Server) Check(
}, nil
}

func (s *Server) Watch(req *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error {
sub, unsub := s.aggregator.Subscribe(status.Scope(req.Service), status.Concise)
defer unsub()

var lastServingStatus healthpb.HealthCheckResponse_ServingStatus = -1
var failureTimer *time.Timer
failureCh := make(chan struct{})

for {
select {
case st, ok := <-sub:
if !ok {
return errShuttingDown
}
var sst healthpb.HealthCheckResponse_ServingStatus

switch {
case st == nil:
sst = healthpb.HealthCheckResponse_SERVICE_UNKNOWN
case s.componentHealthConfig.IncludeRecoverable &&
s.componentHealthConfig.RecoveryDuration > 0 &&
st.Status() == component.StatusRecoverableError:
if failureTimer == nil {
failureTimer = time.AfterFunc(
s.componentHealthConfig.RecoveryDuration,
func() { failureCh <- struct{}{} },
)
}
sst = lastServingStatus
if lastServingStatus == -1 {
sst = healthpb.HealthCheckResponse_SERVING
}
default:
if failureTimer != nil {
if !failureTimer.Stop() {
<-failureTimer.C
}
failureTimer = nil
}
sst = s.toServingStatus(st.Event)
}

if lastServingStatus == sst {
continue
}

lastServingStatus = sst

err := stream.Send(&healthpb.HealthCheckResponse{Status: sst})
if err != nil {
return errStreamSend
}
case <-failureCh:
failureTimer.Stop()
failureTimer = nil
if lastServingStatus == healthpb.HealthCheckResponse_NOT_SERVING {
continue
}
lastServingStatus = healthpb.HealthCheckResponse_NOT_SERVING
err := stream.Send(
&healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
},
)
if err != nil {
return errStreamSend
}
case <-stream.Context().Done():
return errStreamEnded
}
}
}

func (s *Server) toServingStatus(
ev status.Event,
) healthpb.HealthCheckResponse_ServingStatus {
Expand Down
Loading

0 comments on commit fce2cfe

Please sign in to comment.