diff --git a/unixfs/hamt/fetcher.go b/unixfs/hamt/fetcher.go index 49ca75cd115..c94ca3dca4b 100644 --- a/unixfs/hamt/fetcher.go +++ b/unixfs/hamt/fetcher.go @@ -2,6 +2,7 @@ package hamt import ( "context" + "time" //"fmt" //"os" @@ -29,7 +30,7 @@ type fetcher struct { idle bool - done chan int + done chan batchJob todoFirst *job // do this job first since we are waiting for its results todo jobStack // stack of jobs that still need to be done @@ -43,6 +44,8 @@ type fetcher struct { // other useful stats cidCnt int + + start time.Time } // batchSize must be at least as large as the largest number of cids @@ -65,7 +68,7 @@ func startFetcher(ctx context.Context, dserv ipld.DAGService) *fetcher { reqRes: make(chan *Shard), result: make(chan result), idle: true, - done: make(chan int), + done: make(chan batchJob), jobs: make(map[*Shard]*job), } go f.mainLoop() @@ -109,6 +112,7 @@ type jobStack struct { func (f *fetcher) mainLoop() { var want *Shard + f.start = time.Now() for { select { case id := <-f.reqRes: @@ -142,8 +146,9 @@ func (f *fetcher) mainLoop() { } want = id } - case cnt := <-f.done: - f.doneCnt += cnt + case bj := <-f.done: + f.doneCnt += len(bj.jobs) + f.cidCnt += len(bj.cids) f.launch() if want != nil { j := f.jobs[want] @@ -153,11 +158,10 @@ func (f *fetcher) mainLoop() { } } log.Infof("fetcher: batch job done") - log.Infof("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d", f.doneCnt, f.hits, f.nearMisses, f.misses) + f.mainLoopLogStats() case <-f.ctx.Done(): log.Infof("fetcher: exiting") - log.Infof("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d", f.doneCnt, f.hits, f.nearMisses, f.misses) - log.Infof("fetcher total number of CIDs retrieved: %d", f.cidCnt) + f.mainLoopLogStats() return } } @@ -173,7 +177,6 @@ func (f *fetcher) mainLoopAddJob(hamt *Shard) *job { // programmer error panic("job size larger than batchSize") } - f.cidCnt += len(j.cids) f.todo.push(j) f.jobs[j.id] = j return j @@ -197,6 +200,12 @@ func (f *fetcher) 
mainLoopSendResult(j *job) { } } +func (f *fetcher) mainLoopLogStats() { // logs the cumulative counters, then per-second rates averaged over the time elapsed since f.start + log.Infof("fetcher stats (cids, done, hits, nearMisses, misses): %d %d %d %d %d", f.cidCnt, f.doneCnt, f.hits, f.nearMisses, f.misses) + elapsed := time.Since(f.start).Seconds() + log.Infof("fetcher perf (cids/sec, jobs/sec) %f %f", float64(f.cidCnt)/elapsed, float64(f.doneCnt+f.hits+f.nearMisses+f.misses)/elapsed) +} + type batchJob struct { cids []*cid.Cid jobs []*job @@ -252,7 +261,7 @@ func (f *fetcher) launch() { for _, job := range bj.jobs { job.res = fetched } - f.done <- len(bj.jobs) + f.done <- bj }() }