Skip to content

Commit

Permalink
[IMPROVED] Ordered consumer creation and initial config settings (#1645)
Browse files Browse the repository at this point in the history
This changes a few things around creating ordered consumers:
- initial ordered consumer creation is now done without retries
- fixed an issue where start seq could be invalid when resetting a consumer which did not receive any messages
- simplified getConsumerConfig()

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Jun 13, 2024
1 parent 1deccaf commit 005a6f2
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 13 deletions.
7 changes: 3 additions & 4 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,12 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
namePrefix: nuid.Next(),
doReset: make(chan struct{}, 1),
}
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq
}
err := oc.reset()
consCfg := oc.getConsumerConfig()
cons, err := js.CreateOrUpdateConsumer(ctx, stream, *consCfg)
if err != nil {
return nil, err
}
oc.currentConsumer = cons.(*pullConsumer)

return oc, nil
}
Expand Down
23 changes: 18 additions & 5 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ func (c *orderedConsumer) reset() error {
}
}

seq := c.cursor.streamSeq + 1
c.cursor.deliverSeq = 0
consumerConfig := c.getConsumerConfigForSeq(seq)
consumerConfig := c.getConsumerConfig()

var err error
var cons Consumer
Expand Down Expand Up @@ -519,13 +518,27 @@ func (c *orderedConsumer) reset() error {
return nil
}

func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
c.serial++
var nextSeq uint64

// if stream sequence is not initialized, no message was consumed yet
// therefore, start from the beginning (either from 1 or from the provided sequence)
if c.cursor.streamSeq == 0 {
if c.cfg.OptStartSeq != 0 {
nextSeq = c.cfg.OptStartSeq
} else {
nextSeq = 1
}
} else {
// otherwise, start from the next sequence
nextSeq = c.cursor.streamSeq + 1
}
name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial)
cfg := &ConsumerConfig{
Name: name,
DeliverPolicy: DeliverByStartSequencePolicy,
OptStartSeq: seq,
OptStartSeq: nextSeq,
AckPolicy: AckNonePolicy,
InactiveThreshold: 5 * time.Minute,
Replicas: 1,
Expand All @@ -538,7 +551,7 @@ func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
cfg.FilterSubjects = c.cfg.FilterSubjects
}

if seq != c.cfg.OptStartSeq+1 {
if c.serial != 1 {
return cfg
}

Expand Down
7 changes: 3 additions & 4 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,12 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig)
namePrefix: nuid.Next(),
doReset: make(chan struct{}, 1),
}
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq
}
err := oc.reset()
consCfg := oc.getConsumerConfig()
cons, err := s.CreateOrUpdateConsumer(ctx, *consCfg)
if err != nil {
return nil, err
}
oc.currentConsumer = cons.(*pullConsumer)

return oc, nil
}
Expand Down
99 changes: 99 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,105 @@ func TestOrderedConsumerConsume(t *testing.T) {
l.Stop()
})

t.Run("reset consumer before receiving any messages", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

wg := &sync.WaitGroup{}
l, err := c.Consume(func(msg jetstream.Msg) {
wg.Done()
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(500 * time.Millisecond)

name := c.CachedInfo().Name
if err := s.DeleteConsumer(ctx, name); err != nil {
t.Fatal(err)
}
wg.Add(len(testMsgs))
publishTestMsgs(t, nc)
wg.Wait()

l.Stop()
})

t.Run("reset consumer before receiving any messages with custom start seq", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{DeliverPolicy: jetstream.DeliverByStartSequencePolicy, OptStartSeq: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

wg := &sync.WaitGroup{}
l, err := c.Consume(func(msg jetstream.Msg) {
wg.Done()
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(500 * time.Millisecond)

name := c.CachedInfo().Name
if err := s.DeleteConsumer(ctx, name); err != nil {
t.Fatal(err)
}
// should receive messages with sequences 3, 4 and 5
wg.Add(len(testMsgs) - 2)
publishTestMsgs(t, nc)
wg.Wait()

// now delete consumer again and publish some more messages, all should be received normally
name = c.CachedInfo().Name
if err := s.DeleteConsumer(ctx, name); err != nil {
t.Fatal(err)
}
wg.Add(len(testMsgs))
publishTestMsgs(t, nc)
wg.Wait()
l.Stop()
})

t.Run("base usage, server shutdown", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
Expand Down

0 comments on commit 005a6f2

Please sign in to comment.