Skip to content

Commit

Permalink
Merge pull request #2829 from nats-io/consumer_update
Browse files Browse the repository at this point in the history
Updating a push consumer to be pull should fail.
  • Loading branch information
derekcollison authored Jan 29, 2022
2 parents 26b692e + a57bd96 commit 617e1fb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
5 changes: 4 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,9 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if cfg.DeliverSubject == _EMPTY_ {
return errors.New("can not update pull consumer to push based")
}
if ncfg.DeliverSubject == _EMPTY_ {
return errors.New("can not update push consumer to pull based")
}
rr := acc.sl.Match(cfg.DeliverSubject)
if len(rr.psubs)+len(rr.qsubs) != 0 {
return NewJSConsumerNameExistError()
Expand Down Expand Up @@ -2330,7 +2333,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}

if o.isPushMode() {
if o.isPushMode() || o.waiting == nil {
sendErr(409, "Consumer is push based")
return
}
Expand Down
30 changes: 30 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10360,6 +10360,36 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) {
}
}

func TestJetStreamConsumerUpgrade(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

testUpdate := func(t *testing.T, s *Server) {
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "X"})
require_NoError(t, err)
_, err = js.Publish("X", []byte("OK"))
require_NoError(t, err)
// First create a consumer that is push based.
_, err = js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc", DeliverSubject: "Y"})
require_NoError(t, err)
// Now do same name but pull. This should be an error.
_, err = js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc"})
require_Error(t, err)
}

t.Run("Single", func(t *testing.T) { testUpdate(t, s) })
t.Run("Clustered", func(t *testing.T) { testUpdate(t, c.randomServer()) })
}

// Support functions

// Used to setup superclusters for tests.
Expand Down

0 comments on commit 617e1fb

Please sign in to comment.