From 5dc2114daf901c71807f6a506c5dc17dbbebd133 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Fri, 20 May 2022 21:27:48 +0800 Subject: [PATCH] allow bridge mq cost msg (#162) --- broker/bridge.go | 6 ++++-- broker/client.go | 6 +++++- broker/info.go | 4 ++-- plugins/bridge/bridge.go | 3 ++- plugins/bridge/csvlog.go | 6 +++--- plugins/bridge/kafka.go | 6 +++--- plugins/bridge/mock.go | 4 ++-- 7 files changed, 21 insertions(+), 14 deletions(-) diff --git a/broker/bridge.go b/broker/bridge.go index a029ed90..97c7f024 100644 --- a/broker/bridge.go +++ b/broker/bridge.go @@ -5,11 +5,13 @@ import ( "go.uber.org/zap" ) -func (b *Broker) Publish(e *bridge.Elements) { +func (b *Broker) Publish(e *bridge.Elements) bool { if b.bridgeMQ != nil { - err := b.bridgeMQ.Publish(e) + cost, err := b.bridgeMQ.Publish(e) if err != nil { log.Error("send message to mq error.", zap.Error(err)) } + return cost } + return false } diff --git a/broker/client.go b/broker/client.go index 64a1a78b..83edce01 100644 --- a/broker/client.go +++ b/broker/client.go @@ -414,7 +414,7 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) { } //publish to bridge mq - c.broker.Publish(&bridge.Elements{ + cost := c.broker.Publish(&bridge.Elements{ ClientID: c.info.clientID, Username: c.info.username, Action: bridge.Publish, @@ -423,6 +423,10 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) { Topic: topic, }) + if cost { + return + } + switch packet.Qos { case QosAtMostOnce: c.ProcessPublishMessage(packet) diff --git a/broker/info.go b/broker/info.go index 6154f75a..53cbdcf2 100644 --- a/broker/info.go +++ b/broker/info.go @@ -15,7 +15,7 @@ func (c *client) SendInfo() { } url := c.info.localIP + ":" + c.broker.config.Cluster.Port - infoMsg := NewInfo(c.broker.id, url, false) + infoMsg := NewInfo(c.broker.id, url) err := c.WriterPacket(infoMsg) if err != nil { log.Error("send info message error, ", zap.Error(err)) @@ -60,7 +60,7 @@ func (c *client) SendConnect() { log.Info("send connect success") } -func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { +func NewInfo(sid, url string) *packets.PublishPacket { pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) pub.Qos = 0 pub.TopicName = BrokerInfoTopic diff --git a/plugins/bridge/bridge.go b/plugins/bridge/bridge.go index d22288cb..b2d7c4c5 100644 --- a/plugins/bridge/bridge.go +++ b/plugins/bridge/bridge.go @@ -37,7 +37,8 @@ const ( ) type BridgeMQ interface { - Publish(e *Elements) error + // Publish return true to cost the message + Publish(e *Elements) (bool, error) } func NewBridgeMQ(name string) BridgeMQ { diff --git a/plugins/bridge/csvlog.go b/plugins/bridge/csvlog.go index 7f64a674..df0b9479 100644 --- a/plugins/bridge/csvlog.go +++ b/plugins/bridge/csvlog.go @@ -353,7 +353,7 @@ func (c *csvLog) logFilePrune() error { // Publish implements the bridge interface - it accepts an Element then checks to see if that element is a // message published to the admin topic for the plugin // -func (c *csvLog) Publish(e *Elements) error { +func (c *csvLog) Publish(e *Elements) (bool, error) { // A short-lived lock on c allows us to // get the Command topic then release the lock // This then allows us to process the command - which may @@ -372,7 +372,7 @@ func (c *csvLog) Publish(e *Elements) error { // If the outfile is set to "{NULL}" we don't do anything with the message - we just return nil // This feature is here to allow CSVLOG to be enabled/disabled at runtime if OutFile == "{NULL}" { - return nil + return false, nil } if e.Topic == CommandTopic { @@ -410,5 +410,5 @@ func (c *csvLog) Publish(e *Elements) error { // Push the message into the channel and return // the channel is buffered and is read by a goroutine so this should block for the shortest possible time c.msgchan <- e - return nil + return false, nil } diff --git a/plugins/bridge/kafka.go b/plugins/bridge/kafka.go index 158e2d99..9228597e 100644 --- a/plugins/bridge/kafka.go +++ b/plugins/bridge/kafka.go @@ -63,7 +63,7 @@ func (k *kafka) connect() { } //Publish publish to kafka -func (k *kafka) Publish(e *Elements) error { +func (k *kafka) Publish(e *Elements) (bool, error) { config := k.kafkaConfig key := e.ClientID topics := make(map[string]bool) @@ -96,10 +96,10 @@ func (k *kafka) Publish(e *Elements) error { topics[config.DisconnectTopic] = true } default: - return errors.New("error action: " + e.Action) + return false, errors.New("error action: " + e.Action) } - return k.publish(topics, key, e) + return false, k.publish(topics, key, e) } diff --git a/plugins/bridge/mock.go b/plugins/bridge/mock.go index 8d097f7f..de88d4a4 100644 --- a/plugins/bridge/mock.go +++ b/plugins/bridge/mock.go @@ -2,6 +2,6 @@ package bridge type mockMQ struct{} -func (m *mockMQ) Publish(e *Elements) error { - return nil +func (m *mockMQ) Publish(e *Elements) (bool, error) { + return false, nil }