Skip to content

Commit

Permalink
feat: Jetstream ExactlyOnce Configuration
Browse files Browse the repository at this point in the history
This option can be used to force use of the Nats-Msg-Id header for
deduplication by ClientID + CorrelationID combination.  Also forces
synchronous acknowledgement on subscriber side.  Fixes #168

Signed-off-by: Alex Ullrich <alex.ullrich@gmail.com>
  • Loading branch information
AlexCuse committed Jul 12, 2023
1 parent 07c1cec commit 3c55305
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 64 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ require (
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
google.golang.org/protobuf v1.26.0-rc.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
RetryOnFailedConnect = "RetryOnFailedConnect"
Format = "Format"
QueueGroup = "QueueGroup"
ExactlyOnce = "ExactlyOnce"

// NATS JetStream specifics
Durable = "Durable"
Expand Down
21 changes: 18 additions & 3 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor
return nil, fmt.Errorf("connectionFactory is required")
}

var m interfaces.MarshallerUnmarshaller = &natsMarshaller{} // default
var m interfaces.MarshallerUnmarshaller

cc, err := CreateClientConfiguration(cfg)

Expand All @@ -69,9 +69,9 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor

switch strings.ToLower(cc.Format) {
case "json":
m = &jsonMarshaller{}
m = &jsonMarshaller{opts: cc}
default:
m = &natsMarshaller{}
m = &natsMarshaller{opts: cc}
}

return &Client{
Expand Down Expand Up @@ -153,6 +153,21 @@ func (c *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error
} else {
tc.Messages <- env
}

// core nats messages without reply do not need to be ack'd
if msg.Reply != "" {
var ackErr error
if c.config.ExactlyOnce {
// AckSync carries a performance penalty
// but is needed for subscribe side of ExactlyOnce
ackErr = msg.AckSync()
} else {
ackErr = msg.Ack()
}
if ackErr != nil {
messageErrors <- ackErr
}
}
})

if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions internal/pkg/nats/interfaces/mocks/Connection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/pkg/nats/jetstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewClient(cfg types.MessageBusConfig) (*natsMessaging.Client, error) {
}

func subOpt(cc natsMessaging.ClientConfig) []nats.SubOpt {
return []nats.SubOpt{nats.AckNone(), parseDeliver(cc.Deliver)()}
return []nats.SubOpt{nats.AckExplicit(), parseDeliver(cc.Deliver)()}
}

func pubOpt(cc natsMessaging.ClientConfig) []nats.PubOpt {
Expand Down
75 changes: 70 additions & 5 deletions internal/pkg/nats/jetstream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package jetstream

import (
"context"
"reflect"
"testing"
"time"

"github.com/edgexfoundry/go-mod-messaging/v3/internal/pkg"
natsMessaging "github.com/edgexfoundry/go-mod-messaging/v3/internal/pkg/nats"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
"github.com/google/uuid"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -52,9 +56,6 @@ func Test_autoProvision(t *testing.T) {
js, err := client.JetStream()
assert.NoError(t, err)

_, err = js.StreamInfo(tc.expectStream)
assert.Error(t, err)

config := natsMessaging.ClientConfig{
ClientOptions: natsMessaging.ClientOptions{
Subject: tc.streamSubject,
Expand All @@ -73,6 +74,72 @@ func Test_autoProvision(t *testing.T) {
}
}

func Test_ExactlyOnce(t *testing.T) {
port := 4223
srvr, err := server.NewServer(&server.Options{JetStream: true, StoreDir: t.TempDir(), Port: port})
assert.NoError(t, err)

srvr.Start()
defer srvr.Shutdown()

client, err := NewClient(types.MessageBusConfig{
Broker: types.HostInfo{
Host: "localhost",
Port: port,
Protocol: "nats",
},
Type: "nats-jetstream",
Optional: map[string]string{
pkg.ExactlyOnce: "true",
pkg.AutoProvision: "true",
pkg.Subject: "edgex/#",
},
})

assert.NoError(t, err)

assert.NoError(t, client.Connect())

topic := "edgex/#"
messages := make(chan types.MessageEnvelope)
errors := make(chan error)

err = client.Subscribe([]types.TopicChannel{{
Topic: topic,
Messages: messages,
}}, errors)

m0 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())}
m1 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())}

ctx, done := context.WithTimeout(context.Background(), 30*time.Second)

go func() {
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m1, topic))
assert.NoError(t, client.Publish(m1, topic))
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m1, topic))

done()
}()

received := make([]types.MessageEnvelope, 0)

for {
select {
case m := <-messages:
received = append(received, m)
case <-ctx.Done():
assert.Equal(t, 2, len(received))
assert.Equal(t, m0.CorrelationID, received[0].CorrelationID)
assert.Equal(t, m1.CorrelationID, received[1].CorrelationID)
return
}
}
}

func Test_parseDeliver(t *testing.T) {
tests := []struct {
name string
Expand All @@ -88,8 +155,6 @@ func Test_parseDeliver(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
//cc := &nats.ConsumerConfig{}

opt := parseDeliver(tt.input)

wantAddr := reflect.ValueOf(tt.want).Pointer()
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/nats/jetstream/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (j connection) QueueSubscribe(s string, q string, handler nats.MsgHandler)
// use the configured subject to bind subscription to stream
opts = append(opts, nats.BindStream(subjectToStreamName(natsMessaging.TopicToSubject(j.cfg.Subject))))
}

return j.js.QueueSubscribe(s, q, handler, opts...)
}

Expand Down
18 changes: 16 additions & 2 deletions internal/pkg/nats/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package nats

import (
"encoding/json"
"fmt"
"strconv"
"strings"

Expand All @@ -29,7 +30,9 @@ import (
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
)

type jsonMarshaller struct{}
type jsonMarshaller struct {
opts ClientConfig
}

func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) {
var err error
Expand All @@ -39,6 +42,11 @@ func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string)
out := nats.NewMsg(subject)
out.Data, err = json.Marshal(v)

if jm.opts.ExactlyOnce {
// the broker should only accept a message once per publishing service / correlation ID
out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", jm.opts.ClientId, v.CorrelationID))
}

if err != nil {
return nil, err
}
Expand All @@ -65,7 +73,9 @@ const (
queryParamsHeader = "QueryParams"
)

type natsMarshaller struct{}
type natsMarshaller struct {
opts ClientConfig
}

func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) {
subject := TopicToSubject(publishTopic)
Expand All @@ -83,6 +93,10 @@ func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string)
out.Header.Add(queryParamsHeader, query)
}
}
if nm.opts.ExactlyOnce {
// the broker should only accept a message once per publishing service / correlation ID
out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", nm.opts.ClientId, v.CorrelationID))
}

return out, nil
}
Expand Down
Loading

0 comments on commit 3c55305

Please sign in to comment.