diff --git a/server/consumer.go b/server/consumer.go index e0b1e04c4b0..56dca38b21a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1273,6 +1273,9 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { if cfg.DeliverSubject == _EMPTY_ { return errors.New("can not update pull consumer to push based") } + if ncfg.DeliverSubject == _EMPTY_ { + return errors.New("can not update push consumer to pull based") + } rr := acc.sl.Match(cfg.DeliverSubject) if len(rr.psubs)+len(rr.qsubs) != 0 { return NewJSConsumerNameExistError() @@ -2330,7 +2333,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } - if o.isPushMode() { + if o.isPushMode() || o.waiting == nil { sendErr(409, "Consumer is push based") return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index fe44c833ba1..ba08c9bcbfb 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10360,6 +10360,36 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) { } } +func TestJetStreamConsumerUpgrade(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + testUpdate := func(t *testing.T, s *Server) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{Name: "X"}) + require_NoError(t, err) + _, err = js.Publish("X", []byte("OK")) + require_NoError(t, err) + // First create a consumer that is push based. + _, err = js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc", DeliverSubject: "Y"}) + require_NoError(t, err) + // Now do same name but pull. This should be an error. + _, err = js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc"}) + require_Error(t, err) + } + + t.Run("Single", func(t *testing.T) { testUpdate(t, s) }) + t.Run("Clustered", func(t *testing.T) { testUpdate(t, c.randomServer()) }) +} + // Support functions // Used to setup superclusters for tests.