From e84efa0a1dd3ed9c68f9b295a574b7251babb9db Mon Sep 17 00:00:00 2001 From: Alex Ullrich Date: Fri, 14 Oct 2022 09:30:39 -0400 Subject: [PATCH] feat: Jetstream ExactlyOnce Configuration This can also be used to opt-in to message deduplication on the broker when running in nats-core mode. Fixes #168 Signed-off-by: Alex Ullrich --- internal/pkg/constants.go | 1 + internal/pkg/nats/client.go | 16 +++- internal/pkg/nats/jetstream/client.go | 2 +- internal/pkg/nats/jetstream/client_test.go | 74 +++++++++++++++ internal/pkg/nats/jetstream/connection.go | 1 - internal/pkg/nats/marshaller.go | 18 +++- internal/pkg/nats/marshaller_test.go | 105 ++++++++++++--------- internal/pkg/nats/options.go | 3 + internal/pkg/nats/options_test.go | 4 + 9 files changed, 173 insertions(+), 51 deletions(-) diff --git a/internal/pkg/constants.go b/internal/pkg/constants.go index ca4d27ed..fc81af74 100644 --- a/internal/pkg/constants.go +++ b/internal/pkg/constants.go @@ -31,6 +31,7 @@ const ( RetryOnFailedConnect = "RetryOnFailedConnect" Format = "Format" QueueGroup = "QueueGroup" + ExactlyOnce = "ExactlyOnce" // NATS JetStream specifics Durable = "Durable" diff --git a/internal/pkg/nats/client.go b/internal/pkg/nats/client.go index 85ef50bc..7a20fd27 100644 --- a/internal/pkg/nats/client.go +++ b/internal/pkg/nats/client.go @@ -54,7 +54,7 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor return nil, fmt.Errorf("connectionFactory is required") } - var m interfaces.MarshallerUnmarshaller = &natsMarshaller{} // default + var m interfaces.MarshallerUnmarshaller cc, err := CreateClientConfiguration(cfg) @@ -64,9 +64,9 @@ func NewClientWithConnectionFactory(cfg types.MessageBusConfig, connectionFactor switch strings.ToLower(cc.Format) { case "json": - m = &jsonMarshaller{} + m = &jsonMarshaller{opts: cc} default: - m = &natsMarshaller{} + m = &natsMarshaller{opts: cc} } return &Client{config: cc, connect: connectionFactory, m: m}, nil @@ -137,6 +137,16 @@ func (c *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error } else { tc.Messages <- env } + + if c.config.ExactlyOnce { + // AckSync carries a performance penalty + // but is needed for subscribe side of ExactlyOnce + if ackErr := msg.AckSync(); err != nil { + messageErrors <- ackErr + } + } else { + msg.Ack() + } }) if err != nil { diff --git a/internal/pkg/nats/jetstream/client.go b/internal/pkg/nats/jetstream/client.go index 0e3c846d..ed80524b 100644 --- a/internal/pkg/nats/jetstream/client.go +++ b/internal/pkg/nats/jetstream/client.go @@ -93,7 +93,7 @@ func NewClient(cfg types.MessageBusConfig) (*natsMessaging.Client, error) { } func subOpt(cc natsMessaging.ClientConfig) []nats.SubOpt { - return []nats.SubOpt{nats.AckNone(), parseDeliver(cc.Deliver)()} + return []nats.SubOpt{nats.AckExplicit(), parseDeliver(cc.Deliver)()} } func pubOpt(cc natsMessaging.ClientConfig) []nats.PubOpt { diff --git a/internal/pkg/nats/jetstream/client_test.go b/internal/pkg/nats/jetstream/client_test.go index 63187501..4651d724 100644 --- a/internal/pkg/nats/jetstream/client_test.go +++ b/internal/pkg/nats/jetstream/client_test.go @@ -19,10 +19,13 @@ package jetstream import ( + "context" "reflect" "testing" + "github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg" natsMessaging "github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/nats" + "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/google/uuid" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" @@ -73,6 +76,77 @@ func Test_autoProvision(t *testing.T) { } } +func Test_ExactlyOnce(t *testing.T) { + port := 4223 + srvr, err := server.NewServer(&server.Options{JetStream: true, StoreDir: t.TempDir(), Port: port}) + assert.NoError(t, err) + + srvr.Start() + defer srvr.Shutdown() + + client, err := NewClient(types.MessageBusConfig{ + PublishHost: types.HostInfo{ + Host: "localhost", + Port: port, + Protocol: "nats", + }, + SubscribeHost: types.HostInfo{ + Host: "", + Port: 0, + Protocol: "", + }, + Type: "nats-jetstream", + Optional: map[string]string{ + pkg.ExactlyOnce: "true", + pkg.AutoProvision: "true", + pkg.Subject: "edgex/#", + }, + }) + + assert.NoError(t, err) + + assert.NoError(t, client.Connect()) + + topic := "edgex/#" + messages := make(chan types.MessageEnvelope) + errors := make(chan error) + + err = client.Subscribe([]types.TopicChannel{{ + Topic: topic, + Messages: messages, + }}, errors) + + m0 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())} + m1 := types.MessageEnvelope{CorrelationID: uuid.NewString(), Payload: []byte(uuid.NewString())} + + ctx, done := context.WithCancel(context.Background()) + + go func() { + assert.NoError(t, client.Publish(m0, topic)) + assert.NoError(t, client.Publish(m0, topic)) + assert.NoError(t, client.Publish(m1, topic)) + assert.NoError(t, client.Publish(m1, topic)) + assert.NoError(t, client.Publish(m0, topic)) + assert.NoError(t, client.Publish(m1, topic)) + + done() + }() + + received := make([]types.MessageEnvelope, 0) + + for { + select { + case m := <-messages: + received = append(received, m) + case <-ctx.Done(): + assert.Equal(t, 2, len(received)) + assert.Equal(t, m0.CorrelationID, received[0].CorrelationID) + assert.Equal(t, m1.CorrelationID, received[1].CorrelationID) + return + } + } +} + func Test_parseDeliver(t *testing.T) { tests := []struct { name string diff --git a/internal/pkg/nats/jetstream/connection.go b/internal/pkg/nats/jetstream/connection.go index 62e6651b..6bb3eb98 100644 --- a/internal/pkg/nats/jetstream/connection.go +++ b/internal/pkg/nats/jetstream/connection.go @@ -45,7 +45,6 @@ func (j connection) QueueSubscribe(s string, q string, handler nats.MsgHandler) // use the configured subject to bind subscription to stream opts = append(opts, nats.BindStream(subjectToStreamName(natsMessaging.TopicToSubject(j.cfg.Subject)))) } - return j.js.QueueSubscribe(s, q, handler, opts...) } diff --git a/internal/pkg/nats/marshaller.go b/internal/pkg/nats/marshaller.go index a0dcabaf..b683cf69 100644 --- a/internal/pkg/nats/marshaller.go +++ b/internal/pkg/nats/marshaller.go @@ -21,6 +21,7 @@ package nats import ( "encoding/json" + "fmt" "strconv" "strings" @@ -29,7 +30,9 @@ import ( "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" ) -type jsonMarshaller struct{} +type jsonMarshaller struct { + opts ClientConfig +} func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) { var err error @@ -39,6 +42,11 @@ func (jm *jsonMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) out := nats.NewMsg(subject) out.Data, err = json.Marshal(v) + if jm.opts.ExactlyOnce { + // the broker should only accept a message once per publishing service / correlation ID + out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", jm.opts.ClientId, v.CorrelationID)) + } + if err != nil { return nil, err } @@ -65,7 +73,9 @@ const ( queryParamsHeader = "QueryParams" ) -type natsMarshaller struct{} +type natsMarshaller struct { + opts ClientConfig +} func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) (*nats.Msg, error) { subject := TopicToSubject(publishTopic) @@ -83,6 +93,10 @@ func (nm *natsMarshaller) Marshal(v types.MessageEnvelope, publishTopic string) out.Header.Add(queryParamsHeader, query) } } + if nm.opts.ExactlyOnce { + // the broker should only accept a message once per publishing service / correlation ID + out.Header.Set(nats.MsgIdHdr, fmt.Sprintf("%s-%s", nm.opts.ClientId, v.CorrelationID)) + } return out, nil } diff --git a/internal/pkg/nats/marshaller_test.go b/internal/pkg/nats/marshaller_test.go index fd629733..34c7e9c3 100644 --- a/internal/pkg/nats/marshaller_test.go +++ b/internal/pkg/nats/marshaller_test.go @@ -39,25 +39,34 @@ var marshallerCases = map[string]interfaces.MarshallerUnmarshaller{ } func TestJSONMarshaller(t *testing.T) { - sut := &jsonMarshaller{} - pubTopic := uuid.NewString() + suts := []*jsonMarshaller{{}, {ClientConfig{ClientOptions: ClientOptions{ClientId: uuid.NewString(), ExactlyOnce: true}}}} + + for _, sut := range suts { + t.Run(fmt.Sprintf("ExactlyOnce_%t", sut.opts.ExactlyOnce), func(t *testing.T) { + pubTopic := uuid.NewString() - orig := sampleMessage(100) - expected := orig + orig := sampleMessage(100) + expected := orig - expected.ReceivedTopic = pubTopic // this is set from NATS message + expected.ReceivedTopic = pubTopic // this is set from NATS message - marshaled, err := sut.Marshal(orig, pubTopic) + marshaled, err := sut.Marshal(orig, pubTopic) - assert.NoError(t, err) + assert.NoError(t, err) - unmarshaled := types.MessageEnvelope{} + if sut.opts.ExactlyOnce { + assert.Equal(t, fmt.Sprintf("%s-%s", sut.opts.ClientId, orig.CorrelationID), marshaled.Header.Get(nats.MsgIdHdr)) + } - err = sut.Unmarshal(marshaled, &unmarshaled) + unmarshaled := types.MessageEnvelope{} - assert.NoError(t, err) + err = sut.Unmarshal(marshaled, &unmarshaled) - assert.Equal(t, expected, unmarshaled) + assert.NoError(t, err) + + assert.Equal(t, expected, unmarshaled) + }) + } } func TestJSONMarshaller_Unmarshal_Invalid_JSON(t *testing.T) { @@ -73,40 +82,48 @@ func TestJSONMarshaller_Unmarshal_Invalid_JSON(t *testing.T) { } func TestNATSMarshaller(t *testing.T) { - sut := &natsMarshaller{} - pubTopic := uuid.NewString() - - validWithNoQueryParams := sampleMessage(100) - validWithNoQueryParams.ReceivedTopic = pubTopic - validWithQueryParams := validWithNoQueryParams - validWithQueryParams.QueryParams = map[string]string{"foo": "bar"} - - tests := []struct { - name string - envelope types.MessageEnvelope - emptyQueryParams bool - }{ - {"valid", validWithQueryParams, false}, - {"valid - no query parameters", validWithNoQueryParams, true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - marshaled, err := sut.Marshal(tt.envelope, pubTopic) - assert.NoError(t, err) - assert.NotEmpty(t, marshaled.Header.Get(correlationIDHeader)) - assert.NotEmpty(t, marshaled.Header.Get(requestIDHeader)) - assert.Equal(t, types.ApiVersion, marshaled.Header.Get(apiVersionHeader)) - assert.Equal(t, "0", marshaled.Header.Get(errorCodeHeader)) - if tt.emptyQueryParams { - assert.Empty(t, marshaled.Header.Get(queryParamsHeader)) - } else { - assert.NotEmpty(t, marshaled.Header.Get(queryParamsHeader)) - } + suts := []*natsMarshaller{{}, {ClientConfig{ClientOptions: ClientOptions{ClientId: uuid.NewString(), ExactlyOnce: true}}}} - unmarshaled := types.MessageEnvelope{} - err = sut.Unmarshal(marshaled, &unmarshaled) - assert.NoError(t, err) - assert.Equal(t, tt.envelope, unmarshaled) + for _, sut := range suts { + t.Run(fmt.Sprintf("ExactlyOnce_%t", sut.opts.ExactlyOnce), func(t *testing.T) { + + pubTopic := uuid.NewString() + + validWithNoQueryParams := sampleMessage(100) + validWithNoQueryParams.ReceivedTopic = pubTopic + validWithQueryParams := validWithNoQueryParams + validWithQueryParams.QueryParams = map[string]string{"foo": "bar"} + + tests := []struct { + name string + envelope types.MessageEnvelope + emptyQueryParams bool + }{ + {"valid", validWithQueryParams, false}, + {"valid - no query parameters", validWithNoQueryParams, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + marshaled, err := sut.Marshal(tt.envelope, pubTopic) + assert.NoError(t, err) + assert.NotEmpty(t, marshaled.Header.Get(correlationIDHeader)) + assert.NotEmpty(t, marshaled.Header.Get(requestIDHeader)) + assert.Equal(t, types.ApiVersion, marshaled.Header.Get(apiVersionHeader)) + assert.Equal(t, "0", marshaled.Header.Get(errorCodeHeader)) + if tt.emptyQueryParams { + assert.Empty(t, marshaled.Header.Get(queryParamsHeader)) + } else { + assert.NotEmpty(t, marshaled.Header.Get(queryParamsHeader)) + } + if sut.opts.ExactlyOnce { + assert.Equal(t, fmt.Sprintf("%s-%s", sut.opts.ClientId, tt.envelope.CorrelationID), marshaled.Header.Get(nats.MsgIdHdr)) + } + unmarshaled := types.MessageEnvelope{} + err = sut.Unmarshal(marshaled, &unmarshaled) + assert.NoError(t, err) + assert.Equal(t, tt.envelope, unmarshaled) + }) + } }) } } diff --git a/internal/pkg/nats/options.go b/internal/pkg/nats/options.go index b58c1268..e632f0c4 100644 --- a/internal/pkg/nats/options.go +++ b/internal/pkg/nats/options.go @@ -64,6 +64,7 @@ type ClientOptions struct { DefaultPubRetryAttempts int NKeySeedFile string CredentialsFile string + ExactlyOnce bool } // CreateClientConfiguration constructs a ClientConfig based on the provided MessageBusConfig. @@ -161,5 +162,7 @@ func CreateClientOptionsWithDefaults() ClientOptions { Format: "nats", NKeySeedFile: "", CredentialsFile: "", + Deliver: "new", + ExactlyOnce: false, //could use QOS = 2 here } } diff --git a/internal/pkg/nats/options_test.go b/internal/pkg/nats/options_test.go index eb86c211..8e120b61 100644 --- a/internal/pkg/nats/options_test.go +++ b/internal/pkg/nats/options_test.go @@ -57,6 +57,7 @@ func TestCreateClientConfiguration(t *testing.T) { pkg.QueueGroup: "group-1", pkg.DefaultPubRetryAttempts: "12", pkg.Deliver: "set-deliver", + pkg.ExactlyOnce: "true", }}}, ClientConfig{ BrokerURL: "tcp://example.com:9090", @@ -73,6 +74,7 @@ func TestCreateClientConfiguration(t *testing.T) { QueueGroup: "group-1", DefaultPubRetryAttempts: 12, Deliver: "set-deliver", + ExactlyOnce: true, }, }, false, @@ -100,6 +102,7 @@ func TestCreateClientConfiguration(t *testing.T) { ConnectTimeout: 7, DefaultPubRetryAttempts: 2, Format: "nats", + Deliver: "new", }}, false, }, @@ -155,6 +158,7 @@ func TestCreateClientConfiguration(t *testing.T) { ConnectTimeout: 7, DefaultPubRetryAttempts: 2, Format: "nats", + Deliver: "new", }, }, false,