Skip to content

Commit

Permalink
feat: Jetstream ExactlyOnce Configuration
Browse files Browse the repository at this point in the history
This can also be used to opt-in to message deduplication on the broker
when running in nats-core mode.  Fixes edgexfoundry#168

Signed-off-by: Alex Ullrich <alex.ullrich@gmail.com>
  • Loading branch information
AlexCuse committed Oct 15, 2022
1 parent e07deed commit e84efa0
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 51 deletions.
1 change: 1 addition & 0 deletions internal/pkg/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
RetryOnFailedConnect = "RetryOnFailedConnect"
Format = "Format"
QueueGroup = "QueueGroup"
ExactlyOnce = "ExactlyOnce"

// NATS JetStream specifics
Durable = "Durable"
Expand Down
16 changes: 13 additions & 3 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor
return nil, fmt.Errorf("connectionFactory is required")
}

var m interfaces.MarshallerUnmarshaller = &natsMarshaller{} // default
var m interfaces.MarshallerUnmarshaller

cc, err := CreateClientConfiguration(cfg)

Expand All @@ -64,9 +64,9 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor

switch strings.ToLower(cc.Format) {
case "json":
m = &jsonMarshaller{}
m = &jsonMarshaller{opts: cc}
default:
m = &natsMarshaller{}
m = &natsMarshaller{opts: cc}
}

return &Client{config: cc, connect: connectionFactory, m: m}, nil
Expand Down Expand Up @@ -137,6 +137,16 @@ func (c *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error
} else {
tc.Messages <- env
}

if c.config.ExactlyOnce {
// AckSync carries a performance penalty
// but is needed for subscribe side of ExactlyOnce
if ackErr := msg.AckSync(); err != nil {
messageErrors <- ackErr
}
} else {
msg.Ack()
}
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/nats/jetstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewClient(cfg types.MessageBusConfig) (*natsMessaging.Client, error) {
}

func subOpt(cc natsMessaging.ClientConfig) []nats.SubOpt {
return []nats.SubOpt{nats.AckNone(), parseDeliver(cc.Deliver)()}
return []nats.SubOpt{nats.AckExplicit(), parseDeliver(cc.Deliver)()}
}

func pubOpt(cc natsMessaging.ClientConfig) []nats.PubOpt {
Expand Down
74 changes: 74 additions & 0 deletions internal/pkg/nats/jetstream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package jetstream

import (
"context"
"reflect"
"testing"

"github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg"
natsMessaging "github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/nats"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/google/uuid"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -73,6 +76,77 @@ func Test_autoProvision(t *testing.T) {
}
}

func Test_ExactlyOnce(t *testing.T) {
port := 4223
srvr, err := server.NewServer(&server.Options{JetStream: true, StoreDir: t.TempDir(), Port: port})
assert.NoError(t, err)

srvr.Start()
defer srvr.Shutdown()

client, err := NewClient(types.MessageBusConfig{
PublishHost: types.HostInfo{
Host: "localhost",
Port: port,
Protocol: "nats",
},
SubscribeHost: types.HostInfo{
Host: "",
Port: 0,
Protocol: "",
},
Type: "nats-jetstream",
Optional: map[string]string{
pkg.ExactlyOnce: "true",
pkg.AutoProvision: "true",
pkg.Subject: "edgex/#",
},
})

assert.NoError(t, err)

assert.NoError(t, client.Connect())

topic := "edgex/#"
messages := make(chan types.MessageEnvelope)
errors := make(chan error)

err = client.Subscribe([]types.TopicChannel{{
Topic: topic,
Messages: messages,
}}, errors)

m0 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())}
m1 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())}

ctx, done := context.WithCancel(context.Background())

go func() {
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m1, topic))
assert.NoError(t, client.Publish(m1, topic))
assert.NoError(t, client.Publish(m0, topic))
assert.NoError(t, client.Publish(m1, topic))

done()
}()

received := make([]types.MessageEnvelope, 0)

for {
select {
case m := <-messages:
received = append(received, m)
case <-ctx.Done():
assert.Equal(t, 2, len(received))
assert.Equal(t, m0.CorrelationID, received[0].CorrelationID)
assert.Equal(t, m1.CorrelationID, received[1].CorrelationID)
return
}
}
}

