Skip to content

Commit

Permalink
refactor: contentLookup and traceContentLookup
Browse files Browse the repository at this point in the history
  • Loading branch information
qcloud authored and GrapeBaBa committed Jun 3, 2024
1 parent 1ae723d commit 04c5048
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 122 deletions.
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,6 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581 h1:ZxgrtI0xIw+clB32iDDDWaiTcCizTeN7rNyzH9YorPI=
github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/utp-go v0.0.0-20240530085325-d8dd9d262631 h1:01AecSuOSS6fsIU/oTVG/C70hIl3xPen99qy2hGr57w=
github.com/optimism-java/utp-go v0.0.0-20240530085325-d8dd9d262631/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/utp-go v0.0.0-20240531021243-e12d25b6be38 h1:t0gRqfM7wUrFyryagUpw4TmYY0DLt+rjPaBd92i+W2M=
github.com/optimism-java/utp-go v0.0.0-20240531021243-e12d25b6be38/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/utp-go v0.0.0-20240531024756-00da67044c50 h1:I1jGQkNEWq7BTFZkCJKLDrqFLC1jR3EC7jz3to4kpLg=
github.com/optimism-java/utp-go v0.0.0-20240531024756-00da67044c50/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ=
github.com/optimism-java/utp-go v0.0.0-20240603010819-75be99daf402 h1:jssfGQq6xdzgs0ZI/O/S2dKAyh2fIDiOTWdrbqat1Ls=
Expand Down Expand Up @@ -715,8 +709,6 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
7 changes: 6 additions & 1 deletion p2p/discover/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Trace struct {
Origin string `json:"origin"` // local node id
TargetId string `json:"targetId"` // target content id
ReceivedFrom string `json:"receivedFrom"` // the node id of which content from
Responses map[string][]string `json:"responses"` // the node id and there response nodeIds
Responses map[string]RespByNode `json:"responses"` // the node id and there response nodeIds
Metadata map[string]*NodeMetadata `json:"metadata"` // node id and there metadata object
StartedAtMs int `json:"startedAtMs"` // timestamp of the beginning of this request in milliseconds
Cancelled []string `json:"cancelled"` // the node ids which are send but cancelled
Expand All @@ -67,6 +67,11 @@ type NodeMetadata struct {
Distance string `json:"distance"`
}

type RespByNode struct {
DurationMs int32 `json:"durationMs"`
RespondedWith []string `json:"respondedWith"`
}

type Enrs struct {
Enrs []string `json:"enrs"`
}
Expand Down
193 changes: 87 additions & 106 deletions p2p/discover/portal_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -1488,64 +1489,44 @@ func (p *PortalProtocol) collectTableNodes(rip net.IP, distances []uint, limit i

func (p *PortalProtocol) ContentLookup(contentKey, contentId []byte) ([]byte, bool, error) {
lookupContext, cancel := context.WithCancel(context.Background())
defer cancel()
resChan := make(chan *ContentInfoResp, 1)
defer close(resChan)

resChan := make(chan *traceContentInfoResp, alpha)
hasResult := int32(0)

result := ContentInfoResp{}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
for res := range resChan {
if res.Flag != portalwire.ContentEnrsSelector {
result.Content = res.Content.([]byte)
result.UtpTransfer = res.UtpTransfer
}
}
}()

newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) {
return p.contentLookupWorker(unwrapNode(n), contentKey, resChan)
return p.contentLookupWorker(unwrapNode(n), contentKey, resChan, cancel, &hasResult)
}).run()
close(resChan)

if len(resChan) > 0 {
res := <-resChan
return res.Content, res.UtpTransfer, nil
wg.Wait()
if hasResult == 1 {
return result.Content, result.UtpTransfer, nil
}
defer cancel()
return nil, false, ContentNotFound
}

