Skip to content

Commit

Permalink
Merge branch 'master' into gossipsub-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ppopth committed Jan 16, 2025
2 parents 409123d + bf5b583 commit d5f588f
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 1 deletion.
22 changes: 21 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
GossipSubGraftFloodThreshold = 10 * time.Second
GossipSubMaxIHaveLength = 5000
GossipSubMaxIHaveMessages = 10
GossipSubMaxIDontWantLength = 10
GossipSubMaxIDontWantMessages = 1000
GossipSubIWantFollowupTime = 3 * time.Second
GossipSubIDontWantMessageThreshold = 1024 // 1KB
Expand Down Expand Up @@ -235,6 +236,10 @@ type GossipSubParams struct {
// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
MaxIHaveMessages int

// MaxIDontWantLength is the maximum number of messages to include in an IDONTWANT message. Also controls
// the maximum number of IDONTWANT ids we will accept to protect against IDONTWANT floods. This value
// should be adjusted if your system anticipates a larger amount than specified per heartbeat.
MaxIDontWantLength int
// MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
MaxIDontWantMessages int

Expand Down Expand Up @@ -323,6 +328,7 @@ func DefaultGossipSubParams() GossipSubParams {
GraftFloodThreshold: GossipSubGraftFloodThreshold,
MaxIHaveLength: GossipSubMaxIHaveLength,
MaxIHaveMessages: GossipSubMaxIHaveMessages,
MaxIDontWantLength: GossipSubMaxIDontWantLength,
MaxIDontWantMessages: GossipSubMaxIDontWantMessages,
IWantFollowupTime: GossipSubIWantFollowupTime,
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
Expand Down Expand Up @@ -879,6 +885,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
ihave := make(map[string]*pb.Message)
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
// Check if that peer has sent IDONTWANT before, if so don't send them the message
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
continue
}

msg, count, ok := gs.mcache.GetForPeer(mid, p)
if !ok {
continue
Expand Down Expand Up @@ -1055,9 +1066,18 @@ func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
}
gs.peerdontwant[p]++

totalUnwantedIds := 0
// Remember all the unwanted message ids
mainIDWLoop:
for _, idontwant := range ctl.GetIdontwant() {
for _, mid := range idontwant.GetMessageIDs() {
// IDONTWANT flood protection
if totalUnwantedIds >= gs.params.MaxIDontWantLength {
log.Debugf("IDONWANT: peer %s has advertised too many ids (%d) within this message; ignoring", p, totalUnwantedIds)
break mainIDWLoop
}

totalUnwantedIds++
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
}
}
Expand Down Expand Up @@ -1703,7 +1723,7 @@ func (gs *GossipSubRouter) heartbeat() {
}

// do we have too many peers?
if len(peers) > gs.params.Dhi {
if len(peers) >= gs.params.Dhi {
plst := peerMapToList(peers)

// sort by score (but shuffle first for the case we don't use the score)
Expand Down
48 changes: 48 additions & 0 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -904,6 +905,53 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) {
<-ctx.Done()
}

func TestGossipsubHandleIDontwantSpam(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 2)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))

connect(t, hosts[0], hosts[1])

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
exceededIDWLength := GossipSubMaxIDontWantLength + 1
var idwIds []string
for i := 0; i < exceededIDWLength; i++ {
idwIds = append(idwIds, fmt.Sprintf("idontwant-%d", i))
}
rPid := hosts[1].ID()
ctrlMessage := &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: idwIds}}}
grt := psubs[0].rt.(*GossipSubRouter)
grt.handleIDontWant(rPid, ctrlMessage)

if grt.peerdontwant[rPid] != 1 {
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
}
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
}

mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
}
}

type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)

func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {
Expand Down
151 changes: 151 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3090,6 +3090,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) {
<-ctx.Done()
}

// Test that IWANT will have no effect after IDONTWANT is sent
func TestGossipsubIdontwantBeforeIwant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 3)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Wait a bit after the last message before checking the result
msgWaitMax := 2 * time.Second
msgTimer := time.NewTimer(msgWaitMax)

// Checks we received right messages
msgReceived := false
ihaveReceived := false
checkMsgs := func() {
if msgReceived {
t.Fatalf("Expected no messages received after IDONWANT")
}
if !ihaveReceived {
t.Fatalf("Expected IHAVE received")
}
}

// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgs()
cancel()
return
case <-ctx.Done():
checkMsgs()
}
}()

newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// Check if it receives any message
if len(irpc.GetPublish()) > 0 {
msgReceived = true
}
// The middle peer is supposed to send IHAVE
for _, ihave := range irpc.GetControl().GetIhave() {
ihaveReceived = true
mids := ihave.GetMessageIDs()

writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}},
})
// Wait for the middle peer to process IDONTWANT
time.Sleep(100 * time.Millisecond)
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}},
})
}
// When the middle peer connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and pruning to the middle peer to make sure
// that it's not in the mesh
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}},
})

go func() {
// Wait for an interval to make sure the middle peer
// received and processed the subscribe
time.Sleep(100 * time.Millisecond)

data := make([]byte, 16)
crand.Read(data)

// Publish the message from the first peer
if err := psubs[0].Publish(topic, data); err != nil {
t.Error(err)
return // cannot call t.Fatal in a non-test goroutine
}
}()
}
}
})

connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])

<-ctx.Done()
}

// Test that IDONTWANT will cleared when it's old enough
func TestGossipsubIdontwantClear(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -3187,6 +3291,53 @@ func TestGossipsubIdontwantClear(t *testing.T) {
<-ctx.Done()
}

func TestGossipsubPruneMeshCorrectly(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 9)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

params := DefaultGossipSubParams()
params.Dhi = 8

psubs := make([]*PubSub, 9)
for i := 0; i < 9; i++ {
psubs[i] = getGossipsub(ctx, hosts[i],
WithGossipSubParams(params),
WithMessageIdFn(msgID))
}

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Connect first peer with the rest of the 8 other
// peers.
for i := 1; i < 9; i++ {
connect(t, hosts[0], hosts[i])
}

// Wait for 2 heartbeats to be able to prune excess peers back down to D.
totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval
time.Sleep(totalTimeToWait)

meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic]
if !ok {
t.Fatal("mesh does not exist for topic")
}
if len(meshPeers) != params.D {
t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers))
}
}

// Test that IANNOUNCE is sent to mesh peers
func TestGossipsubIannounceMeshPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit d5f588f

Please sign in to comment.