func Test_parseDeliver(t *testing.T) {
tests := []struct {
name string
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/nats/jetstream/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (j connection) QueueSubscribe(s string, q string, handler nats.MsgHandler)
// use the configured subject to bind subscription to stream
opts = append(opts, nats.BindStream(subjectToStreamName(natsMessaging.TopicToSubject(j.cfg.Subject))))
}

return j.js.QueueSubscribe(s, q, handler, opts...)
}

Expand Down
18 changes: 16 additions & 2 deletions internal/pkg/nats/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package nats

import (
"encoding/json"
"fmt"
"strconv"
"strings"

Expand All @@ -29,7 +30,9 @@ import (
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
)

type jsonMarshaller struct{}
type jsonMarshaller struct {
opts ClientConfig
}

func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) {
var err error
Expand All @@ -39,6 +42,11 @@ func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string)
out := nats.NewMsg(subject)
out.Data, err = json.Marshal(v)

if jm.opts.ExactlyOnce {
// the broker should only accept a message once per publishing service / correlation ID
out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", jm.opts.ClientId, v.CorrelationID))
}

if err != nil {
return nil, err
}
Expand All @@ -65,7 +73,9 @@ const (
queryParamsHeader = "QueryParams"
)

type natsMarshaller struct{}
type natsMarshaller struct {
opts ClientConfig
}

func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) {
subject := TopicToSubject(publishTopic)
Expand All @@ -83,6 +93,10 @@ func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string)
out.Header.Add(queryParamsHeader, query)
}
}
if nm.opts.ExactlyOnce {
// the broker should only accept a message once per publishing service / correlation ID
out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", nm.opts.ClientId, v.CorrelationID))
}

return out, nil
}
Expand Down
105 changes: 61 additions & 44 deletions internal/pkg/nats/marshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,34 @@ var marshallerCases = map[string]interfaces.MarshallerUnmarshaller{
}

func TestJSONMarshaller(t *testing.T) {
sut := &jsonMarshaller{}
pubTopic := uuid.NewString()
suts := []*jsonMarshaller{{}, {ClientConfig{ClientOptions: ClientOptions{ClientId: uuid.NewString(), ExactlyOnce: true}}}}

for _, sut := range suts {
t.Run(fmt.Sprintf("ExactlyOnce_%t", sut.opts.ExactlyOnce), func(t *testing.T) {
pubTopic := uuid.NewString()

orig := sampleMessage(100)
expected := orig
orig := sampleMessage(100)
expected := orig

expected.ReceivedTopic = pubTopic // this is set from NATS message
expected.ReceivedTopic = pubTopic // this is set from NATS message

marshaled, err := sut.Marshal(orig, pubTopic)
marshaled, err := sut.Marshal(orig, pubTopic)

assert.NoError(t, err)
assert.NoError(t, err)

unmarshaled := types.MessageEnvelope{}
if sut.opts.ExactlyOnce {
assert.Equal(t, fmt.Sprintf("%s-%s", sut.opts.ClientId, orig.CorrelationID), marshaled.Header.Get(nats.MsgIdHdr))
}

err = sut.Unmarshal(marshaled, &unmarshaled)
unmarshaled := types.MessageEnvelope{}

assert.NoError(t, err)
err = sut.Unmarshal(marshaled, &unmarshaled)

assert.Equal(t, expected, unmarshaled)
assert.NoError(t, err)

assert.Equal(t, expected, unmarshaled)
})
}
}