func (p *PortalProtocol) contentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *ContentInfoResp) ([]*node, error) {
wrapedNode := make([]*node, 0)
flag, content, err := p.findContent(n, contentKey)
if err != nil {
p.Log.Error("contentLookupWorker failed", "ip", n.IP().String(), "err", err)
return nil, err
}
p.Log.Debug("contentLookupWorker reveice response", "ip", n.IP().String(), "flag", flag)
// has find content
if len(resChan) > 0 {
return []*node{}, nil
}
switch flag {
case portalwire.ContentRawSelector, portalwire.ContentConnIdSelector:
content, ok := content.([]byte)
if !ok {
return wrapedNode, fmt.Errorf("failed to assert to raw content, value is: %v", content)
}
res := &ContentInfoResp{
Content: content,
}
if flag == portalwire.ContentConnIdSelector {
res.UtpTransfer = true
}
resChan <- res
return wrapedNode, err
case portalwire.ContentEnrsSelector:
nodes, ok := content.([]*enode.Node)
if !ok {
return wrapedNode, fmt.Errorf("failed to assert to enrs content, value is: %v", content)
}
return wrapNodes(nodes), nil
}
return wrapedNode, nil
}

func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*TraceContentResult, error) {
lookupContext, cancel := context.WithCancel(context.Background())
defer cancel()
requestNodeChan := make(chan *enode.Node, 3)
resChan := make(chan *traceContentInfoResp, 3)
// resp channel
resChan := make(chan *traceContentInfoResp, alpha)

requestNode := make([]*enode.Node, 0)
requestRes := make(map[string]*traceContentInfoResp)
hasResult := int32(0)

traceContentRes := &TraceContentResult{}

Expand All @@ -1555,7 +1536,7 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac
Origin: selfHexId,
TargetId: hexutil.Encode(contentId),
StartedAtMs: int(time.Now().UnixMilli()),
Responses: make(map[string][]string),
Responses: make(map[string]RespByNode),
Metadata: make(map[string]*NodeMetadata),
Cancelled: make([]string, 0),
}
Expand All @@ -1567,7 +1548,10 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac
id := "0x" + node.ID().String()
localResponse = append(localResponse, id)
}
trace.Responses[selfHexId] = localResponse
trace.Responses[selfHexId] = RespByNode{
DurationMs: 0,
RespondedWith: localResponse,
}

dis := p.Distance(p.Self().ID(), enode.ID(contentId))

Expand All @@ -1577,82 +1561,73 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac
}

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for node := range requestNodeChan {
requestNode = append(requestNode, node)
}
}()
wg.Add(1)

go func() {
defer wg.Done()
for res := range resChan {
key := res.Node.ID().String()
requestRes[key] = res
if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector {
// get the content
return
node := res.Node
hexId := "0x" + node.ID().String()
dis := p.Distance(node.ID(), enode.ID(contentId))
p.Log.Debug("reveice res", "id", hexId, "flag", res.Flag)
trace.Metadata[hexId] = &NodeMetadata{
Enr: node.String(),
Distance: hexutil.Encode(dis[:]),
}
// 没有返回 content
if traceContentRes.Content == "" {
if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector {
trace.ReceivedFrom = hexId
content := res.Content.([]byte)
traceContentRes.Content = hexutil.Encode(content)
traceContentRes.UtpTransfer = res.UtpTransfer
trace.Responses[hexId] = RespByNode{}
} else {
nodes := res.Content.([]*enode.Node)
respByNode := RespByNode{
RespondedWith: make([]string, 0, len(nodes)),
}
for _, node := range nodes {
idInner := "0x" + node.ID().String()
respByNode.RespondedWith = append(respByNode.RespondedWith, idInner)
if _, ok := trace.Metadata[idInner]; !ok {
dis := p.Distance(node.ID(), enode.ID(contentId))
trace.Metadata[idInner] = &NodeMetadata{
Enr: node.String(),
Distance: hexutil.Encode(dis[:]),
}
}
trace.Responses[hexId] = respByNode
}
}
} else {
trace.Cancelled = append(trace.Cancelled, hexId)
}
}
}()

newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) {
node := unwrapNode(n)
requestNodeChan <- node
return p.traceContentLookupWorker(node, contentKey, resChan)
}).run()

