Skip to content

Commit

Permalink
add ability to use PullConsumers with NATS jetstream with a specified…
Browse files Browse the repository at this point in the history
… callback

Signed-off-by: stephen-totty-hpe <stephen.totty@hpe.com>
  • Loading branch information
stephen-totty-hpe committed Jul 15, 2024
1 parent dc3d9b1 commit 9212db4
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 3 deletions.
16 changes: 16 additions & 0 deletions protocol/nats_jetstream/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
)

var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
var ErrInvalidDurableName = errors.New("invalid durable name for PullSubscriber")
var ErrInvalidCallbackFunc = errors.New("invalid callback function for PullSubscriber")

// NatsOptions is a helper function to group a variadic nats.ProtocolOption into
// []nats.Option that can be used by either Sender, Consumer or Protocol
Expand Down Expand Up @@ -50,3 +52,17 @@ func WithQueueSubscriber(queue string) ConsumerOption {
return nil
}
}

// WithPullSubscriber configures the Consumer to pull messages when subscribing
func WithPullSubscriber(durable string, fetchCallback FetchCallbackFunc) ConsumerOption {
return func(c *Consumer) error {
if durable == "" {
return ErrInvalidDurableName
}
if fetchCallback == nil {
return ErrInvalidCallbackFunc
}
c.Subscriber = &PullSubscriber{Durable: durable, FetchCallback: fetchCallback}
return nil
}
}
40 changes: 37 additions & 3 deletions protocol/nats_jetstream/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,22 @@ func (c *Consumer) OpenInbound(ctx context.Context) error {
}

// Wait until external or internal context done
select {
case <-ctx.Done():
case <-c.internalClose:
pullSubscriber, isPullSubscriber := c.Subscriber.(*PullSubscriber)
if isPullSubscriber {
err = c.pullSubscriptionProcessor(ctx, sub, pullSubscriber)
if err != nil {
errDrain := sub.Drain()
if errDrain != nil {
return errDrain
}
return err
}
} else {
// Wait until external or internal context done
select {
case <-ctx.Done():
case <-c.internalClose:
}
}

// Finish to consume messages in the queue and close the subscription
Expand Down Expand Up @@ -163,6 +176,27 @@ func (c *Consumer) applyOptions(opts ...ConsumerOption) error {
return nil
}

// pullSubscriptionProcessor allows the pull subscriber to control some aspects of the fetch calls
func (c *Consumer) pullSubscriptionProcessor(ctx context.Context, natsSub *nats.Subscription, ps *PullSubscriber) error {
for {
// Wait until external or internal context done
select {
case <-ctx.Done():
return nil
case <-c.internalClose:
return nil
default:
msgs, err := ps.FetchCallback(natsSub)
if err != nil {
return err
}
for i := range msgs {
c.Receiver.MsgHandler(msgs[i])
}
}
}
}

var _ protocol.Opener = (*Consumer)(nil)
var _ protocol.Receiver = (*Consumer)(nil)
var _ protocol.Closer = (*Consumer)(nil)
16 changes: 16 additions & 0 deletions protocol/nats_jetstream/v2/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,19 @@ func (s *QueueSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, c
}

var _ Subscriber = (*QueueSubscriber)(nil)

// FetchCallbackFunc defines a callback where Fetch should be called against nats.Subscription
type FetchCallbackFunc func(natsSub *nats.Subscription) ([]*nats.Msg, error)

// PullSubscriber creates pull subscriptions
type PullSubscriber struct {
Durable string
FetchCallback FetchCallbackFunc
}

// Subscribe implements Subscriber.Subscribe
func (s *PullSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
return jsm.PullSubscribe(subject, s.Durable, opts...)
}

var _ Subscriber = (*PullSubscriber)(nil)
26 changes: 26 additions & 0 deletions test/integration/nats_jetstream/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) {
bindingEncoding: binding.EncodingStructured,
},
},
{
name: "pull subscriber - structured",
args: args{
opts: []ce_nats.ProtocolOption{
ce_nats.WithConsumerOptions(
ce_nats.WithPullSubscriber(uuid.New().String(), fetchCallback),
),
},
bindingEncoding: binding.EncodingStructured,
},
},
{
name: "regular subscriber - binary",
args: args{
Expand All @@ -70,6 +81,17 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) {
bindingEncoding: binding.EncodingBinary,
},
},
{
name: "pull subscriber - binary",
args: args{
opts: []ce_nats.ProtocolOption{
ce_nats.WithConsumerOptions(
ce_nats.WithPullSubscriber(uuid.New().String(), fetchCallback),
),
},
bindingEncoding: binding.EncodingBinary,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -137,6 +159,10 @@ func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOpt
}, p.Sender, p.Consumer
}

func fetchCallback(natsSub *nats.Subscription) ([]*nats.Msg, error) {
return natsSub.Fetch(1)
}

func BenchmarkSendReceive(b *testing.B) {
conn := testConn(b)
defer conn.Close()
Expand Down

0 comments on commit 9212db4

Please sign in to comment.