Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(requestmanager): rename processResponses internals for consistency #328

Merged
merged 3 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestRejectRequestsByDefault(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"responseMessage(0)->loaderProcess(0)->cacheProcess(0)",
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
}, tracing.TracesToStrings())
// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -254,8 +254,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrBudgetExceeded exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true)
Expand Down Expand Up @@ -303,8 +303,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
// the requester gets a cancel, the responder gets a ErrBudgetExceeded
Expand Down Expand Up @@ -387,8 +387,8 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
Expand Down Expand Up @@ -483,8 +483,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -715,8 +715,8 @@ func TestPauseResume(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -796,8 +796,8 @@ func TestPauseResumeRequest(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(1)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrPaused exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -879,7 +879,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
Expand Down Expand Up @@ -970,7 +970,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
Expand Down Expand Up @@ -1063,8 +1063,8 @@ func TestNetworkDisconnect(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -1203,8 +1203,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)

// TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane
// tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true)
Expand Down Expand Up @@ -1291,8 +1291,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -1485,8 +1485,8 @@ func TestUnixFSFetch(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncBlockListeners(t *testing.T) {
Expand Down Expand Up @@ -1585,7 +1585,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 100)...,
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -1724,12 +1724,12 @@ func (r *receiver) Connected(p peer.ID) {
func (r *receiver) Disconnected(p peer.ID) {
}

func responseMessageTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("processResponses({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 {
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
}
2 changes: 1 addition & 1 deletion requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync
// and updates the in progress requests based on those responses.
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
blks []blocks.Block) {
rm.send(&processResponseMessage{p, responses, blks}, nil)
rm.send(&processResponsesMessage{p, responses, blks}, nil)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
Expand Down
6 changes: 3 additions & 3 deletions requestmanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
}
}

type processResponseMessage struct {
type processResponsesMessage struct {
p peer.ID
responses []gsmsg.GraphSyncResponse
blks []blocks.Block
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
rm.processResponseMessage(prm.p, prm.responses, prm.blks)
func (prm *processResponsesMessage) handle(rm *RequestManager) {
rm.processResponses(prm.p, prm.responses, prm.blks)
}

type cancelRequestMessage struct {
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,13 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
}
}

func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) {
log.Debugf("beging rocessing message for peer %s", p)
func (rm *RequestManager) processResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) {
log.Debugf("beginning processing responses for peer %s", p)
requestIds := make([]int, 0, len(responses))
for _, r := range responses {
requestIds = append(requestIds, int(r.RequestID()))
}
ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "responseMessage", trace.WithAttributes(
ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "processResponses", trace.WithAttributes(
attribute.String("peerID", p.Pretty()),
attribute.IntSlice("requestIDs", requestIds),
))
Expand All @@ -279,7 +279,7 @@ func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.Gr
responseMetadata := metadataForResponses(filteredResponses)
rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks)
rm.processTerminations(filteredResponses)
log.Debugf("end processing message for peer %s", p)
log.Debugf("end processing responses for peer %s", p)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
Expand Down