From 676e557daf45f3b3244931f4568ce0582396db0c Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Fri, 16 Dec 2022 06:55:03 -0500 Subject: [PATCH] test: port peering test from sharness to Go This is the slowest test in the sharness test suite, because it has very long sleeps. It usually takes 2+ minutes to run. This new impl runs all peering tests in about 20 seconds, since it polls for conditions instead of sleeping, and runs the tests in parallel. This also has an additional test case for a peer that was never online and then connects. --- test/cli/harness/harness.go | 21 +++++ test/cli/harness/log.go | 155 +++++++++++++++++++++++++++++++++ test/cli/harness/node.go | 25 +++++- test/cli/harness/nodes.go | 20 +---- test/cli/peering_test.go | 141 ++++++++++++++++++++++++++++++ test/cli/testutils/strings.go | 14 +++ test/sharness/t0171-peering.sh | 127 --------------------------- 7 files changed, 358 insertions(+), 145 deletions(-) create mode 100644 test/cli/harness/log.go create mode 100644 test/cli/peering_test.go delete mode 100755 test/sharness/t0171-peering.sh diff --git a/test/cli/harness/harness.go b/test/cli/harness/harness.go index a35fead3512..e68116b5efc 100644 --- a/test/cli/harness/harness.go +++ b/test/cli/harness/harness.go @@ -11,6 +11,8 @@ import ( logging "github.com/ipfs/go-log/v2" . "github.com/ipfs/kubo/test/cli/testutils" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" ) // Harness tracks state for a test, such as temp dirs and IFPS nodes, and cleans them up after the test. @@ -188,3 +190,22 @@ func (h *Harness) Cleanup() { log.Panicf("removing temp dir %s: %s", h.Dir, err) } } + +// ExtractPeerID extracts a peer ID from the given multiaddr, and fatals if it does not contain a peer ID. +func (h *Harness) ExtractPeerID(m multiaddr.Multiaddr) peer.ID { + var peerIDStr string + multiaddr.ForEach(m, func(c multiaddr.Component) bool { + if c.Protocol().Code == multiaddr.P_P2P { + peerIDStr = c.Value() + } + return true + }) + if peerIDStr == "" { + panic(multiaddr.ErrProtocolNotFound) + } + peerID, err := peer.Decode(peerIDStr) + if err != nil { + panic(err) + } + return peerID +} diff --git a/test/cli/harness/log.go b/test/cli/harness/log.go new file mode 100644 index 00000000000..d76bb2747c1 --- /dev/null +++ b/test/cli/harness/log.go @@ -0,0 +1,155 @@ +package harness + +import ( + "fmt" + "path/filepath" + "runtime" + "sort" + "strings" + "sync" + "testing" + "time" +) + +type event struct { + timestamp time.Time + msg string +} + +type events []*event + +func (e events) Len() int { return len(e) } +func (e events) Less(i, j int) bool { return e[i].timestamp.Before(e[j].timestamp) } +func (e events) Swap(i, j int) { e[i], e[j] = e[j], e[i] } + +// TestLogger is a logger for tests. +// It buffers output and only writes the output if the test fails or output is explicitly turned on. +// The purpose of this logger is to allow Go test to run with the verbose flag without printing logs. +// The verbose flag is useful since it streams test progress, but also printing logs makes the output too verbose. +// +// You can also add prefixes that are prepended to each log message, for extra logging context. +// +// This is implemented as a hierarchy of loggers, with children flushing log entries back to parents. +// This works because t.Cleanup() processes entries in LIFO order, so children always flush first. +// +// Obviously this logger should never be used in production systems. +type TestLogger struct { + parent *TestLogger + children []*TestLogger + prefixes []string + prefixesIface []any + t *testing.T + buf events + m sync.Mutex + logsEnabled bool +} + +func NewTestLogger(t *testing.T) *TestLogger { + l := &TestLogger{t: t, buf: make(events, 0)} + t.Cleanup(l.flush) + return l +} + +func (t *TestLogger) buildPrefix(timestamp time.Time) string { + d := timestamp.Format("2006-01-02T15:04:05.999999") + _, file, lineno, _ := runtime.Caller(2) + file = filepath.Base(file) + caller := fmt.Sprintf("%s:%d", file, lineno) + + if len(t.prefixes) == 0 { + return fmt.Sprintf("%s\t%s\t", d, caller) + } + + prefixes := strings.Join(t.prefixes, ":") + return fmt.Sprintf("%s\t%s\t%s: ", d, caller, prefixes) +} + +func (t *TestLogger) Log(args ...any) { + timestamp := time.Now() + e := t.buildPrefix(timestamp) + fmt.Sprint(args...) + t.add(&event{timestamp: timestamp, msg: e}) +} + +func (t *TestLogger) Logf(format string, args ...any) { + timestamp := time.Now() + e := t.buildPrefix(timestamp) + fmt.Sprintf(format, args...) + t.add(&event{timestamp: timestamp, msg: e}) +} + +func (t *TestLogger) Fatal(args ...any) { + timestamp := time.Now() + e := t.buildPrefix(timestamp) + fmt.Sprint(append([]any{"fatal: "}, args...)...) + t.add(&event{timestamp: timestamp, msg: e}) + t.t.FailNow() +} + +func (t *TestLogger) Fatalf(format string, args ...any) { + timestamp := time.Now() + e := t.buildPrefix(timestamp) + fmt.Sprintf(fmt.Sprintf("fatal: %s", format), args...) + t.add(&event{timestamp: timestamp, msg: e}) + t.t.FailNow() +} + +func (t *TestLogger) add(e *event) { + t.m.Lock() + defer t.m.Unlock() + t.buf = append(t.buf, e) +} + +func (t *TestLogger) AddPrefix(prefix string) *TestLogger { + l := &TestLogger{ + prefixes: append(t.prefixes, prefix), + prefixesIface: append(t.prefixesIface, prefix), + t: t.t, + parent: t, + logsEnabled: t.logsEnabled, + } + t.m.Lock() + defer t.m.Unlock() + + t.children = append(t.children, l) + t.t.Cleanup(l.flush) + + return l +} + +func (t *TestLogger) EnableLogs() { + t.m.Lock() + defer t.m.Unlock() + t.logsEnabled = true + if t.parent != nil { + if t.parent.logsEnabled { + t.parent.EnableLogs() + } + } + fmt.Printf("enabling %d children\n", len(t.children)) + for _, c := range t.children { + if !c.logsEnabled { + c.EnableLogs() + } + } +} + +func (t *TestLogger) flush() { + if t.t.Failed() || t.logsEnabled { + t.m.Lock() + defer t.m.Unlock() + // if this is a child, send the events to the parent + // the root parent will print all the events in sorted order + if t.parent != nil { + for _, e := range t.buf { + t.parent.add(e) + } + } else { + // we're the root, sort all the events and then print them + sort.Sort(t.buf) + fmt.Println() + fmt.Printf("Logs for test %q:\n\n", t.t.Name()) + for _, e := range t.buf { + fmt.Println(e.msg) + } + fmt.Println() + } + t.buf = nil + } +} diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go index cc251e11b0f..f740ab1b19f 100644 --- a/test/cli/harness/node.go +++ b/test/cli/harness/node.go @@ -453,9 +453,8 @@ func (n *Node) Peers() []multiaddr.Multiaddr { Path: n.IPFSBin, Args: []string{"swarm", "peers"}, }) - lines := strings.Split(strings.TrimSpace(res.Stdout.String()), "\n") var addrs []multiaddr.Multiaddr - for _, line := range lines { + for _, line := range res.Stdout.Lines() { ma, err := multiaddr.NewMultiaddr(line) if err != nil { panic(err) @@ -465,6 +464,28 @@ func (n *Node) Peers() []multiaddr.Multiaddr { return addrs } +func (n *Node) PeerWith(other *Node) { + n.UpdateConfig(func(cfg *config.Config) { + var addrs []multiaddr.Multiaddr + for _, addrStr := range other.ReadConfig().Addresses.Swarm { + ma, err := multiaddr.NewMultiaddr(addrStr) + if err != nil { + panic(err) + } + addrs = append(addrs, ma) + } + + cfg.Peering.Peers = append(cfg.Peering.Peers, peer.AddrInfo{ + ID: other.PeerID(), + Addrs: addrs, + }) + }) +} + +func (n *Node) Disconnect(other *Node) { + n.IPFS("swarm", "disconnect", "/p2p/"+other.PeerID().String()) +} + // GatewayURL waits for the gateway file and then returns its contents or times out. func (n *Node) GatewayURL() string { timer := time.NewTimer(1 * time.Second) diff --git a/test/cli/harness/nodes.go b/test/cli/harness/nodes.go index 872d7767913..78662afbbea 100644 --- a/test/cli/harness/nodes.go +++ b/test/cli/harness/nodes.go @@ -3,6 +3,7 @@ package harness import ( "sync" + . "github.com/ipfs/kubo/test/cli/testutils" "github.com/multiformats/go-multiaddr" "golang.org/x/sync/errgroup" ) @@ -11,9 +12,7 @@ import ( type Nodes []*Node func (n Nodes) Init(args ...string) Nodes { - for _, node := range n { - node.Init() - } + ForEachPar(n, func(node *Node) { node.Init(args...) }) return n } @@ -59,22 +58,11 @@ func (n Nodes) Connect() Nodes { } func (n Nodes) StartDaemons() Nodes { - wg := sync.WaitGroup{} - for _, node := range n { - wg.Add(1) - node := node - go func() { - defer wg.Done() - node.StartDaemon() - }() - } - wg.Wait() + ForEachPar(n, func(node *Node) { node.StartDaemon() }) return n } func (n Nodes) StopDaemons() Nodes { - for _, node := range n { - node.StopDaemon() - } + ForEachPar(n, func(node *Node) { node.StopDaemon() }) return n } diff --git a/test/cli/peering_test.go b/test/cli/peering_test.go new file mode 100644 index 00000000000..f3e797fae80 --- /dev/null +++ b/test/cli/peering_test.go @@ -0,0 +1,141 @@ +package cli + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/test/cli/harness" + . "github.com/ipfs/kubo/test/cli/testutils" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/assert" +) + +func TestPeering(t *testing.T) { + t.Parallel() + + type peering struct { + from int + to int + } + + newRandPort := func() int { + n := rand.Int() + return 3000 + (n % 1000) + } + + containsPeerID := func(p peer.ID, peers []peer.ID) bool { + for _, peerID := range peers { + if p == peerID { + return true + } + } + return false + } + + assertPeered := func(h *harness.Harness, from *harness.Node, to *harness.Node) { + assert.Eventuallyf(t, func() bool { + fromPeers := from.Peers() + if len(fromPeers) == 0 { + return false + } + var fromPeerIDs []peer.ID + for _, p := range fromPeers { + fromPeerIDs = append(fromPeerIDs, h.ExtractPeerID(p)) + } + return containsPeerID(to.PeerID(), fromPeerIDs) + }, 20*time.Second, 10*time.Millisecond, "%d -> %d not peered", from.ID, to.ID) + } + + assertNotPeered := func(h *harness.Harness, from *harness.Node, to *harness.Node) { + assert.Eventuallyf(t, func() bool { + fromPeers := from.Peers() + if len(fromPeers) == 0 { + return false + } + var fromPeerIDs []peer.ID + for _, p := range fromPeers { + fromPeerIDs = append(fromPeerIDs, h.ExtractPeerID(p)) + } + return !containsPeerID(to.PeerID(), fromPeerIDs) + }, 20*time.Second, 10*time.Millisecond, "%d -> %d peered", from.ID, to.ID) + } + + assertPeerings := func(h *harness.Harness, nodes []*harness.Node, peerings []peering) { + ForEachPar(peerings, func(peering peering) { + assertPeered(h, nodes[peering.from], nodes[peering.to]) + }) + } + + createNodes := func(t *testing.T, n int, peerings []peering) (*harness.Harness, harness.Nodes) { + h := harness.NewT(t) + nodes := h.NewNodes(n).Init() + nodes.ForEachPar(func(node *harness.Node) { + node.UpdateConfig(func(cfg *config.Config) { + cfg.Routing.Type = config.NewOptionalString("none") + cfg.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", newRandPort())} + }) + + }) + + for _, peering := range peerings { + nodes[peering.from].PeerWith(nodes[peering.to]) + } + + return h, nodes + } + + t.Run("bidirectional peering should work (simultaneous connect)", func(t *testing.T) { + t.Parallel() + peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}} + h, nodes := createNodes(t, 3, peerings) + + nodes.StartDaemons() + assertPeerings(h, nodes, peerings) + + nodes[0].Disconnect(nodes[1]) + assertPeerings(h, nodes, peerings) + }) + + t.Run("1 should reconnect to 2 when 2 disconnects from 1", func(t *testing.T) { + t.Parallel() + peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}} + h, nodes := createNodes(t, 3, peerings) + + nodes.StartDaemons() + assertPeerings(h, nodes, peerings) + + nodes[2].Disconnect(nodes[1]) + assertPeerings(h, nodes, peerings) + }) + + t.Run("1 will peer with 2 when it comes online", func(t *testing.T) { + t.Parallel() + peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}} + h, nodes := createNodes(t, 3, peerings) + + nodes[0].StartDaemon() + nodes[1].StartDaemon() + assertPeerings(h, nodes, []peering{{from: 0, to: 1}, {from: 1, to: 0}}) + + nodes[2].StartDaemon() + assertPeerings(h, nodes, peerings) + }) + + t.Run("1 will re-peer with 2 when it disconnects and then comes back online", func(t *testing.T) { + t.Parallel() + peerings := []peering{{from: 0, to: 1}, {from: 1, to: 0}, {from: 1, to: 2}} + h, nodes := createNodes(t, 3, peerings) + + nodes.StartDaemons() + assertPeerings(h, nodes, peerings) + + nodes[2].StopDaemon() + assertNotPeered(h, nodes[1], nodes[2]) + + nodes[2].StartDaemon() + assertPeerings(h, nodes, peerings) + }) +} diff --git a/test/cli/testutils/strings.go b/test/cli/testutils/strings.go index 1fb1512485e..110051e679f 100644 --- a/test/cli/testutils/strings.go +++ b/test/cli/testutils/strings.go @@ -7,6 +7,7 @@ import ( "net/netip" "net/url" "strings" + "sync" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -75,3 +76,16 @@ func URLStrToMultiaddr(u string) multiaddr.Multiaddr { } return ma } + +// ForEachPar invokes f in a new goroutine for each element of s and waits for all to complete. +func ForEachPar[T any](s []T, f func(T)) { + wg := sync.WaitGroup{} + wg.Add(len(s)) + for _, x := range s { + go func(x T) { + defer wg.Done() + f(x) + }(x) + } + wg.Wait() +} diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh deleted file mode 100755 index 207b279803a..00000000000 --- a/test/sharness/t0171-peering.sh +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env bash - -test_description="Test peering service" - -. lib/test-lib.sh - -NUM_NODES=3 - -test_expect_success 'init iptb' ' - rm -rf .iptb/ && - iptb testbed create -type localipfs -count $NUM_NODES -init -' - -test_expect_success 'disabling routing' ' - iptb run -- ipfs config Routing.Type none -' - -for i in $(seq 0 2); do - ADDR="$(printf '["/ip4/127.0.0.1/tcp/%s"]' "$(( 3000 + ( RANDOM % 1000 ) ))")" - test_expect_success "configuring node $i to listen on $ADDR" ' - ipfsi "$i" config --json Addresses.Swarm "$ADDR" - ' -done - -peer_id() { - ipfsi "$1" config Identity.PeerID -} - -peer_addrs() { - ipfsi "$1" config Addresses.Swarm -} - -peer() { - PEER1="$1" && - PEER2="$2" && - PEER_LIST="$(ipfsi "$PEER1" config Peering.Peers || true)" && - { [[ "$PEER_LIST" == "null" ]] || PEER_LIST_INNER="${PEER_LIST:1:-1}"; } && - ADDR_INFO="$(printf '[%s{"ID": "%s", "Addrs": %s}]' \ - "${PEER_LIST_INNER:+${PEER_LIST_INNER},}" \ - "$(peer_id "$PEER2")" \ - "$(peer_addrs "$PEER2")")" && - ipfsi "$PEER1" config --json Peering.Peers "${ADDR_INFO}" -} - -# Peer: -# - 0 <-> 1 -# - 1 -> 2 -test_expect_success 'configure peering' ' - peer 0 1 && - peer 1 0 && - peer 1 2 -' - -list_peers() { - ipfsi "$1" swarm peers | sed 's|.*/p2p/\([^/]*\)$|\1|' | sort -u -} - -check_peers() { - sleep 20 # give it some time to settle. - test_expect_success 'verifying peering for peer 0' ' - list_peers 0 > peers_0_actual && - peer_id 1 > peers_0_expected && - test_cmp peers_0_expected peers_0_actual - ' - - test_expect_success 'verifying peering for peer 1' ' - list_peers 1 > peers_1_actual && - { peer_id 0 && peer_id 2 ; } | sort -u > peers_1_expected && - test_cmp peers_1_expected peers_1_actual - ' - - test_expect_success 'verifying peering for peer 2' ' - list_peers 2 > peers_2_actual && - peer_id 1 > peers_2_expected && - test_cmp peers_2_expected peers_2_actual - ' -} - -test_expect_success 'startup cluster' ' - iptb start -wait && - iptb run -- ipfs log level peering debug -' - -check_peers - -disconnect() { - ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")" -} - -# Bidirectional peering shouldn't cause problems (e.g., simultaneous connect -# issues). -test_expect_success 'disconnecting 0->1' ' - disconnect 0 1 -' - -check_peers - -# 1 should reconnect to 2 when 2 disconnects from 1. -test_expect_success 'disconnecting 2->1' ' - disconnect 2 1 -' - -check_peers - -# 2 isn't peering. This test ensures that 1 will re-peer with 2 when it comes -# back online. -test_expect_success 'stopping 2' ' - iptb stop 2 -' - -# Wait to disconnect -sleep 30 - -test_expect_success 'starting 2' ' - iptb start 2 -' - -# Wait for backoff -sleep 30 - -check_peers - -test_expect_success "stop testbed" ' - iptb stop -' - -test_done