diff --git a/clightning/clightning.go b/clightning/clightning.go index 5a9dba63..c7a6a20a 100644 --- a/clightning/clightning.go +++ b/clightning/clightning.go @@ -310,9 +310,11 @@ func (cl *ClightningClient) OnCustomMsg(event *glightning.CustomMsgReceivedEvent } for _, v := range cl.msgHandlers { err := v(event.PeerId, typeString, payloadDecoded) - if err != nil { + // We silence logging on AlreadyExistsErrors as this is just spammy + // and we already log that we received a message of the same type + // earlier. + if err != nil && !errors.Is(err, swap.AlreadyExistsError) { log.Debugf("\n msghandler err: %v", err) - return event.Continue(), nil } } return event.Continue(), nil diff --git a/messages/sender.go b/messages/sender.go index bb1471b5..e318c41c 100644 --- a/messages/sender.go +++ b/messages/sender.go @@ -31,7 +31,7 @@ func NewRedundantMessenger(messenger Messenger, retryTime time.Duration) *Redund } func (s *RedundantMessenger) SendMessage(peerId string, message []byte, messageType int) error { - log.Debugf("[RedundantSender]\tstart sending messages of type %d to %s\n", messageType, peerId) + log.Debugf("[RedundantSender] start sending messages of type %d to %s", messageType, peerId) // Send one time before we go loop the send, so that we do not have to wait for the ticker. err := s.messenger.SendMessage(peerId, message, messageType) @@ -45,10 +45,10 @@ func (s *RedundantMessenger) SendMessage(peerId string, message []byte, messageT case <-s.ticker.C: err := s.messenger.SendMessage(peerId, message, messageType) if err != nil { - log.Debugf("[RedundantSender]\tSendMessageWithRetry: %v\n", err) + log.Debugf("[RedundantSender] SendMessageWithRetry: %v", err) } case <-s.stop: - log.Debugf("[RedundantSender]\tstop sending messages of type %d to %s\n", messageType, peerId) + log.Debugf("[RedundantSender] stop sending messages of type %d to %s", messageType, peerId) return } } diff --git a/swap/service.go b/swap/service.go index f996a844..738c8bc0 100644 --- a/swap/service.go +++ b/swap/service.go @@ -58,6 +58,8 @@ type SwapService struct { BitcoinEnabled bool LiquidEnabled bool sync.RWMutex + + lastMsgLog map[string]string } func NewSwapService(services *SwapServices) *SwapService { @@ -66,6 +68,7 @@ func NewSwapService(services *SwapServices) *SwapService { activeSwaps: map[string]*SwapStateMachine{}, LiquidEnabled: services.liquidEnabled, BitcoinEnabled: services.bitcoinEnabled, + lastMsgLog: map[string]string{}, } } @@ -141,6 +144,23 @@ func (s *SwapService) RecoverSwaps() error { return nil } +func (s *SwapService) logMsg(swapId, peerId, msgTypeString string, payload []byte) { + s.Lock() + defer s.Unlock() + if lastMsgType, ok := s.lastMsgLog[swapId]; ok { + if lastMsgType == msgTypeString { + // We already logged this message, just tell that we received the + // last message again. + log.Debugf("[Messenger] From: %s got same message for swap: %s", peerId, swapId) + return + } + } + // We see the message type for this swap for the first time, we log the + // message. + s.lastMsgLog[swapId] = msgTypeString + log.Debugf("[Messenger] From: %s got msgtype: %s with payload: %s for swap: %s", peerId, msgTypeString, payload, swapId) +} + // OnMessageReceived handles incoming valid peermessages func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, payload []byte) error { if len(payload) > 100*1024 { @@ -156,24 +176,24 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay // Do nothing here, as it will spam the cln log. return nil case messages.MESSAGETYPE_SWAPOUTREQUEST: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) + // s.logMsg(peerId, msgTypeString, payload) var msg *SwapOutRequestMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) err = s.OnSwapOutRequestReceived(msg.SwapId, peerId, msg) if err != nil { return err } case messages.MESSAGETYPE_SWAPOUTAGREEMENT: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *SwapOutAgreementMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } - + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) // Check if sender is expected swap partner peer. ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId) if err != nil { @@ -188,13 +208,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay return err } case messages.MESSAGETYPE_OPENINGTXBROADCASTED: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *OpeningTxBroadcastedMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } - + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) // Check if sender is expected swap partner peer. ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId) if err != nil { @@ -209,13 +228,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay return err } case messages.MESSAGETYPE_CANCELED: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *CancelMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } - + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) // Check if sender is expected swap partner peer. ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId) if err != nil { @@ -230,24 +248,23 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay return err } case messages.MESSAGETYPE_SWAPINREQUEST: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *SwapInRequestMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) err = s.OnSwapInRequestReceived(msg.SwapId, peerId, msg) if err != nil { return err } case messages.MESSAGETYPE_SWAPINAGREEMENT: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *SwapInAgreementMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } - + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) // Check if sender is expected swap partner peer. ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId) if err != nil { @@ -262,13 +279,12 @@ func (s *SwapService) OnMessageReceived(peerId string, msgTypeString string, pay return err } case messages.MESSAGETYPE_COOPCLOSE: - log.Debugf("[Messenger] From: %s got msgtype: %s payload: %s", peerId, msgTypeString, payload) var msg *CoopCloseMessage err := json.Unmarshal(msgBytes, &msg) if err != nil { return err } - + s.logMsg(msg.SwapId.String(), peerId, msgTypeString, payload) // Check if sender is expected swap partner peer. ok, err := s.isMessageSenderExpectedPeer(peerId, msg.SwapId) if err != nil { @@ -723,6 +739,7 @@ func (s *SwapService) GetActiveSwap(swapId string) (*SwapStateMachine, error) { func (s *SwapService) RemoveActiveSwap(swapId string) { s.Lock() defer s.Unlock() + delete(s.lastMsgLog, swapId) delete(s.activeSwaps, swapId) } diff --git a/txwatcher/rpctxwatcher.go b/txwatcher/rpctxwatcher.go index 3c4b3f38..0e062f41 100644 --- a/txwatcher/rpctxwatcher.go +++ b/txwatcher/rpctxwatcher.go @@ -154,7 +154,6 @@ func (s *BlockchainRpcTxWatcher) HandleConfirmedTx(blockheight uint64) error { continue } if !(res.Confirmations >= s.requiredConfs) { - log.Debugf("tx does not have enough confirmations") continue } if s.txCallback == nil { @@ -240,7 +239,7 @@ func (s *BlockchainRpcTxWatcher) CheckTxConfirmed(swapId string, txId string, vo return "" } if !(res.Confirmations >= s.requiredConfs) { - log.Infof("tx does not have enough confirmations") + log.Infof("tx %s on swap %s does not have enough confirmations", txId, swapId) return "" } if s.txCallback == nil {