diff --git a/pkg/stanza/operator/input/windows/input.go b/pkg/stanza/operator/input/windows/input.go index f7c64cc62107..51f82a9485d0 100644 --- a/pkg/stanza/operator/input/windows/input.go +++ b/pkg/stanza/operator/input/windows/input.go @@ -211,12 +211,14 @@ func (i *Input) readToEnd(ctx context.Context) { } } +var errHandleInvalid = errors.New("The handle is invalid.") + // read will read events from the subscription. func (i *Input) read(ctx context.Context) int { events, err := i.subscription.Read(i.maxReads) if err != nil { i.Logger().Error("Failed to read events from subscription", zap.Error(err)) - if i.isRemote() { + if i.isRemote() && (err == errHandleInvalid || err == errSubscriptionNotOpen) { i.Logger().Info("Resubscribing, closing subscription") closeErr := i.subscription.Close() if closeErr != nil { diff --git a/pkg/stanza/operator/input/windows/subscription.go b/pkg/stanza/operator/input/windows/subscription.go index d2aba4555460..ba8527deaaef 100644 --- a/pkg/stanza/operator/input/windows/subscription.go +++ b/pkg/stanza/operator/input/windows/subscription.go @@ -64,10 +64,12 @@ func (s *Subscription) Close() error { return nil } +var errSubscriptionNotOpen = errors.New("subscription handle is not open") + // Read will read events from the subscription. func (s *Subscription) Read(maxReads int) ([]Event, error) { if s.handle == 0 { - return nil, fmt.Errorf("subscription handle is not open") + return nil, errSubscriptionNotOpen } if maxReads < 1 {