Skip to content

Commit

Permalink
feat: avoid report peer result fail due to context cancel & add backsource tracer (#606)
Browse files Browse the repository at this point in the history

Signed-off-by: santong <244372610@qq.com>
  • Loading branch information
244372610 authored Sep 6, 2021
1 parent 4be75cc commit 61e4969
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 26 deletions.
29 changes: 16 additions & 13 deletions client/config/constants_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@ const (
AttributeGetPieceRetry = attribute.Key("d7y.peer.piece.retry")
AttributeWritePieceSuccess = attribute.Key("d7y.peer.piece.write.success")

SpanFilePeerTask = "file-peer-task"
SpanStreamPeerTask = "stream-peer-task"
SpanReusePeerTask = "reuse-peer-task"
SpanRegisterTask = "register"
SpanFirstSchedule = "schedule-#1"
SpanGetPieceTasks = "get-piece-tasks"
SpanDownloadPiece = "download-piece-#%d"
SpanProxy = "proxy"
SpanWritePiece = "write-piece"
SpanWriteBackPiece = "write-back-piece"
SpanWaitPieceLimit = "wait-limit"
SpanPushPieceResult = "push-result"
SpanPeerGC = "peer-gc"
SpanFilePeerTask = "file-peer-task"
SpanStreamPeerTask = "stream-peer-task"
SpanReusePeerTask = "reuse-peer-task"
SpanRegisterTask = "register"
SpanReportPeerResult = "report-peer-result"
SpanReportPieceResult = "report-piece-result"
SpanBackSource = "client-back-source"
SpanFirstSchedule = "schedule-#1"
SpanGetPieceTasks = "get-piece-tasks"
SpanDownloadPiece = "download-piece-#%d"
SpanProxy = "proxy"
SpanWritePiece = "write-piece"
SpanWriteBackPiece = "write-back-piece"
SpanWaitPieceLimit = "wait-limit"
SpanPushPieceResult = "push-result"
SpanPeerGC = "peer-gc"
)
19 changes: 13 additions & 6 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ func (pt *filePeerTask) cleanUnfinished() {
scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("clean up end piece result sent")

if err := pt.callback.Fail(pt, pt.failedCode, pt.failedReason); err != nil {
pt.span.RecordError(err)
pt.Errorf("peer task fail callback failed: %s", err)
}

var progressDone bool
pg := &FilePeerTaskProgress{
State: &ProgressState{
Expand Down Expand Up @@ -407,11 +412,6 @@ func (pt *filePeerTask) cleanUnfinished() {
}
}

if err := pt.callback.Fail(pt, pt.failedCode, pt.failedReason); err != nil {
pt.span.RecordError(err)
pt.Errorf("peer task fail callback failed: %s", err)
}

pt.Debugf("clean unfinished: close channel")
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
Expand All @@ -436,14 +436,18 @@ func (pt *filePeerTask) SetTotalPieces(i int32) {
}

func (pt *filePeerTask) backSource() {
backSourceCtx, backSourceSpan := tracer.Start(pt.ctx, config.SpanBackSource)
defer backSourceSpan.End()
defer pt.cleanUnfinished()
if pt.disableBackSource {
pt.Errorf(reasonBackSourceDisabled)
pt.failedReason = reasonBackSourceDisabled
return
}
_ = pt.callback.Init(pt)
if peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.taskID, pt.request); err != nil {
reportPieceCtx, reportPieceSpan := tracer.Start(backSourceCtx, config.SpanReportPieceResult)
defer reportPieceSpan.End()
if peerPacketStream, err := pt.schedulerClient.ReportPieceResult(reportPieceCtx, pt.taskID, pt.request); err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", pt.request.PeerId, err)
} else {
pt.peerPacketStream = peerPacketStream
Expand All @@ -453,8 +457,11 @@ func (pt *filePeerTask) backSource() {
if err != nil {
pt.Errorf("download from source error: %s", err)
pt.failedReason = err.Error()
backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
backSourceSpan.RecordError(err)
return
}
pt.Infof("download from source ok")
backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
_ = pt.finish()
}
11 changes: 9 additions & 2 deletions client/daemon/peer/peertask_file_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package peer
import (
"time"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/pkg/rpc/base"
Expand Down Expand Up @@ -91,7 +92,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult)
defer peerResultSpan.End()
err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -106,6 +109,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
peerResultSpan.RecordError(err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("step 3: report successful peer result ok")
Expand All @@ -117,7 +121,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult)
defer peerResultSpan.End()
err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -132,6 +138,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
Code: code,
})
if err != nil {
peerResultSpan.RecordError(err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("step 3: report fail peer result ok")
Expand Down
13 changes: 10 additions & 3 deletions client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,13 @@ func (s *streamPeerTask) cleanUnfinished() {
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
s.Errorf("end piece result sent, peer task failed")
close(s.streamDone)
close(s.done)
//close(s.successPieceCh)
if err := s.callback.Fail(s, s.failedCode, s.failedReason); err != nil {
s.span.RecordError(err)
s.Errorf("fail callback error: %s", err)
}
close(s.streamDone)
close(s.done)
s.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
s.span.SetAttributes(config.AttributePeerTaskCode.Int(int(s.failedCode)))
s.span.SetAttributes(config.AttributePeerTaskMessage.String(s.failedReason))
Expand Down Expand Up @@ -443,9 +443,13 @@ func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) {
}

func (s *streamPeerTask) backSource() {
backSourceCtx, backSourceSpan := tracer.Start(s.ctx, config.SpanBackSource)
defer backSourceSpan.End()
s.contentLength.Store(-1)
_ = s.callback.Init(s)
if peerPacketStream, err := s.schedulerClient.ReportPieceResult(s.ctx, s.taskID, s.request); err != nil {
reportPieceCtx, reportPieceSpan := tracer.Start(backSourceCtx, config.SpanReportPieceResult)
defer reportPieceSpan.End()
if peerPacketStream, err := s.schedulerClient.ReportPieceResult(reportPieceCtx, s.taskID, s.request); err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", s.request.PeerId, err)
} else {
s.peerPacketStream = peerPacketStream
Expand All @@ -456,9 +460,12 @@ func (s *streamPeerTask) backSource() {
s.Errorf("download from source error: %s", err)
s.failedReason = err.Error()
s.cleanUnfinished()
backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
backSourceSpan.RecordError(err)
return
}
s.Debugf("download from source ok")
backSourceSpan.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
_ = s.finish()
return
}
11 changes: 9 additions & 2 deletions client/daemon/peer/peertask_stream_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package peer
import (
"time"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/pkg/rpc/base"
Expand Down Expand Up @@ -89,7 +90,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult)
defer peerResultSpan.End()
err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -104,6 +107,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
peerResultSpan.RecordError(err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("step 3: report successful peer result ok")
Expand All @@ -115,7 +119,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
peerResultCtx, peerResultSpan := tracer.Start(p.pt.ctx, config.SpanReportPeerResult)
defer peerResultSpan.End()
err := p.pt.schedulerClient.ReportPeerResult(peerResultCtx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -130,6 +136,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
Code: code,
})
if err != nil {
peerResultSpan.RecordError(err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("step 3: report fail peer result ok")
Expand Down

0 comments on commit 61e4969

Please sign in to comment.