Skip to content

Commit

Permalink
allow bridge mq cost msg (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangJian He authored May 20, 2022
1 parent 92758c8 commit 5dc2114
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 14 deletions.
6 changes: 4 additions & 2 deletions broker/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"go.uber.org/zap"
)

func (b *Broker) Publish(e *bridge.Elements) {
func (b *Broker) Publish(e *bridge.Elements) bool {
if b.bridgeMQ != nil {
err := b.bridgeMQ.Publish(e)
cost, err := b.bridgeMQ.Publish(e)
if err != nil {
log.Error("send message to mq error.", zap.Error(err))
}
return cost
}
return false
}
6 changes: 5 additions & 1 deletion broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) {
}

//publish to bridge mq
c.broker.Publish(&bridge.Elements{
cost := c.broker.Publish(&bridge.Elements{
ClientID: c.info.clientID,
Username: c.info.username,
Action: bridge.Publish,
Expand All @@ -423,6 +423,10 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) {
Topic: topic,
})

if cost {
return
}

switch packet.Qos {
case QosAtMostOnce:
c.ProcessPublishMessage(packet)
Expand Down
4 changes: 2 additions & 2 deletions broker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (c *client) SendInfo() {
}
url := c.info.localIP + ":" + c.broker.config.Cluster.Port

infoMsg := NewInfo(c.broker.id, url, false)
infoMsg := NewInfo(c.broker.id, url)
err := c.WriterPacket(infoMsg)
if err != nil {
log.Error("send info message error, ", zap.Error(err))
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *client) SendConnect() {
log.Info("send connect success")
}

func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
func NewInfo(sid, url string) *packets.PublishPacket {
pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
pub.Qos = 0
pub.TopicName = BrokerInfoTopic
Expand Down
3 changes: 2 additions & 1 deletion plugins/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
)

type BridgeMQ interface {
Publish(e *Elements) error
// Publish return true to cost the message
Publish(e *Elements) (bool, error)
}

func NewBridgeMQ(name string) BridgeMQ {
Expand Down
6 changes: 3 additions & 3 deletions plugins/bridge/csvlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (c *csvLog) logFilePrune() error {
// Publish implements the bridge interface - it accepts an Element then checks to see if that element is a
// message published to the admin topic for the plugin
//
func (c *csvLog) Publish(e *Elements) error {
func (c *csvLog) Publish(e *Elements) (bool, error) {
// A short-lived lock on c allows us to
// get the Command topic then release the lock
// This then allows us to process the command - which may
Expand All @@ -372,7 +372,7 @@ func (c *csvLog) Publish(e *Elements) error {
// If the outfile is set to "{NULL}" we don't do anything with the message - we just return nil
// This feature is here to allow CSVLOG to be enabled/disabled at runtime
if OutFile == "{NULL}" {
return nil
return false, nil
}

if e.Topic == CommandTopic {
Expand Down Expand Up @@ -410,5 +410,5 @@ func (c *csvLog) Publish(e *Elements) error {
// Push the message into the channel and return
// the channel is buffered and is read by a goroutine so this should block for the shortest possible time
c.msgchan <- e
return nil
return false, nil
}
6 changes: 3 additions & 3 deletions plugins/bridge/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (k *kafka) connect() {
}

//Publish publish to kafka
func (k *kafka) Publish(e *Elements) error {
func (k *kafka) Publish(e *Elements) (bool, error) {
config := k.kafkaConfig
key := e.ClientID
topics := make(map[string]bool)
Expand Down Expand Up @@ -96,10 +96,10 @@ func (k *kafka) Publish(e *Elements) error {
topics[config.DisconnectTopic] = true
}
default:
return errors.New("error action: " + e.Action)
return false, errors.New("error action: " + e.Action)
}

return k.publish(topics, key, e)
return false, k.publish(topics, key, e)

}

Expand Down
4 changes: 2 additions & 2 deletions plugins/bridge/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package bridge

type mockMQ struct{}

func (m *mockMQ) Publish(e *Elements) error {
return nil
func (m *mockMQ) Publish(e *Elements) (bool, error) {
return false, nil
}

0 comments on commit 5dc2114

Please sign in to comment.