close(requestNodeChan)
lookup := newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) {
return p.contentLookupWorker(unwrapNode(n), contentKey, resChan, cancel, &hasResult)
})
lookup.run()
close(resChan)

wg.Wait()

for _, node := range requestNode {
id := node.ID().String()
hexId := "0x" + id
dis := p.Distance(node.ID(), enode.ID(contentId))
trace.Metadata[hexId] = &NodeMetadata{
Enr: node.String(),
Distance: hexutil.Encode(dis[:]),
}
if res, ok := requestRes[id]; ok {
if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector {
trace.ReceivedFrom = hexId
content := res.Content.([]byte)
traceContentRes.Content = hexutil.Encode(content)
traceContentRes.UtpTransfer = res.UtpTransfer
trace.Responses[hexId] = make([]string, 0)
} else {
content := res.Content.([]*enode.Node)
ids := make([]string, 0)
for _, n := range content {
hexId := "0x" + n.ID().String()
ids = append(ids, hexId)
}
trace.Responses[hexId] = ids
}
} else {
trace.Cancelled = append(trace.Cancelled, id)
}
if hasResult == 0 {
cancel()
}

traceContentRes.Trace = *trace

return traceContentRes, nil
}

func (p *PortalProtocol) traceContentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *traceContentInfoResp) ([]*node, error) {
func (p *PortalProtocol) contentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *traceContentInfoResp, cancel context.CancelFunc, done *int32) ([]*node, error) {
wrapedNode := make([]*node, 0)
flag, content, err := p.findContent(n, contentKey)
if err != nil {
return nil, err
}
p.Log.Debug("traceContentLookupWorker reveice response", "ip", n.IP().String(), "flag", flag)
// has find content
if len(resChan) > 0 {
return []*node{}, nil
}

switch flag {
case portalwire.ContentRawSelector, portalwire.ContentConnIdSelector:
Expand All @@ -1669,17 +1644,23 @@ func (p *PortalProtocol) traceContentLookupWorker(n *enode.Node, contentKey []by
if flag == portalwire.ContentConnIdSelector {
res.UtpTransfer = true
}
resChan <- res
if atomic.CompareAndSwapInt32(done, 0, 1) {
p.Log.Debug("contentLookupWorker find content", "ip", n.IP().String(), "port", n.UDP())
resChan <- res
cancel()
}
return wrapedNode, err
case portalwire.ContentEnrsSelector:
nodes, ok := content.([]*enode.Node)
if !ok {
return wrapedNode, fmt.Errorf("failed to assert to enrs content, value is: %v", content)
}
resChan <- &traceContentInfoResp{Node: n,
resChan <- &traceContentInfoResp{
Node: n,
Flag: flag,
Content: content,
UtpTransfer: false}
UtpTransfer: false,
}
return wrapNodes(nodes), nil
}
return wrapedNode, nil
Expand Down
14 changes: 7 additions & 7 deletions p2p/discover/portal_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ func TestTraceContentLookup(t *testing.T) {
err = node3.Start()
assert.NoError(t, err)

defer node1.Stop()
defer node2.Stop()
defer node3.Stop()

contentKey := []byte{0x3, 0x4}
content := []byte{0x1, 0x2}
contentId := node1.toContentId(contentKey)
Expand Down Expand Up @@ -495,15 +499,11 @@ func TestTraceContentLookup(t *testing.T) {

// check response
node3Response := res.Trace.Responses[node3Id]
assert.Equal(t, node3Response, []string{node2Id})
assert.Equal(t, node3Response.RespondedWith, []string{node2Id})

node2Response := res.Trace.Responses[node2Id]
assert.Equal(t, node2Response, []string{node1Id})
assert.Equal(t, node2Response.RespondedWith, []string{node1Id})

node1Response := res.Trace.Responses[node1Id]
assert.Equal(t, node1Response, []string{})

// res, _, err = node1.ContentLookup([]byte{0x2, 0x4})
// assert.Equal(t, ContentNotFound, err)
// assert.Nil(t, res)
assert.Equal(t, node1Response.RespondedWith, ([]string)(nil))
}

0 comments on commit 04c5048

Please sign in to comment.