Skip to content

Commit

Permalink
Merge pull request #1094 from yanmxa/br_fix_mqtt
Browse files Browse the repository at this point in the history
Fix race condition in MQTT protocol when sending messages
  • Loading branch information
embano1 authored Sep 11, 2024
2 parents 3753f4e + e029ccf commit 682f3a9
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 2 deletions.
13 changes: 11 additions & 2 deletions protocol/mqtt_paho/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

type Protocol struct {
client *paho.Client
config *paho.ClientConfig
connOption *paho.Connect
publishOption *paho.Publish
subscribeOption *paho.Subscribe
Expand Down Expand Up @@ -89,7 +88,7 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
var err error
defer m.Finish(err)

msg := p.publishOption
msg := p.publishMsg()
if cecontext.TopicFrom(ctx) != "" {
msg.Topic = cecontext.TopicFrom(ctx)
cecontext.WithTopic(ctx, "")
Expand All @@ -107,6 +106,16 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
return err
}

// publishMsg generate a new paho.Publish message from the p.publishOption
func (p *Protocol) publishMsg() *paho.Publish {
return &paho.Publish{
QoS: p.publishOption.QoS,
Retain: p.publishOption.Retain,
Topic: p.publishOption.Topic,
Properties: p.publishOption.Properties,
}
}

func (p *Protocol) OpenInbound(ctx context.Context) error {
if p.subscribeOption == nil {
return fmt.Errorf("the paho.Subscribe option must not be nil")
Expand Down
98 changes: 98 additions & 0 deletions test/integration/mqtt_paho/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"context"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)

func TestConcurrentSendingEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

topicName := "test-ce-client-" + uuid.New().String()

readyCh := make(chan bool)
defer close(readyCh)

senderNum := 10 // 10 gorutine to sending the events
eventNum := 1000 // each gorutine sender publishs 1,000 events

var g errgroup.Group

// start a receiver
c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)
g.Go(func() error {
// verify all of events can be recieved
count := senderNum * eventNum
var mu sync.Mutex
return c.StartReceiver(ctx, func(event cloudevents.Event) {
mu.Lock()
defer mu.Unlock()
count--
if count == 0 {
readyCh <- true
}
})
})
// wait for 5 seconds to ensure the receiver starts safely
time.Sleep(5 * time.Second)

// start a sender client to pulish events concurrently
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)

evt := cloudevents.NewEvent()
evt.SetType("com.cloudevents.sample.sent")
evt.SetSource("concurrent-sender")
err = evt.SetData(cloudevents.ApplicationJSON, map[string]interface{}{"message": "Hello, World!"})
require.NoError(t, err)

for i := 0; i < senderNum; i++ {
g.Go(func() error {
for j := 0; j < eventNum; j++ {
result := client.Send(
cecontext.WithTopic(ctx, topicName),
evt,
)
if result != nil {
return result
}
}
return nil
})
}

// wait until all the events are received
handleEvent(ctx, readyCh, cancel, t)

require.NoError(t, g.Wait())
}

func handleEvent(ctx context.Context, readyCh <-chan bool, cancel context.CancelFunc, t *testing.T) {
for {
select {
case <-ctx.Done():
require.Fail(t, "Test failed: timeout reached before events were received")
return
case <-readyCh:
cancel()
t.Logf("Test passed: events successfully received")
return
}
}
}

0 comments on commit 682f3a9

Please sign in to comment.