eth: minor polishes to make logs more useful
karalabe committed Jan 24, 2021
1 parent 324b8a6 commit 88dc6ee
Showing 3 changed files with 53 additions and 56 deletions.
eth/downloader/downloader.go (4 changes: 2 additions & 2 deletions)

@@ -298,7 +298,7 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
logger = log.New("peer", id[:8])
}
logger.Trace("Registering sync peer")
if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
@@ -325,7 +325,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
logger = log.New("peer", id[:8])
}
logger.Trace("Unregistering sync peer")
if err := d.peers.Unregister(id); err != nil {
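Both hunks above apply the same rule: IDs shorter than 16 characters are assumed to come from tests and are logged verbatim, while real peer IDs are now truncated to their first 8 characters (down from 16) to keep log lines compact. Below is a minimal, runnable sketch of that rule, assuming the go-ethereum log API as it existed at the time of this commit; the peerLogger helper is illustrative and not part of the patch.

```go
package main

import (
	"os"

	"github.com/ethereum/go-ethereum/log"
)

// peerLogger builds a logger tagged with a shortened peer ID: short test IDs
// pass through untouched, real IDs keep only their first 8 characters.
// (Illustrative helper, not part of the commit.)
func peerLogger(id string) log.Logger {
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		return log.New("peer", id)
	}
	return log.New("peer", id[:8])
}

func main() {
	// Route trace-level records to stderr so the example actually prints.
	log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace,
		log.StreamHandler(os.Stderr, log.TerminalFormat(false))))

	peerLogger("88dc6ee64eaf3750f06ba275832ef4f6").Trace("Registering sync peer") // peer=88dc6ee6
	peerLogger("peer-1").Trace("Registering sync peer")                           // peer=peer-1
}
```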
eth/handler.go (16 changes: 12 additions & 4 deletions)

@@ -326,24 +326,32 @@ func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
}

func (h *handler) removePeer(id string) {
// Create a custom logger to avoid printing the entire id
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:8])
}
// Remove the eth peer if it exists
eth := h.peers.ethPeer(id)
if eth != nil {
log.Debug("Removing Ethereum peer", "peer", id)
logger.Debug("Removing Ethereum peer")
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

if err := h.peers.unregisterEthPeer(id); err != nil {
log.Error("Peer removal failed", "peer", id, "err", err)
logger.Error("Ethereum peer removal failed", "err", err)
}
}
// Remove the snap peer if it exists
snap := h.peers.snapPeer(id)
if snap != nil {
log.Debug("Removing Snapshot peer", "peer", id)
logger.Debug("Removing Snapshot peer")
h.downloader.SnapSyncer.Unregister(id)
if err := h.peers.unregisterSnapPeer(id); err != nil {
log.Error("Peer removal failed", "peer", id, "err", err)
logger.Error("Snapshot peer removel failed", "err", err)
}
}
// Hard disconnect at the networking layer
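The net effect in removePeer is that one truncated-ID logger is created at the top of the function and reused for every message, instead of attaching the full "peer", id pair to each call and losing the Ethereum/Snapshot distinction in the error text. A runnable toy version of that flow is sketched below; toyPeerSet, unregisterEth and unregisterSnap are hypothetical stand-ins for the real handler and peer set.

```go
package main

import (
	"errors"

	"github.com/ethereum/go-ethereum/log"
)

// toyPeerSet is a hypothetical stand-in for h.peers: just enough state to
// exercise the removal flow.
type toyPeerSet struct {
	eth  map[string]bool
	snap map[string]bool
}

func (ps *toyPeerSet) unregisterEth(id string) error {
	if !ps.eth[id] {
		return errors.New("not registered")
	}
	delete(ps.eth, id)
	return nil
}

func (ps *toyPeerSet) unregisterSnap(id string) error {
	if !ps.snap[id] {
		return errors.New("not registered")
	}
	delete(ps.snap, id)
	return nil
}

// removePeer mirrors the shape of the patched handler method: build the
// shortened-ID logger once, then reuse it for both the eth and snap teardown
// messages, each with its own protocol-specific error text.
func removePeer(ps *toyPeerSet, id string) {
	var logger log.Logger
	if len(id) < 16 {
		logger = log.New("peer", id) // tests use short IDs, don't choke on them
	} else {
		logger = log.New("peer", id[:8])
	}
	if ps.eth[id] {
		logger.Debug("Removing Ethereum peer")
		if err := ps.unregisterEth(id); err != nil {
			logger.Error("Ethereum peer removal failed", "err", err)
		}
	}
	if ps.snap[id] {
		logger.Debug("Removing Snapshot peer")
		if err := ps.unregisterSnap(id); err != nil {
			logger.Error("Snapshot peer removal failed", "err", err)
		}
	}
}

func main() {
	id := "0123456789abcdef0123456789abcdef"
	removePeer(&toyPeerSet{
		eth:  map[string]bool{id: true},
		snap: map[string]bool{id: true},
	}, id)
}
```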
eth/protocols/snap/sync.go (89 changes: 39 additions & 50 deletions)

@@ -481,26 +481,27 @@ func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
// Register injects a new data source into the syncer's peerset.
func (s *Syncer) Register(peer SyncPeer) error {
// Make sure the peer is not registered yet
id := peer.ID()

s.lock.Lock()
pId := peer.ID()
if _, ok := s.peers[pId]; ok {
log.Error("Snap peer already registered", "id", pId)
if _, ok := s.peers[id]; ok {
log.Error("Snap peer already registered", "id", id)

s.lock.Unlock()
return errors.New("already registered")
}
s.peers[pId] = peer
s.peers[id] = peer

// Mark the peer as idle, even if no sync is running
s.accountIdlers[pId] = struct{}{}
s.storageIdlers[pId] = struct{}{}
s.bytecodeIdlers[pId] = struct{}{}
s.trienodeHealIdlers[pId] = struct{}{}
s.bytecodeHealIdlers[pId] = struct{}{}
s.accountIdlers[id] = struct{}{}
s.storageIdlers[id] = struct{}{}
s.bytecodeIdlers[id] = struct{}{}
s.trienodeHealIdlers[id] = struct{}{}
s.bytecodeHealIdlers[id] = struct{}{}
s.lock.Unlock()

// Notify any active syncs that a new peer can be assigned data
s.peerJoin.Send(pId)
s.peerJoin.Send(id)
return nil
}
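The Register change reads peer.ID() once into id before taking the lock and then uses that single variable for the duplicate check, the peer map, every idler set, and the join notification, instead of the old pId computed under the lock. The sketch below strips the same registration pattern down to a plain mutex, maps and a buffered channel standing in for the real Syncer state and event feed; all names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// toySyncer is a hypothetical, stripped-down stand-in for snap.Syncer.
type toySyncer struct {
	lock sync.Mutex

	peers          map[string]struct{}
	accountIdlers  map[string]struct{}
	storageIdlers  map[string]struct{}
	bytecodeIdlers map[string]struct{}

	peerJoin chan string // stands in for the event.Feed used by the real syncer
}

// register mirrors the patched flow: one id variable feeds the duplicate
// check, the peer map, the idler sets and the join notification.
func (s *toySyncer) register(id string) error {
	s.lock.Lock()
	if _, ok := s.peers[id]; ok {
		s.lock.Unlock()
		return errors.New("already registered")
	}
	s.peers[id] = struct{}{}

	// Mark the peer as idle, even if no sync is running
	s.accountIdlers[id] = struct{}{}
	s.storageIdlers[id] = struct{}{}
	s.bytecodeIdlers[id] = struct{}{}
	s.lock.Unlock()

	// Notify any active syncs that a new peer can be assigned data
	select {
	case s.peerJoin <- id:
	default: // nobody listening; with no subscribers the real feed is a no-op too
	}
	return nil
}

func main() {
	s := &toySyncer{
		peers:          make(map[string]struct{}),
		accountIdlers:  make(map[string]struct{}),
		storageIdlers:  make(map[string]struct{}),
		bytecodeIdlers: make(map[string]struct{}),
		peerJoin:       make(chan string, 1),
	}
	fmt.Println(s.register("peer-1")) // <nil>
	fmt.Println(s.register("peer-1")) // already registered
}
```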

@@ -596,17 +597,15 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
return nil
}
// Assign all the data retrieval tasks to any free peers
acTasks := s.assignAccountTasks(cancel)
bcTasks := s.assignBytecodeTasks(cancel)
stTasks := s.assignStorageTasks(cancel)
var tnhTasks, bchTasks int
s.assignAccountTasks(cancel)
s.assignBytecodeTasks(cancel)
s.assignStorageTasks(cancel)

if len(s.tasks) == 0 {
// Sync phase done, run heal phase
tnhTasks = s.assignTrienodeHealTasks(cancel)
bchTasks = s.assignBytecodeHealTasks(cancel)
s.assignTrienodeHealTasks(cancel)
s.assignBytecodeHealTasks(cancel)
}
log.Debug("Assigned tasks", "account", acTasks, "bytecode", bcTasks,
"storage", stTasks, "trie-heal", tnhTasks, "byte-heal", bchTasks)
// Wait for something to happen
select {
case <-s.update:
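This hunk drops the "Assigned tasks" debug line, and with it the only consumer of the counts returned by the assign*Tasks helpers; the remaining hunks in this file therefore change those helpers to return nothing and simply bail out early. A toy sketch of that simplified shape follows (illustrative only, not the real scheduler).

```go
package main

import "fmt"

// assignToyTasks mirrors the post-patch shape of the assign*Tasks helpers:
// no assignment counter is kept or returned, the function just returns early
// when there is nothing to hand out.
func assignToyTasks(idlers map[string]struct{}, tasks []string) {
	// If there are no idle peers, short circuit assignment
	if len(idlers) == 0 {
		return
	}
	for _, task := range tasks {
		// Grab any idle peer; if none is left, bail out
		var idle string
		for id := range idlers {
			idle = id
			break
		}
		if idle == "" {
			return
		}
		delete(idlers, idle) // the peer is now busy with this task
		fmt.Println("assigned", task, "to", idle)
	}
}

func main() {
	idlers := map[string]struct{}{"peer-1": {}, "peer-2": {}}
	assignToyTasks(idlers, []string{"range-0x00", "range-0x80", "range-0xc0"})
	// Only two tasks get assigned; the third finds no idle peer and the
	// function returns without a count, just like the patched helpers.
}
```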
@@ -780,13 +779,13 @@ func (s *Syncer) cleanStorageTasks() {

// assignAccountTasks attempts to match idle peers to pending account range
// retrievals.
func (s *Syncer) assignAccountTasks(cancel chan struct{}) int {
func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
assignments := 0

// If there are no idle peers, short circuit assignment
if len(s.accountIdlers) == 0 {
return assignments
return
}
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
@@ -808,7 +807,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) int {
break
}
if idle == "" {
return assignments
return
}
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@@ -840,7 +839,6 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) int {
delete(s.accountIdlers, idle)

s.pend.Add(1)
assignments++
go func(peer SyncPeer, root common.Hash) {
defer s.pend.Done()

@@ -854,17 +852,16 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) int {
// Inject the request into the task to block further assignments
task.req = req
}
return assignments
}

// assignBytecodeTasks attempts to match idle peers to pending code retrievals.
func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) int {
func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
var assignments = 0

// If there are no idle peers, short circuit assignment
if len(s.bytecodeIdlers) == 0 {
return assignments
return
}
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
@@ -890,7 +887,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) int {
break
}
if idle == "" {
return assignments
return
}
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@@ -913,7 +910,6 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) int {
break
}
}
assignments++
req := &bytecodeRequest{
peer: idle,
id: reqid,
@@ -940,18 +936,17 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) int {
}
}(s.peers[idle]) // We're in the lock, peers[id] surely exists
}
return assignments
}

// assignStorageTasks attempts to match idle peers to pending storage range
// retrievals.
func (s *Syncer) assignStorageTasks(cancel chan struct{}) int {
func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
var assignments = 0

// If there are no idle peers, short circuit assignment
if len(s.storageIdlers) == 0 {
return assignments
return
}
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
@@ -977,7 +972,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) int {
break
}
if idle == "" {
return assignments
return
}
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@@ -1033,7 +1028,6 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) int {
if len(accounts) == 0 {
continue
}
assignments++
req := &storageRequest{
peer: idle,
id: reqid,
@@ -1075,18 +1069,17 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) int {
subtask.req = req
}
}
return assignments
}

// assignTrienodeHealTasks attempts to match idle peers to trie node requests to
// heal any trie errors caused by the snap sync's chunked retrieval model.
func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) int {
func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
var assignments = 0

// If there are no idle peers, short circuit assignment
if len(s.trienodeHealIdlers) == 0 {
return assignments
return
}
// Iterate over pending tasks and try to find a peer to retrieve with
for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
@@ -1108,7 +1101,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) int {
}
// If all the heal tasks are bytecodes or already downloading, bail
if len(s.healer.trieTasks) == 0 {
return assignments
return
}
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
@@ -1124,7 +1117,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) int {
break
}
if idle == "" {
return assignments
return
}
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@@ -1155,7 +1148,6 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) int {
break
}
}
assignments++
req := &trienodeHealRequest{
peer: idle,
id: reqid,
@@ -1183,18 +1175,17 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) int {
}
}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
}
return assignments
}

// assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
// heal any trie errors caused by the snap sync's chunked retrieval model.
func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) int {
func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
var assignments = 0

// If there are no idle peers, short circuit assignment
if len(s.bytecodeHealIdlers) == 0 {
return assignments
return
}
// Iterate over pending tasks and try to find a peer to retrieve with
for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
@@ -1216,7 +1207,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) int {
}
// If all the heal tasks are trienodes or already downloading, bail
if len(s.healer.codeTasks) == 0 {
return assignments
return
}
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
@@ -1232,7 +1223,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) int {
break
}
if idle == "" {
return assignments
return
}
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@@ -1256,7 +1247,6 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) int {
break
}
}
assignments++
req := &bytecodeHealRequest{
peer: idle,
id: reqid,
@@ -1283,7 +1273,6 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) int {
}
}(s.peers[idle]) // We're in the lock, peers[id] surely exists
}
return assignments
}

// revertRequests locates all the currently pending requests from a particular
