Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Logs #203

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions clightning/clightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,11 @@ func (cl *ClightningClient) OnCustomMsg(event *glightning.CustomMsgReceivedEvent
}
for _, v := range cl.msgHandlers {
err := v(event.PeerId, typeString, payloadDecoded)
if err != nil {
// We silence logging on AlreadyExistsErrors as this is just spammy
// and we already log that we received a message of the same type
// earlier.
if err != nil && !errors.Is(err, swap.AlreadyExistsError) {
log.Debugf("\n msghandler err: %v", err)
return event.Continue(), nil
}
}
return event.Continue(), nil
Expand Down
6 changes: 3 additions & 3 deletions messages/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewRedundantMessenger(messenger Messenger, retryTime time.Duration) *Redund
}

func (s *RedundantMessenger) SendMessage(peerId string, message []byte, messageType int) error {
log.Debugf("[RedundantSender]\tstart sending messages of type %d to %s\n", messageType, peerId)
log.Debugf("[RedundantSender] start sending messages of type %d to %s", messageType, peerId)

// Send one time before we go loop the send, so that we do not have to wait for the ticker.
err := s.messenger.SendMessage(peerId, message, messageType)
Expand All @@ -45,10 +45,10 @@ func (s *RedundantMessenger) SendMessage(peerId string, message []byte, messageT
case <-s.ticker.C:
err := s.messenger.SendMessage(peerId, message, messageType)
if err != nil {
log.Debugf("[RedundantSender]\tSendMessageWithRetry: %v\n", err)
log.Debugf("[RedundantSender] SendMessageWithRetry: %v", err)
}
case <-s.stop:
log.Debugf("[RedundantSender]\tstop sending messages of type %d to %s\n", messageType, peerId)
log.Debugf("[RedundantSender] stop sending messages of type %d to %s", messageType, peerId)
return
}
}
Expand Down
41 changes: 29 additions & 12 deletions swap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type SwapService struct {
BitcoinEnabled bool
LiquidEnabled bool
sync.RWMutex

lastMsgLog map[string]string
}

func NewSwapService(services *SwapServices) *SwapService {
Expand All @@ -66,6 +68,7 @@ func NewSwapService(services *SwapServices) *SwapService {
activeSwaps: map[string]*SwapStateMachine{},
LiquidEnabled: services.liquidEnabled,
BitcoinEnabled: services.bitcoinEnabled,
lastMsgLog: map[string]string{},
}
}

Expand Down Expand Up @@ -141,6 +144,23 @@ func (s *SwapService) RecoverSwaps() error {
return nil
}

func (s *SwapService) logMsg(swapId, peerId, msgTypeString string, payload []byte) {
s.Lock()
defer s.Unlock()
if lastMsgType, ok := s.lastMsgLog[swapId]; ok {
if lastMsgType == msgTypeString {
// We already logged this message, just tell that we received the
// last message again.
log.Debugf("[Messenger] From: %s got same message for swap: %s", peerId, swapId)
return
}
}
// We see the message type for this swap for the first time, we log the
// message.
s.lastMsgLog[swapId] = msgTypeString
log.Debugf("[Messenger] From: %s got msgtype: %s with payload: %s for swap: %s", peerId, msgTypeString, payload, swapId)
}

// OnMessageReceived handles incoming valid peermessages
func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, payload []byte) error {
if len(payload) > 100*1024 {
Expand All @@ -156,24 +176,24 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
// Do nothing here, as it will spam the cln log.
return nil
case messages.MESSAGETYPE_SWAPOUTREQUEST:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
// s.logMsg(peerId, msgTypeString, payload)
var msg *SwapOutRequestMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}
s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
err = s.OnSwapOutRequestReceived(msg.SwapId, peerId, msg)
if err != nil {
return err
}
case messages.MESSAGETYPE_SWAPOUTAGREEMENT:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapOutAgreementMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}

s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
// Check if sender is expected swap partner peer.
ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId)
if err != nil {
Expand All @@ -188,13 +208,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_OPENINGTXBROADCASTED:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *OpeningTxBroadcastedMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}

s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
// Check if sender is expected swap partner peer.
ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId)
if err != nil {
Expand All @@ -209,13 +228,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_CANCELED:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *CancelMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}

s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
// Check if sender is expected swap partner peer.
ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId)
if err != nil {
Expand All @@ -230,24 +248,23 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_SWAPINREQUEST:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapInRequestMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}
s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
err = s.OnSwapInRequestReceived(msg.SwapId, peerId, msg)
if err != nil {
return err
}
case messages.MESSAGETYPE_SWAPINAGREEMENT:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *SwapInAgreementMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}

s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
// Check if sender is expected swap partner peer.
ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId)
if err != nil {
Expand All @@ -262,13 +279,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay
return err
}
case messages.MESSAGETYPE_COOPCLOSE:
log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload)
var msg *CoopCloseMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
return err
}

s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload)
// Check if sender is expected swap partner peer.
ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId)
if err != nil {
Expand Down Expand Up @@ -723,6 +739,7 @@ func (s *SwapService) GetActiveSwap(swapId string) (*SwapStateMachine, error) {
func (s *SwapService) RemoveActiveSwap(swapId string) {
s.Lock()
defer s.Unlock()
delete(s.lastMsgLog, swapId)
delete(s.activeSwaps, swapId)
}

Expand Down
3 changes: 1 addition & 2 deletions txwatcher/rpctxwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (s *BlockchainRpcTxWatcher) HandleConfirmedTx(blockheight uint64) error {
continue
}
if !(res.Confirmations >= s.requiredConfs) {
log.Debugf("tx does not have enough confirmations")
continue
}
if s.txCallback == nil {
Expand Down Expand Up @@ -240,7 +239,7 @@ func (s *BlockchainRpcTxWatcher) CheckTxConfirmed(swapId string, txId string, vo
return ""
}
if !(res.Confirmations >= s.requiredConfs) {
log.Infof("tx does not have enough confirmations")
log.Infof("tx %s on swap %s does not have enough confirmations", txId, swapId)
return ""
}
if s.txCallback == nil {
Expand Down
Loading