diff --git a/jetstream/pull.go b/jetstream/pull.go index a97d75e7a..d0a7138ac 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -507,7 +507,7 @@ func (s *pullSubscription) Next() (Msg, error) { if closed && !drainMode { return nil, ErrMsgIteratorClosed } - hbMonitor := s.scheduleHeartbeatCheck(2 * s.consumeOpts.Heartbeat) + hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat) defer func() { if hbMonitor != nil { hbMonitor.Stop() diff --git a/jetstream/test/pull_test.go b/jetstream/test/pull_test.go index dcca6d1c5..657e4db06 100644 --- a/jetstream/test/pull_test.go +++ b/jetstream/test/pull_test.go @@ -1790,6 +1790,52 @@ func TestPullConsumerMessages(t *testing.T) { } }) + t.Run("with idle heartbeat", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // remove consumer to force missing heartbeats + if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil { + t.Fatalf("Error deleting consumer: %s", err) + } + + it, err := c.Messages(jetstream.PullHeartbeat(500 * time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer it.Stop() + now := time.Now() + _, err = it.Next() + elapsed := time.Since(now) + if !errors.Is(err, jetstream.ErrNoHeartbeat) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err) + } + // we should get missing heartbeat error after approximately 2*heartbeat interval + if elapsed < time.Second || elapsed > 1500*time.Millisecond { + t.Fatalf("Unexpected elapsed time; want 1-1.5s; got %v", elapsed) + } + }) + t.Run("no messages received after stop", func(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) @@ -2488,7 +2534,7 @@ func TestPullConsumerConsume(t *testing.T) { } }) - t.Run("with idle heartbeat", func(t *testing.T) { + t.Run("with missing heartbeat", func(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) nc, err := nats.Connect(srv.ClientURL()) @@ -2513,27 +2559,35 @@ func TestPullConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - msgs := make([]jetstream.Msg, 0) - wg := &sync.WaitGroup{} - wg.Add(len(testMsgs)) - l, err := c.Consume(func(msg jetstream.Msg) { - msgs = append(msgs, msg) - wg.Done() - }, jetstream.PullMaxBytes(1*time.Second)) + // delete consumer to force missing heartbeat error + if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil { + t.Fatalf("Error deleting consumer: %s", err) + } + + errs := make(chan error, 1) + now := time.Now() + var elapsed time.Duration + l, err := c.Consume(func(msg jetstream.Msg) {}, + jetstream.PullHeartbeat(500*time.Millisecond), + jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) { + errs <- err + })) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer l.Stop() - publishTestMsgs(t, js) - wg.Wait() - if len(msgs) != len(testMsgs) { - t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) - } - for i, msg := range msgs { - if string(msg.Data()) != testMsgs[i] { - t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data())) + select { + case err := <-errs: + if !errors.Is(err, jetstream.ErrNoHeartbeat) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err) + } + elapsed = time.Since(now) + if elapsed < time.Second || elapsed > 1500*time.Millisecond { + t.Fatalf("Unexpected elapsed time; want between 1s and 1.5s; got %v", elapsed) } + case <-time.After(5 * time.Second): + t.Fatalf("Timeout waiting for %v", jetstream.ErrNoHeartbeat) } })