Skip to content

Commit

Permalink
feat: update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyy committed Jan 8, 2024
1 parent 5ba8038 commit 48b146d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 46 deletions.
66 changes: 33 additions & 33 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package broker

import (
"crypto/tls"
encJson "encoding/json"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
encJson "encoding/json"

"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
Expand Down Expand Up @@ -205,8 +205,8 @@ func (b *Broker) StartWebsocketListening() {
func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws)
ws.PayloadType = websocket.BinaryFrame
err:=b.handleConnection(CLIENT, ws)
if err!=nil{
err := b.handleConnection(CLIENT, ws)
if err != nil {
ws.Close()
}
}
Expand Down Expand Up @@ -259,9 +259,9 @@ func (b *Broker) StartClientListening(Tls bool) {
}

tmpDelay = ACCEPT_MIN_SLEEP
go func(){
err :=b.handleConnection(CLIENT, conn)
if err!=nil{
go func() {
err := b.handleConnection(CLIENT, conn)
if err != nil {
conn.Close()
}
}()
Expand Down Expand Up @@ -301,9 +301,9 @@ func (b *Broker) StartClusterListening() {
}
tmpDelay = ACCEPT_MIN_SLEEP

go func(){
err :=b.handleConnection(ROUTER, conn)
if err!=nil{
go func() {
err := b.handleConnection(ROUTER, conn)
if err != nil {
conn.Close()
}
}()
Expand All @@ -322,11 +322,11 @@ func (b *Broker) DisConnClientByClientId(clientId string) {
conn.Close()
}

func (b *Broker) handleConnection(typ int, conn net.Conn) error{
func (b *Broker) handleConnection(typ int, conn net.Conn) error {
//process connect packet
packet, err := packets.ReadPacket(conn)
if err != nil {
return errors.New(fmt.Sprintln("read connect packet error:%v",err))
return errors.New(fmt.Sprintf("read connect packet error:%v", err))
}
if packet == nil {
return errors.New("received nil packet")
Expand All @@ -344,21 +344,21 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{

if connack.ReturnCode != packets.Accepted {
if err := connack.Write(conn); err != nil {
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))
return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn)
}
return errors.New(fmt.Sprintln("connect packet validate failed with connack.ReturnCode%v",connack.ReturnCode))
return fmt.Errorf("connect packet validate failed with connack.ReturnCode%v", connack.ReturnCode)
}

if typ == CLIENT && !b.CheckConnectAuth(msg.ClientIdentifier, msg.Username, string(msg.Password)) {
connack.ReturnCode = packets.ErrRefusedNotAuthorised
if err := connack.Write(conn); err != nil {
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))
return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn)
}
return errors.New(fmt.Sprintln("connect packet CheckConnectAuth failed with connack.ReturnCode%v",connack.ReturnCode))
return fmt.Errorf("connect packet CheckConnectAuth failed with connack.ReturnCode%v", connack.ReturnCode)
}

if err := connack.Write(conn); err != nil {
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))
return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn)
}

willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
Expand Down Expand Up @@ -389,7 +389,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{
c.init()

if err := b.getSession(c, msg, connack); err != nil {
return errors.New(fmt.Sprintln("get session error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))
return fmt.Errorf("get session error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn)
}

cid := c.info.clientID
Expand All @@ -413,13 +413,13 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{
pubPack.TopicName = info.willMsg.TopicName
pubPack.Payload = info.willMsg.Payload
}

pubInfo := Info{
ClientID: info.clientID,
Username: info.username,
Password: info.password,
ClientID: info.clientID,
Username: info.username,
Password: info.password,
Keepalive: info.keepalive,
WillMsg: pubPack,
WillMsg: pubPack,
}

b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime)
Expand Down Expand Up @@ -456,7 +456,7 @@ func (b *Broker) ConnectToDiscovery() {
log.Error("Error trying to connect to route", zap.Error(err))
log.Debug("Connect to route timeout, retry...")

if 0 == tempDelay {
if tempDelay == 0 {
tempDelay = 1 * time.Second
} else {
tempDelay *= 2
Expand Down Expand Up @@ -528,7 +528,7 @@ func (b *Broker) connectRouter(id, addr string) {

log.Debug("Connect to route timeout, retry...")

if 0 == timeDelay {
if timeDelay == 0 {
timeDelay = 1 * time.Second
} else {
timeDelay *= 2
Expand Down Expand Up @@ -714,11 +714,11 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) {
}

type OnlineOfflineMsg struct {
ClientID string `json:"clientID"`
Online bool `json:"online"`
Timestamp string `json:"timestamp"`
ClientInfo Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
ClientID string `json:"clientID"`
Online bool `json:"online"`
Timestamp string `json:"timestamp"`
ClientInfo Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}

func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) {
Expand All @@ -727,10 +727,10 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64
packet.Qos = 0

msg := OnlineOfflineMsg{
ClientID: info.ClientID,
Online: online,
Timestamp: time.Now().UTC().Format(time.RFC3339),
ClientInfo: info,
ClientID: info.ClientID,
Online: online,
Timestamp: time.Now().UTC().Format(time.RFC3339),
ClientInfo: info,
LastMsgTime: lastMsg,
}

Expand Down
26 changes: 13 additions & 13 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type client struct {
mqueue *queue.Queue
retryTimer *time.Timer
retryTimerLock sync.Mutex
lastMsgTime int64
lastMsgTime int64
}

type InflightStatus uint8
Expand Down Expand Up @@ -113,16 +113,16 @@ type info struct {
}

type PubPacket struct {
TopicName string `json:"topicName"`
Payload []byte `json:"payload"`
TopicName string `json:"topicName"`
Payload []byte `json:"payload"`
}

type Info struct {
ClientID string `json:"clientId"`
Username string `json:"username"`
Password []byte `json:"password"`
Keepalive uint16 `json:"keepalive"`
WillMsg PubPacket `json:"willMsg"`
ClientID string `json:"clientId"`
Username string `json:"username"`
Password []byte `json:"password"`
Keepalive uint16 `json:"keepalive"`
WillMsg PubPacket `json:"willMsg"`
}

type route struct {
Expand All @@ -136,7 +136,7 @@ var (
)

func (c *client) init() {
c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged
c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged
c.status = Connected
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
remoteAddr := c.conn.RemoteAddr()
Expand Down Expand Up @@ -867,11 +867,11 @@ func (c *client) Close() {
}

pubInfo := Info{
ClientID: c.info.clientID,
Username: c.info.username,
Password: c.info.password,
ClientID: c.info.clientID,
Username: c.info.username,
Password: c.info.password,
Keepalive: c.info.keepalive,
WillMsg: pubPack,
WillMsg: pubPack,
}
//offline notification
b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime)
Expand Down

0 comments on commit 48b146d

Please sign in to comment.