Skip to content

Commit

Permalink
Merge branch 'master' of github.com:projectcalico/calico-bgp-daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
hitomitak committed May 8, 2017
2 parents 6abeac3 + ae3f200 commit 26f3830
Showing 1 changed file with 104 additions and 8 deletions.
112 changes: 104 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,46 @@ func getEtcdConfig(cfg *calicoapi.CalicoAPIConfig) (etcd.Config, error) {
return config, nil
}

// recursiveNexthopLookup returns bgpNexthop's actual nexthop
// In GCE environment, the interface address is /32 and the BGP nexthop is
// off-subnet. This function looks up kernel RIB and returns a nexthop to
// reach the BGP nexthop.
// When the BGP nexthop can be reached with a connected route,
// this function returns the BGP nexthop.
func recursiveNexthopLookup(bgpNexthop net.IP) (net.IP, error) {
routes, err := netlink.RouteGet(bgpNexthop)
if err != nil {
return nil, err
}
if len(routes) == 0 {
return nil, fmt.Errorf("no route for path: %s", bgpNexthop)
}
r := routes[0]
if r.Gw != nil {
return r.Gw, nil
}
// bgpNexthop can be reached by a connected route
return bgpNexthop, nil
}

func cleanUpRoutes() error {
filter := &netlink.Route{
Protocol: RTPROT_GOBGP,
}
list4, err := netlink.RouteListFiltered(netlink.FAMILY_V4, filter, netlink.RT_FILTER_PROTOCOL)
if err != nil {
return err
}
list6, err := netlink.RouteListFiltered(netlink.FAMILY_V6, filter, netlink.RT_FILTER_PROTOCOL)
if err != nil {
return err
}
for _, route := range append(list4, list6...) {
netlink.RouteDel(&route)
}
return nil
}

type Server struct {
t tomb.Tomb
bgpServer *bgpserver.BgpServer
Expand All @@ -108,6 +148,7 @@ type Server struct {
ipv4 net.IP
ipv6 net.IP
ipam *ipamCache
reloadCh chan []*bgptable.Path
}

func NewServer() (*Server, error) {
Expand Down Expand Up @@ -156,6 +197,7 @@ func NewServer() (*Server, error) {
etcd: etcdCli,
ipv4: ipv4,
ipv6: ipv6,
reloadCh: make(chan []*bgptable.Path),
}, nil
}

Expand Down Expand Up @@ -194,6 +236,10 @@ func (s *Server) Serve() {
s.t.Go(func() error { return fmt.Errorf("watchKernelRoute: %s", s.watchKernelRoute()) })

<-s.t.Dying()

if err := cleanUpRoutes(); err != nil {
log.Fatalf("%s, also failed to clean up routes which we injected: %s", s.t.Err(), err)
}
log.Fatal(s.t.Err())

}
Expand Down Expand Up @@ -221,7 +267,8 @@ func (s *Server) ipamUpdateHandler(pool *ipPool) error {
if route.Dst == nil {
continue
}
if pool.contain(route.Dst.String()) {
prefix := route.Dst.String()
if pool.contain(prefix) {
ipip := pool.IPIP != ""
if pool.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) {
ipip = false
Expand All @@ -234,7 +281,29 @@ func (s *Server) ipamUpdateHandler(pool *ipPool) error {
route.LinkIndex = i.Index
route.SetFlag(netlink.FLAG_ONLINK)
} else {
route.LinkIndex = 0
tbl, err := s.bgpServer.GetRib("", bgp.RF_IPv4_UC, []*bgptable.LookupPrefix{
&bgptable.LookupPrefix{
Prefix: prefix,
},
})
if err != nil {
return err
}
bests := tbl.Bests("")
if len(bests) == 0 {
log.Printf("no best for %s", prefix)
continue
}
best := bests[0]
if best.IsLocal() {
log.Printf("%s's best is local path", prefix)
continue
}
gw, err := recursiveNexthopLookup(best.GetNexthop())
if err != nil {
return err
}
route.Gw = gw
route.Flags = 0
}
return netlink.RouteReplace(&route)
Expand Down Expand Up @@ -745,6 +814,20 @@ func (s *Server) watchKernelRoute() error {
if _, err = s.bgpServer.AddPath("", []*bgptable.Path{path}); err != nil {
return err
}
} else if update.Table == syscall.RT_TABLE_LOCAL {
// This means the interface address is updated
// Some routes we injected may be deleted by the kernel
// Reload routes from BGP RIB and inject again
ip, _, _ := net.ParseCIDR(update.Dst.String())
family := bgp.RF_IPv4_UC
if ip.To4() == nil {
family = bgp.RF_IPv6_UC
}
tbl, err := s.bgpServer.GetRib("", family, nil)
if err != nil {
return err
}
s.reloadCh <- tbl.Bests("")
}
}
return fmt.Errorf("netlink route subscription ended")
Expand All @@ -762,9 +845,10 @@ func (s *Server) injectRoute(path *bgptable.Path) error {
Protocol: RTPROT_GOBGP,
}

ipip := false
if dst.IP.To4() != nil {
if p := s.ipam.match(nlri.String()); p != nil {
ipip := p.IPIP != ""
ipip = p.IPIP != ""

node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)})
if err != nil {
Expand All @@ -789,6 +873,13 @@ func (s *Server) injectRoute(path *bgptable.Path) error {
log.Printf("removed route %s from kernel", nlri)
return netlink.RouteDel(route)
}
if !ipip {
gw, err := recursiveNexthopLookup(path.GetNexthop())
if err != nil {
return err
}
route.Gw = gw
}
log.Printf("added route %s to kernel %s", nlri, route)
return netlink.RouteReplace(route)
}
Expand All @@ -799,12 +890,17 @@ func (s *Server) injectRoute(path *bgptable.Path) error {
func (s *Server) watchBGPPath() error {
watcher := s.bgpServer.Watch(bgpserver.WatchBestPath())
for {
ev := <-watcher.Event()
msg, ok := ev.(*bgpserver.WatchEventBestPath)
if !ok {
continue
var paths []*bgptable.Path
select {
case ev := <-watcher.Event():
msg, ok := ev.(*bgpserver.WatchEventBestPath)
if !ok {
continue
}
paths = msg.PathList
case paths = <-s.reloadCh:
}
for _, path := range msg.PathList {
for _, path := range paths {
if path.IsLocal() {
continue
}
Expand Down

0 comments on commit 26f3830

Please sign in to comment.