Skip to content

Commit

Permalink
fix: debug logging for node-to-node clients (#740)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
  • Loading branch information
wolf31o2 authored Oct 5, 2024
1 parent 285b4ef commit 59c7df4
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 33 deletions.
18 changes: 8 additions & 10 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -97,7 +97,7 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -107,7 +107,7 @@ func (c *Client) Stop() error {
// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlockRange(start: %+v, end: %+v)", ProtocolName, start, end))
Debug(fmt.Sprintf("%s: client called GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})", ProtocolName, start.Slot, start.Hash, end.Slot, end.Hash))
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -129,7 +129,7 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetBlock(point: %+v)", ProtocolName, point))
Debug(fmt.Sprintf("%s: client called GetBlock(point: {Slot: %d, Hash: %x})", ProtocolName, point.Slot, point.Hash))
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand All @@ -153,8 +153,6 @@ func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeStartBatch:
Expand All @@ -177,22 +175,22 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client start batch for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client start batch for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client no blocks found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client no blocks found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client block found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client block found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -219,7 +217,7 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client batch done for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client batch done for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.busyMutex.Unlock()
return nil
}
50 changes: 36 additions & 14 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewClient(
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -132,7 +132,7 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("stopping protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: stopping protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -146,7 +146,7 @@ func (c *Client) Stop() error {
// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetCurrentTip()", ProtocolName))
Debug(fmt.Sprintf("%s: client %+v called GetCurrentTip()", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -185,10 +185,14 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
// The request is being handled by another request, wait for the result.
waitingForCurrentTipChan = nil
case tip := <-waitingResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
// The result from the other request is ready.
done.Store(true)
return &tip, nil
case tip := <-requestResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
// If waitingForCurrentTipChan is full, the for loop that empties it might finish the
// loop before the select statement that writes to it is triggered. For that reason we
// require requestResultChan here.
Expand All @@ -204,15 +208,25 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
func (c *Client) GetAvailableBlockRange(
intersectPoints []common.Point,
) (common.Point, common.Point, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

// Use origin if no intersect points were specified
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
}

// Find our chain intersection
result := c.requestFindIntersect(intersectPoints)
if result.error != nil {
Expand Down Expand Up @@ -279,14 +293,24 @@ func (c *Client) GetAvailableBlockRange(
// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
// via the RollForward callback function specified in the protocol config
func (c *Client) Sync(intersectPoints []common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("client called %s Sync(intersectPoints: %+v)", ProtocolName, intersectPoints))
c.busyMutex.Lock()
defer c.busyMutex.Unlock()

// Use origin if no intersect points were specified
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
}

intersectResultChan, cancel := c.wantIntersectFound()
msg := NewMsgFindIntersect(intersectPoints)
Expand Down Expand Up @@ -430,8 +454,6 @@ func (c *Client) requestFindIntersect(
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
var err error
switch msg.Type() {
case MessageTypeAwaitReply:
Expand All @@ -456,13 +478,13 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client await reply for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client await reply for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll forward for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client roll forward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -572,7 +594,7 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client roll backward for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client roll backward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -599,7 +621,7 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client intersect found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -613,7 +635,7 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client intersect not found for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client intersect not found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
12 changes: 5 additions & 7 deletions protocol/handshake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
MessageHandlerFunc: c.handleMessage,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: statePropose,
Expand All @@ -70,17 +70,15 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("starting protocol: %s", ProtocolName))
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Send our ProposeVersions message
msg := NewMsgProposeVersions(c.config.ProtocolVersionMap)
_ = c.SendMessage(msg)
})
}

func (c *Client) handleMessage(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client message for %s", ProtocolName))
func (c *Client) messageHandler(msg protocol.Message) error {
var err error
switch msg.Type() {
case MessageTypeAcceptVersion:
Expand All @@ -99,7 +97,7 @@ func (c *Client) handleMessage(msg protocol.Message) error {

func (c *Client) handleAcceptVersion(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client accept version for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client accept version for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.FinishedFunc == nil {
return fmt.Errorf(
"received handshake AcceptVersion message but no callback function is defined",
Expand All @@ -122,7 +120,7 @@ func (c *Client) handleAcceptVersion(msg protocol.Message) error {

func (c *Client) handleRefuse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("handling client refuse for %s", ProtocolName))
Debug(fmt.Sprintf("%s: client refuse for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgRefuse)
var err error
switch msg.Reason[0].(uint64) {
Expand Down
5 changes: 5 additions & 0 deletions protocol/keepalive/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -68,6 +69,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {

func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand Down Expand Up @@ -119,6 +122,8 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleKeepAliveResponse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client keepalive response for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msg := msgGeneric.(*MsgKeepAliveResponse)
if msg.Cookie != c.config.Cookie {
return fmt.Errorf(
Expand Down
7 changes: 7 additions & 0 deletions protocol/peersharing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -66,6 +67,8 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
}

func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetPeers(amount: %d)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, amount))
msg := NewMsgShareRequest(amount)
if err := c.SendMessage(msg); err != nil {
return nil, err
Expand All @@ -78,6 +81,8 @@ func (c *Client) GetPeers(amount uint8) ([]PeerAddress, error) {
}

func (c *Client) handleMessage(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
var err error
switch msg.Type() {
case MessageTypeSharePeers:
Expand All @@ -93,6 +98,8 @@ func (c *Client) handleMessage(msg protocol.Message) error {
}

func (c *Client) handleSharePeers(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client share peers for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
msgSharePeers := msg.(*MsgSharePeers)
c.sharePeersChan <- msgSharePeers.PeerAddresses
return nil
Expand Down
2 changes: 0 additions & 2 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func New(config ProtocolConfig) *Protocol {
func (p *Protocol) Start() {
p.onceStart.Do(func() {
// Register protocol with muxer
p.Logger().Debug("registering protocol with muxer")
muxerProtocolRole := muxer.ProtocolRoleInitiator
if p.config.Role == ProtocolRoleServer {
muxerProtocolRole = muxer.ProtocolRoleResponder
Expand Down Expand Up @@ -160,7 +159,6 @@ func (p *Protocol) Start() {
func (p *Protocol) Stop() {
p.onceStop.Do(func() {
// Unregister protocol from muxer
p.Logger().Debug("unregistering protocol with muxer")
muxerProtocolRole := muxer.ProtocolRoleInitiator
if p.config.Role == ProtocolRoleServer {
muxerProtocolRole = muxer.ProtocolRoleResponder
Expand Down
7 changes: 7 additions & 0 deletions protocol/txsubmission/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
Logger: protoOptions.Logger,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
Expand All @@ -75,6 +76,8 @@ func (c *Client) Init() {
}

func (c *Client) messageHandler(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client message for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
var err error
switch msg.Type() {
case MessageTypeRequestTxIds:
Expand All @@ -92,6 +95,8 @@ func (c *Client) messageHandler(msg protocol.Message) error {
}

func (c *Client) handleRequestTxIds(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client request tx ids for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.RequestTxIdsFunc == nil {
return fmt.Errorf(
"received tx-submission RequestTxIds message but no callback function is defined",
Expand All @@ -116,6 +121,8 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error {
}

func (c *Client) handleRequestTxs(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client request txs for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
if c.config.RequestTxsFunc == nil {
return fmt.Errorf(
"received tx-submission RequestTxs message but no callback function is defined",
Expand Down

0 comments on commit 59c7df4

Please sign in to comment.