func TestJSONMarshaller_Unmarshal_Invalid_JSON(t *testing.T) {
Expand All @@ -73,40 +82,48 @@ func TestJSONMarshaller_Unmarshal_Invalid_JSON(t *testing.T) {
}

func TestNATSMarshaller(t *testing.T) {
sut := &natsMarshaller{}
pubTopic := uuid.NewString()

validWithNoQueryParams := sampleMessage(100)
validWithNoQueryParams.ReceivedTopic = pubTopic
validWithQueryParams := validWithNoQueryParams
validWithQueryParams.QueryParams = map[string]string{"foo": "bar"}

tests := []struct {
name string
envelope types.MessageEnvelope
emptyQueryParams bool
}{
{"valid", validWithQueryParams, false},
{"valid - no query parameters", validWithNoQueryParams, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
marshaled, err := sut.Marshal(tt.envelope, pubTopic)
assert.NoError(t, err)
assert.NotEmpty(t, marshaled.Header.Get(correlationIDHeader))
assert.NotEmpty(t, marshaled.Header.Get(requestIDHeader))
assert.Equal(t, types.ApiVersion, marshaled.Header.Get(apiVersionHeader))
assert.Equal(t, "0", marshaled.Header.Get(errorCodeHeader))
if tt.emptyQueryParams {
assert.Empty(t, marshaled.Header.Get(queryParamsHeader))
} else {
assert.NotEmpty(t, marshaled.Header.Get(queryParamsHeader))
}
suts := []*natsMarshaller{{}, {ClientConfig{ClientOptions: ClientOptions{ClientId: uuid.NewString(), ExactlyOnce: true}}}}

unmarshaled := types.MessageEnvelope{}
err = sut.Unmarshal(marshaled, &unmarshaled)
assert.NoError(t, err)
assert.Equal(t, tt.envelope, unmarshaled)
for _, sut := range suts {
t.Run(fmt.Sprintf("ExactlyOnce_%t", sut.opts.ExactlyOnce), func(t *testing.T) {

pubTopic := uuid.NewString()

validWithNoQueryParams := sampleMessage(100)
validWithNoQueryParams.ReceivedTopic = pubTopic
validWithQueryParams := validWithNoQueryParams
validWithQueryParams.QueryParams = map[string]string{"foo": "bar"}

tests := []struct {
name string
envelope types.MessageEnvelope
emptyQueryParams bool
}{
{"valid", validWithQueryParams, false},
{"valid - no query parameters", validWithNoQueryParams, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
marshaled, err := sut.Marshal(tt.envelope, pubTopic)
assert.NoError(t, err)
assert.NotEmpty(t, marshaled.Header.Get(correlationIDHeader))
assert.NotEmpty(t, marshaled.Header.Get(requestIDHeader))
assert.Equal(t, types.ApiVersion, marshaled.Header.Get(apiVersionHeader))
assert.Equal(t, "0", marshaled.Header.Get(errorCodeHeader))
if tt.emptyQueryParams {
assert.Empty(t, marshaled.Header.Get(queryParamsHeader))
} else {
assert.NotEmpty(t, marshaled.Header.Get(queryParamsHeader))
}
if sut.opts.ExactlyOnce {
assert.Equal(t, fmt.Sprintf("%s-%s", sut.opts.ClientId, tt.envelope.CorrelationID), marshaled.Header.Get(nats.MsgIdHdr))
}
unmarshaled := types.MessageEnvelope{}
err = sut.Unmarshal(marshaled, &unmarshaled)
assert.NoError(t, err)
assert.Equal(t, tt.envelope, unmarshaled)
})
}
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/nats/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type ClientOptions struct {
DefaultPubRetryAttempts int
NKeySeedFile string
CredentialsFile string
ExactlyOnce bool
}

// CreateClientConfiguration constructs a ClientConfig based on the provided MessageBusConfig.
Expand Down Expand Up @@ -161,5 +162,7 @@ func CreateClientOptionsWithDefaults() ClientOptions {
Format: "nats",
NKeySeedFile: "",
CredentialsFile: "",
Deliver: "new",
ExactlyOnce: false, //could use QOS = 2 here
}
}
4 changes: 4 additions & 0 deletions internal/pkg/nats/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestCreateClientConfiguration(t *testing.T) {
pkg.QueueGroup: "group-1",
pkg.DefaultPubRetryAttempts: "12",
pkg.Deliver: "set-deliver",
pkg.ExactlyOnce: "true",
}}},
ClientConfig{
BrokerURL: "tcp://example.com:9090",
Expand All @@ -73,6 +74,7 @@ func TestCreateClientConfiguration(t *testing.T) {
QueueGroup: "group-1",
DefaultPubRetryAttempts: 12,
Deliver: "set-deliver",
ExactlyOnce: true,
},
},
false,
Expand Down Expand Up @@ -100,6 +102,7 @@ func TestCreateClientConfiguration(t *testing.T) {
ConnectTimeout: 7,
DefaultPubRetryAttempts: 2,
Format: "nats",
Deliver: "new",
}},
false,
},
Expand Down Expand Up @@ -155,6 +158,7 @@ func TestCreateClientConfiguration(t *testing.T) {
ConnectTimeout: 7,
DefaultPubRetryAttempts: 2,
Format: "nats",
Deliver: "new",
},
},
false,
Expand Down

0 comments on commit e84efa0

Please sign in to comment.