From 9d655d7ad708f7966a8d87dff44c081d589e6dc6 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Fri, 16 Sep 2022 15:33:10 -0400 Subject: [PATCH 1/7] Better support for cloud hosting. Make public IP configurable for peering, instead of depending on discovery via interface list. --- config/mock.go | 9 +++++++++ internal/peer/redis.go | 1 + sharder/deterministic.go | 28 ++++++++++++++++++++++++---- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/config/mock.go b/config/mock.go index 3cc5911b2c..01bd9ec6fd 100644 --- a/config/mock.go +++ b/config/mock.go @@ -22,6 +22,8 @@ type MockConfig struct { GetListenAddrVal string GetPeerListenAddrErr error GetPeerListenAddrVal string + GetPeerPublicAddrErr error + GetPeerPublicAddrVal string GetCompressPeerCommunicationsVal bool GetGRPCListenAddrErr error GetGRPCListenAddrVal string @@ -144,6 +146,13 @@ func (m *MockConfig) GetPeerListenAddr() (string, error) { return m.GetPeerListenAddrVal, m.GetPeerListenAddrErr } +func (m *MockConfig) GetPeerPublicAddr() (string, error) { + m.Mux.RLock() + defer m.Mux.RUnlock() + + return m.GetPeerPublicAddrVal, m.GetPeerPublicAddrErr +} + func (m *MockConfig) GetCompressPeerCommunication() bool { m.Mux.RLock() defer m.Mux.RUnlock() diff --git a/internal/peer/redis.go b/internal/peer/redis.go index 041433ac72..4f60a27e94 100644 --- a/internal/peer/redis.go +++ b/internal/peer/redis.go @@ -83,6 +83,7 @@ func newRedisPeers(ctx context.Context, c config.Config) (Peers, error) { } // deal with this error + // address, err := c.GetPeerPublicAddr() address, err := publicAddr(c) if err != nil { diff --git a/sharder/deterministic.go b/sharder/deterministic.go index af8138f1f7..27430ec399 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -15,6 +15,7 @@ import ( "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) // shardingSalt is a random bit to make sure we don't shard the same as any @@ -122,28 +123,47 @@ func (d *DeterministicSharder) Start() error { if err != nil { return errors.Wrap(err, "failed to get local interface list to initialize sharder") } + localAddrCidrs := make([]string, len(localAddrs)) + for i, addr := range localAddrs { + localAddrCidrs[i] = addr.String() + } + + // If RedisIdentifier is an IP, add it to localAddrs. + redisIdentifier, err := d.Config.GetRedisIdentifier() + if err == nil { + d.Logger.Debug().Logf("Using RedisIdentifier as self: %s", redisIdentifier) + if ip := net.ParseIP(redisIdentifier); ip != nil { + d.Logger.Debug().Logf("RedisIdentifier is an IP") + localAddrCidrs = append(localAddrCidrs, fmt.Sprintf("%s/32", redisIdentifier)) + } + } // go through peer list, resolve each address, see if any of them match any // local interface. Note that this assumes only one instance of Refinery per // host can run. for i, peerShard := range d.peers { - d.Logger.Debug().WithField("peer", peerShard).WithField("self", localAddrs).Logf("Considering peer looking for self") + d.Logger.Debug().WithField("peer", peerShard).WithField("self", localAddrCidrs).Logf("Considering peer looking for self") peerIPList, err := net.LookupHost(peerShard.ipOrHost) if err != nil { // TODO something better than fail to start if peer is missing return errors.Wrap(err, fmt.Sprintf("couldn't resolve peer hostname %s", peerShard.ipOrHost)) } for _, peerIP := range peerIPList { - for _, localIP := range localAddrs { - ipAddr, _, err := net.ParseCIDR(localIP.String()) + for _, localAddrCidr := range localAddrCidrs { + ipAddr, _, err := net.ParseCIDR(localAddrCidr) if err != nil { - return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", localIP.String())) + return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", localAddrCidr)) } if peerIP == ipAddr.String() { if peerShard.port == localPort { d.Logger.Debug().WithField("peer", peerShard).Logf("Found myself in peer list") found = true selfIndexIntoPeerList = i + } else { + d.Logger.Debug().WithFields(logrus.Fields{ + "peer": peerShard, + "expectedPort": localPort, + }).Logf("Peer port mismatch") } } } From d0d2a4241910110c64f4cec9e54e0007ca2c0797 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Mon, 19 Sep 2022 17:06:14 -0400 Subject: [PATCH 2/7] Tidy code. --- sharder/deterministic.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sharder/deterministic.go b/sharder/deterministic.go index 27430ec399..bd02ec4438 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -142,7 +142,10 @@ func (d *DeterministicSharder) Start() error { // local interface. Note that this assumes only one instance of Refinery per // host can run. for i, peerShard := range d.peers { - d.Logger.Debug().WithField("peer", peerShard).WithField("self", localAddrCidrs).Logf("Considering peer looking for self") + d.Logger.Debug().WithFields(logrus.Fields{ + "peer": peerShard, + "self": localAddrCidrs, + ).Logf("Considering peer looking for self") peerIPList, err := net.LookupHost(peerShard.ipOrHost) if err != nil { // TODO something better than fail to start if peer is missing From 194fc8ffee2ac5ea08c58e33eb3f19603a0d9b4a Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Mon, 19 Sep 2022 17:10:12 -0400 Subject: [PATCH 3/7] Remove unused code. --- config/mock.go | 9 --------- internal/peer/redis.go | 1 - 2 files changed, 10 deletions(-) diff --git a/config/mock.go b/config/mock.go index 01bd9ec6fd..3cc5911b2c 100644 --- a/config/mock.go +++ b/config/mock.go @@ -22,8 +22,6 @@ type MockConfig struct { GetListenAddrVal string GetPeerListenAddrErr error GetPeerListenAddrVal string - GetPeerPublicAddrErr error - GetPeerPublicAddrVal string GetCompressPeerCommunicationsVal bool GetGRPCListenAddrErr error GetGRPCListenAddrVal string @@ -146,13 +144,6 @@ func (m *MockConfig) GetPeerListenAddr() (string, error) { return m.GetPeerListenAddrVal, m.GetPeerListenAddrErr } -func (m *MockConfig) GetPeerPublicAddr() (string, error) { - m.Mux.RLock() - defer m.Mux.RUnlock() - - return m.GetPeerPublicAddrVal, m.GetPeerPublicAddrErr -} - func (m *MockConfig) GetCompressPeerCommunication() bool { m.Mux.RLock() defer m.Mux.RUnlock() diff --git a/internal/peer/redis.go b/internal/peer/redis.go index 4f60a27e94..041433ac72 100644 --- a/internal/peer/redis.go +++ b/internal/peer/redis.go @@ -83,7 +83,6 @@ func newRedisPeers(ctx context.Context, c config.Config) (Peers, error) { } // deal with this error - // address, err := c.GetPeerPublicAddr() address, err := publicAddr(c) if err != nil { From 70b26a16e2c9563d124a6573d6ee03a5623d9ccd Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Mon, 19 Sep 2022 17:39:00 -0400 Subject: [PATCH 4/7] Fix syntax error. --- sharder/deterministic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sharder/deterministic.go b/sharder/deterministic.go index bd02ec4438..66e39c99cd 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -145,7 +145,7 @@ func (d *DeterministicSharder) Start() error { d.Logger.Debug().WithFields(logrus.Fields{ "peer": peerShard, "self": localAddrCidrs, - ).Logf("Considering peer looking for self") + }).Logf("Considering peer looking for self") peerIPList, err := net.LookupHost(peerShard.ipOrHost) if err != nil { // TODO something better than fail to start if peer is missing From 82c381ad60e618e90c47a03f82a7c1cc239718fb Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Tue, 20 Sep 2022 09:04:34 -0400 Subject: [PATCH 5/7] Clarify `RedisIdentifier` parsing logic. --- sharder/deterministic.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sharder/deterministic.go b/sharder/deterministic.go index 66e39c99cd..6427fa0c5f 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -128,12 +128,11 @@ func (d *DeterministicSharder) Start() error { localAddrCidrs[i] = addr.String() } - // If RedisIdentifier is an IP, add it to localAddrs. + // If RedisIdentifier is an IP, add it to localAddrCidrs. redisIdentifier, err := d.Config.GetRedisIdentifier() - if err == nil { - d.Logger.Debug().Logf("Using RedisIdentifier as self: %s", redisIdentifier) + if err == nil && redisIdentifier != "" { if ip := net.ParseIP(redisIdentifier); ip != nil { - d.Logger.Debug().Logf("RedisIdentifier is an IP") + d.Logger.Debug().Logf("Using RedisIdentifier as public IP: %s", redisIdentifier) localAddrCidrs = append(localAddrCidrs, fmt.Sprintf("%s/32", redisIdentifier)) } } From 53e422b21d29d23d9176ca698950479d9cbb2239 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Tue, 20 Sep 2022 09:14:06 -0400 Subject: [PATCH 6/7] Simplify local interface discovery. --- sharder/deterministic.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/sharder/deterministic.go b/sharder/deterministic.go index 6427fa0c5f..dbc97cbf47 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -118,22 +118,26 @@ func (d *DeterministicSharder) Start() error { } d.Logger.Debug().Logf("picked up local peer port of %s", localPort) - // get my local interfaces + // get my local interfaces' IPs. localAddrs, err := net.InterfaceAddrs() if err != nil { return errors.Wrap(err, "failed to get local interface list to initialize sharder") } - localAddrCidrs := make([]string, len(localAddrs)) + localIPs := make([]string, len(localAddrs)) for i, addr := range localAddrs { - localAddrCidrs[i] = addr.String() + addrStr := addr.String() + ip, _, err := net.ParseCIDR(addrStr) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", addrStr)) + } + localIPs[i] = ip.String() } - // If RedisIdentifier is an IP, add it to localAddrCidrs. - redisIdentifier, err := d.Config.GetRedisIdentifier() - if err == nil && redisIdentifier != "" { + // If RedisIdentifier is an IP, add it to localIps. + if redisIdentifier, err := d.Config.GetRedisIdentifier(); err == nil && redisIdentifier != "" { if ip := net.ParseIP(redisIdentifier); ip != nil { d.Logger.Debug().Logf("Using RedisIdentifier as public IP: %s", redisIdentifier) - localAddrCidrs = append(localAddrCidrs, fmt.Sprintf("%s/32", redisIdentifier)) + localIPs = append(localIPs, redisIdentifier) } } @@ -143,7 +147,7 @@ func (d *DeterministicSharder) Start() error { for i, peerShard := range d.peers { d.Logger.Debug().WithFields(logrus.Fields{ "peer": peerShard, - "self": localAddrCidrs, + "self": localIPs, }).Logf("Considering peer looking for self") peerIPList, err := net.LookupHost(peerShard.ipOrHost) if err != nil { @@ -151,12 +155,8 @@ func (d *DeterministicSharder) Start() error { return errors.Wrap(err, fmt.Sprintf("couldn't resolve peer hostname %s", peerShard.ipOrHost)) } for _, peerIP := range peerIPList { - for _, localAddrCidr := range localAddrCidrs { - ipAddr, _, err := net.ParseCIDR(localAddrCidr) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", localAddrCidr)) - } - if peerIP == ipAddr.String() { + for _, ipAddr := range localIPs { + if peerIP == ipAddr { if peerShard.port == localPort { d.Logger.Debug().WithField("peer", peerShard).Logf("Found myself in peer list") found = true From fbf3bc81b2ee1bd0142b55cbd6624258e2682e51 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Fri, 23 Sep 2022 10:38:38 -0400 Subject: [PATCH 7/7] Update `RedisIdentifier` logic per code reviews. --- internal/peer/redis.go | 32 ++++++++++++++++++++++---------- sharder/deterministic.go | 36 ++++++++++++++++++++---------------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/internal/peer/redis.go b/internal/peer/redis.go index 041433ac72..4d7be37715 100644 --- a/internal/peer/redis.go +++ b/internal/peer/redis.go @@ -249,6 +249,27 @@ func publicAddr(c config.Config) (string, error) { return "", err } + var myIdentifier string + + // If RedisIdentifier is set, use as identifier. + if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" { + myIdentifier = redisIdentifier + logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config") + } else { + // Otherwise, determine idenntifier from network interface. + myIdentifier, err = getIdentifierFromInterfaces(c) + if err != nil { + return "", err + } + } + + publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port) + + return publicListenAddr, nil +} + +// Scan network interfaces to determine an identifier from either IP or hostname. +func getIdentifierFromInterfaces(c config.Config) (string, error) { myIdentifier, _ := os.Hostname() identifierInterfaceName, _ := c.GetIdentifierInterfaceName() @@ -288,16 +309,7 @@ func publicAddr(c config.Config) (string, error) { logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface") } - redisIdentifier, _ := c.GetRedisIdentifier() - - if redisIdentifier != "" { - myIdentifier = redisIdentifier - logrus.WithField("identifier", myIdentifier).Info("using specific identifier from config") - } - - publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port) - - return publicListenAddr, nil + return myIdentifier, nil } // equal tells whether a and b contain the same elements. diff --git a/sharder/deterministic.go b/sharder/deterministic.go index dbc97cbf47..89acc73fcb 100644 --- a/sharder/deterministic.go +++ b/sharder/deterministic.go @@ -118,26 +118,30 @@ func (d *DeterministicSharder) Start() error { } d.Logger.Debug().Logf("picked up local peer port of %s", localPort) - // get my local interfaces' IPs. - localAddrs, err := net.InterfaceAddrs() - if err != nil { - return errors.Wrap(err, "failed to get local interface list to initialize sharder") - } - localIPs := make([]string, len(localAddrs)) - for i, addr := range localAddrs { - addrStr := addr.String() - ip, _, err := net.ParseCIDR(addrStr) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", addrStr)) - } - localIPs[i] = ip.String() - } + var localIPs []string - // If RedisIdentifier is an IP, add it to localIps. + // If RedisIdentifier is an IP, use as localIPs value. if redisIdentifier, err := d.Config.GetRedisIdentifier(); err == nil && redisIdentifier != "" { if ip := net.ParseIP(redisIdentifier); ip != nil { d.Logger.Debug().Logf("Using RedisIdentifier as public IP: %s", redisIdentifier) - localIPs = append(localIPs, redisIdentifier) + localIPs = []string{redisIdentifier} + } + } + + // Otherwise, get my local interfaces' IPs. + if len(localIPs) == 0 { + localAddrs, err := net.InterfaceAddrs() + if err != nil { + return errors.Wrap(err, "failed to get local interface list to initialize sharder") + } + localIPs = make([]string, len(localAddrs)) + for i, addr := range localAddrs { + addrStr := addr.String() + ip, _, err := net.ParseCIDR(addrStr) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to parse CIDR for local IP %s", addrStr)) + } + localIPs[i] = ip.String() } }