diff --git a/client/config/constants_otel.go b/client/config/constants_otel.go index 9f1329393f9..538eeb22fdb 100644 --- a/client/config/constants_otel.go +++ b/client/config/constants_otel.go @@ -44,17 +44,20 @@ const ( AttributeGetPieceRetry = attribute.Key("d7y.peer.piece.retry") AttributeWritePieceSuccess = attribute.Key("d7y.peer.piece.write.success") - SpanFilePeerTask = "file-peer-task" - SpanStreamPeerTask = "stream-peer-task" - SpanReusePeerTask = "reuse-peer-task" - SpanRegisterTask = "register" - SpanFirstSchedule = "schedule-#1" - SpanGetPieceTasks = "get-piece-tasks" - SpanDownloadPiece = "download-piece-#%d" - SpanProxy = "proxy" - SpanWritePiece = "write-piece" - SpanWriteBackPiece = "write-back-piece" - SpanWaitPieceLimit = "wait-limit" - SpanPushPieceResult = "push-result" - SpanPeerGC = "peer-gc" + SpanFilePeerTask = "file-peer-task" + SpanStreamPeerTask = "stream-peer-task" + SpanReusePeerTask = "reuse-peer-task" + SpanRegisterTask = "register" + SpanReportPeerResult = "report-peer-result" + SpanReportPieceResult = "report-piece-result" + SpanBackSource = "client-back-source" + SpanFirstSchedule = "schedule-#1" + SpanGetPieceTasks = "get-piece-tasks" + SpanDownloadPiece = "download-piece-#%d" + SpanProxy = "proxy" + SpanWritePiece = "write-piece" + SpanWriteBackPiece = "write-back-piece" + SpanWaitPieceLimit = "wait-limit" + SpanPushPieceResult = "push-result" + SpanPeerGC = "peer-gc" ) diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 2ee6bdba7d5..e90069177eb 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -368,6 +368,11 @@ func (pt *filePeerTask) cleanUnfinished() { scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled())) pt.Debugf("clean up end piece result sent") + if err := pt.callback.Fail(pt, pt.failedCode, pt.failedReason); err != nil { + pt.span.RecordError(err) + pt.Errorf("peer task fail callback failed: %s", err) + } + var progressDone bool pg := &FilePeerTaskProgress{ State: &ProgressState{ @@ -407,11 +412,6 @@ func (pt *filePeerTask) cleanUnfinished() { } } - if err := pt.callback.Fail(pt, pt.failedCode, pt.failedReason); err != nil { - pt.span.RecordError(err) - pt.Errorf("peer task fail callback failed: %s", err) - } - pt.Debugf("clean unfinished: close channel") close(pt.done) pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) @@ -436,6 +436,8 @@ func (pt *filePeerTask) SetTotalPieces(i int32) { } func (pt *filePeerTask) backSource() { + backSourceCtx, backSourceSpan := tracer.Start(pt.ctx, config.SpanBackSource) + defer backSourceSpan.End() defer pt.cleanUnfinished() if pt.disableBackSource { pt.Errorf(reasonBackSourceDisabled) @@ -443,7 +445,9 @@ func (pt *filePeerTask) backSource() { return } _ = pt.callback.Init(pt) - if peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.taskID, pt.request); err != nil { + reportPieceCtx, reportPieceSpan := tracer.Start(backSourceCtx, config.SpanReportPieceResult) + defer reportPieceSpan.End() + if peerPacketStream, err := pt.schedulerClient.ReportPieceResult(reportPieceCtx, pt.taskID, pt.request); err != nil { logger.Errorf("step 2: peer %s report piece failed: err", pt.request.PeerId, err) } else { pt.peerPacketStream = peerPacketStream @@ -453,8 +457,11 @@ func (pt *filePeerTask) backSource() { if err != nil { pt.Errorf("download from source error: %s", err) pt.failedReason = err.Error() + backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) + backSourceSpan.RecordError(err) return } pt.Infof("download from source ok") + backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(true)) _ = pt.finish() } diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index cd07cd9cd9f..d69e17fb179 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -19,6 +19,7 @@ package peer import ( "time" + "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -91,7 +92,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error { return e } p.ptm.PeerTaskDone(p.req.PeerId) - err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ + peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult) + defer peerResultSpan.End() + err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -106,6 +109,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { + peerResultSpan.RecordError(err) pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { pt.Log().Infof("step 3: report successful peer result ok") @@ -117,7 +121,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason) - err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ + peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult) + defer peerResultSpan.End() + err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -132,6 +138,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro Code: code, }) if err != nil { + peerResultSpan.RecordError(err) pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { pt.Log().Infof("step 3: report fail peer result ok") diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index e062a589c40..d6868b7cf7c 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -396,13 +396,13 @@ func (s *streamPeerTask) cleanUnfinished() { _ = s.peerPacketStream.Send( scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled())) s.Errorf("end piece result sent, peer task failed") - close(s.streamDone) - close(s.done) //close(s.successPieceCh) if err := s.callback.Fail(s, s.failedCode, s.failedReason); err != nil { s.span.RecordError(err) s.Errorf("fail callback error: %s", err) } + close(s.streamDone) + close(s.done) s.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) s.span.SetAttributes(config.AttributePeerTaskCode.Int(int(s.failedCode))) s.span.SetAttributes(config.AttributePeerTaskMessage.String(s.failedReason)) @@ -443,9 +443,13 @@ func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) { } func (s *streamPeerTask) backSource() { + backSourceCtx, backSourceSpan := tracer.Start(s.ctx, config.SpanBackSource) + defer backSourceSpan.End() s.contentLength.Store(-1) _ = s.callback.Init(s) - if peerPacketStream, err := s.schedulerClient.ReportPieceResult(s.ctx, s.taskID, s.request); err != nil { + reportPieceCtx, reportPieceSpan := tracer.Start(backSourceCtx, config.SpanReportPieceResult) + defer reportPieceSpan.End() + if peerPacketStream, err := s.schedulerClient.ReportPieceResult(reportPieceCtx, s.taskID, s.request); err != nil { logger.Errorf("step 2: peer %s report piece failed: err", s.request.PeerId, err) } else { s.peerPacketStream = peerPacketStream @@ -456,9 +460,12 @@ func (s *streamPeerTask) backSource() { s.Errorf("download from source error: %s", err) s.failedReason = err.Error() s.cleanUnfinished() + backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) + backSourceSpan.RecordError(err) return } s.Debugf("download from source ok") + backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(true)) _ = s.finish() return } diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index c30817060c5..4ccb4aa295d 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -19,6 +19,7 @@ package peer import ( "time" + "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -89,7 +90,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { return e } p.ptm.PeerTaskDone(p.req.PeerId) - err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ + peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult) + defer peerResultSpan.End() + err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -104,6 +107,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { + peerResultSpan.RecordError(err) pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { pt.Log().Infof("step 3: report successful peer result ok") @@ -115,7 +119,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason) - err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ + peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult) + defer peerResultSpan.End() + err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -130,6 +136,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er Code: code, }) if err != nil { + peerResultSpan.RecordError(err) pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { pt.Log().Infof("step 3: report fail peer result ok")