-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpeers.go
137 lines (110 loc) · 3.42 KB
/
peers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"log"
"net"
"net/url"
"strconv"
"time"
"github.com/golang/groupcache"
"github.com/pkg/errors"
)
// Updater sets the peers of the given groupcache HTTP pool.
//
// The Updaters built in this file come in two flavours: StaticPeers returns
// one that applies a fixed list and returns, while SRVDiscoveredPeers and
// DiscoveredPeers return ones that, after a successful first update, keep
// refreshing the pool on a timer and do not return. A non-nil error means
// the (initial) peer set could not be established.
type Updater func(p *groupcache.HTTPPool) error
// selfInPeers reports whether self occurs in peers. The comparison is an
// exact string match; no URL normalisation is applied.
func selfInPeers(self string, peers []string) bool {
	for idx := 0; idx < len(peers); idx++ {
		if peers[idx] != self {
			continue
		}
		return true
	}
	return false
}
// StaticPeers returns an Updater that validates a fixed peer list and then
// installs it on a groupcache.HTTPPool.
//
// On each invocation the returned Updater:
//   - checks that every entry in peers is accepted by url.Parse,
//   - checks that self appears verbatim in peers,
//   - calls pool.Set with the full list.
//
// It returns an error (and leaves the pool untouched) if either check fails,
// and nil otherwise. It does not block.
func StaticPeers(self string, peers []string) Updater {
	return func(pool *groupcache.HTTPPool) error {
		for i, peer := range peers {
			if _, err := url.Parse(peer); err != nil {
				// i is an int: %d prints the index, whereas %q would render it
				// as a quoted character literal (e.g. '\x00' for index 0).
				return errors.Wrapf(err, "failed to parse peer URL %d:%q", i, peer)
			}
		}
		if !selfInPeers(self, peers) {
			return errors.Errorf("self not in peers: %q not in %q", self, peers)
		}
		pool.Set(peers...)
		return nil
	}
}
// srvLookup resolves the SRV record for service "bazelcache" over "tcp" under
// srvName and converts every returned target into an "http://host:port" URL
// string. The canonical name and each derived peer are logged. The returned
// slice has one entry per SRV target, in the order the resolver supplied them.
func srvLookup(srvName string) ([]string, error) {
	canonical, records, lookupErr := net.LookupSRV("bazelcache", "tcp", srvName)
	if lookupErr != nil {
		return nil, errors.Wrap(lookupErr, "failed to resolve SRV record")
	}
	log.Printf("SRV Name: %q", canonical)

	// Turn each SRV target into a peer URL.
	peers := make([]string, 0, len(records))
	for _, rec := range records {
		hostPort := net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port)))
		peerURL := url.URL{Scheme: "http", Host: hostPort}
		peer := peerURL.String()
		log.Printf("SRV peer: %q", peer)
		peers = append(peers, peer)
	}
	return peers, nil
}
// SRVDiscoveredPeers returns an Updater that discovers peers through SRV
// lookups on srvPeerDNSName and installs them on the pool.
//
// The returned Updater performs one discovery immediately; if that fails
// (lookup error, or self missing from the discovered peers) it returns the
// wrapped error. Otherwise it repeats the discovery every updateInterval.
// Failures of those later rounds are only logged and the pool is left as it
// was, so after a successful start the Updater does not return.
func SRVDiscoveredPeers(self string, srvPeerDNSName string, updateInterval time.Duration) Updater {
	refresh := func(pool *groupcache.HTTPPool) error {
		discovered, lookupErr := srvLookup(srvPeerDNSName)
		switch {
		case lookupErr != nil:
			return errors.Wrap(lookupErr, "srv lookup failed")
		case !selfInPeers(self, discovered):
			return errors.Errorf("self not in peers: %q not in %q", self, discovered)
		}
		pool.Set(discovered...)
		return nil
	}

	return func(pool *groupcache.HTTPPool) error {
		initialErr := refresh(pool)
		if initialErr != nil {
			return errors.Wrap(initialErr, "initial SRV discovery failed")
		}

		ticks := time.Tick(updateInterval)
		for range ticks {
			refreshErr := refresh(pool)
			if refreshErr == nil {
				continue
			}
			log.Println(errors.Wrap(refreshErr, "update failed"))
		}
		panic("Time.Tick stopped ticking?!?")
	}
}
// DiscoveredPeers returns an Updater that discovers peers by resolving each
// name in dnsNames with net.LookupHost and installs every returned address,
// formatted as "http://addr:port", as a peer of the pool.
//
// The returned Updater performs one lookup round immediately; if any name
// fails to resolve it returns the wrapped error. Otherwise it repeats the
// round every updateInterval. Failures of those later rounds are only logged
// and the pool is left as it was, so after a successful start the Updater does
// not return. If self is not among the resolved peers a warning is logged
// and the peers are set anyway.
func DiscoveredPeers(self string, dnsNames []string, port string, updateInterval time.Duration) Updater {
	update := func(pool *groupcache.HTTPPool) error {
		// Length must start at zero: the slice is filled with append, so a
		// non-zero length would leave empty-string entries at the front that
		// would then be registered as peers. len(dnsNames) is only a capacity
		// hint, since a name may resolve to several addresses.
		peers := make([]string, 0, len(dnsNames))
		for _, name := range dnsNames {
			addrs, err := net.LookupHost(name)
			if err != nil {
				return errors.Wrapf(err, "failed to resolve %q", name)
			}
			for _, addr := range addrs {
				p := &url.URL{Host: net.JoinHostPort(addr, port), Scheme: "http"}
				peers = append(peers, p.String())
			}
		}
		if !selfInPeers(self, peers) {
			log.Printf("warning: self not in peers: %q not in %q", self, peers)
		}
		pool.Set(peers...)
		return nil
	}

	return func(pool *groupcache.HTTPPool) error {
		if err := update(pool); err != nil {
			return errors.Wrap(err, "initial lookup failed")
		}
		for range time.Tick(updateInterval) {
			if err := update(pool); err != nil {
				log.Println(errors.Wrap(err, "update failed"))
			}
		}
		panic("Time.Tick stopped ticking?")
	}
}