Skip to content

Commit

Permalink
pkg/snet: change snet.Topology to a struct (#4673)
Browse files Browse the repository at this point in the history
Instead of having the snet.Topology as an interface, change it to a
struct. This makes the contract clearer, LocalIA and PortRange are 
only considered when creating a connection. The Interface information
however can dynamically change.

The daemon API now exposes an adapter to load the topology information
once or to load the information periodically.
  • Loading branch information
lukedirtwalker authored Dec 24, 2024
1 parent efbbd58 commit 05dfe81
Show file tree
Hide file tree
Showing 14 changed files with 390 additions and 95 deletions.
39 changes: 16 additions & 23 deletions control/cmd/control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func realMain(ctx context.Context) error {
SCIONNetworkMetrics: metrics.SCIONNetworkMetrics,
SCIONPacketConnMetrics: metrics.SCIONPacketConnMetrics,
MTU: topo.MTU(),
Topology: cpInfoProvider{topo: topo},
Topology: adaptTopology(topo),
}
quicStack, err := nc.QUICStack()
if err != nil {
Expand Down Expand Up @@ -945,29 +945,22 @@ func (h *healther) GetCAHealth(ctx context.Context) (api.CAHealthStatus, bool) {
return api.Unavailable, false
}

type cpInfoProvider struct {
topo *topology.Loader
}

func (c cpInfoProvider) LocalIA(_ context.Context) (addr.IA, error) {
return c.topo.IA(), nil
}

func (c cpInfoProvider) PortRange(_ context.Context) (uint16, uint16, error) {
start, end := c.topo.PortRange()
return start, end, nil
}

func (c cpInfoProvider) Interfaces(_ context.Context) (map[uint16]netip.AddrPort, error) {
ifMap := c.topo.InterfaceInfoMap()
ifsToUDP := make(map[uint16]netip.AddrPort, len(ifMap))
for i, v := range ifMap {
if i > (1<<16)-1 {
return nil, serrors.New("invalid interface id", "id", i)
}
ifsToUDP[uint16(i)] = v.InternalAddr
func adaptTopology(topo *topology.Loader) snet.Topology {
start, end := topo.PortRange()
return snet.Topology{
LocalIA: topo.IA(),
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a := topo.UnderlayNextHop(ifID)
if a == nil {
return netip.AddrPort{}, false
}
return a.AddrPort(), true
},
}
return ifsToUDP, nil
}

func getCAHealth(
Expand Down
16 changes: 11 additions & 5 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,16 @@ func (g *Gateway) Run(ctx context.Context) error {
// *********************************************
// Initialize base SCION network information: IA
// *********************************************
localIA, err := g.Daemon.LocalIA(context.Background())
topoReloader, err := daemon.NewReloadingTopology(ctx, g.Daemon)
if err != nil {
return serrors.Wrap("unable to learn local ISD-AS number", err)
return serrors.Wrap("loading topology", err)
}
topo := topoReloader.Topology()
go func() {
defer log.HandlePanic()
topoReloader.Run(ctx, 10*time.Second)
}()
localIA := topo.LocalIA
logger.Info("Learned local IA from SCION Daemon", "ia", localIA)

// *************************************************************************
Expand Down Expand Up @@ -299,7 +305,7 @@ func (g *Gateway) Run(ctx context.Context) error {
ProbesSendErrors: probesSendErrors,
SCMPErrors: g.Metrics.SCMPErrors,
SCIONPacketConnMetrics: g.Metrics.SCIONPacketConnMetrics,
Topology: g.Daemon,
Topology: topo,
},
PathUpdateInterval: PathUpdateInterval(ctx),
PathFetchTimeout: 0, // using default for now
Expand Down Expand Up @@ -409,7 +415,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetworkNoSCMP is the network for the QUIC server connection. Because SCMP errors
// will cause the server's accepts to fail, we ignore SCMP.
scionNetworkNoSCMP := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
// Discard all SCMP propagation, to avoid accept/read errors on the
// QUIC server/client.
SCMPHandler: snet.SCMPPropagationStopper{
Expand Down Expand Up @@ -472,7 +478,7 @@ func (g *Gateway) Run(ctx context.Context) error {
// scionNetwork is the network for all SCION connections, with the exception of the QUIC server
// and client connection.
scionNetwork := &snet.SCIONNetwork{
Topology: g.Daemon,
Topology: topo,
SCMPHandler: snet.DefaultSCMPHandler{
RevocationHandler: revocationHandler,
SCMPErrors: g.Metrics.SCMPErrors,
Expand Down
17 changes: 16 additions & 1 deletion pkg/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("//tools/lint:go.bzl", "go_library")
load("//tools/lint:go.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -7,6 +7,7 @@ go_library(
"daemon.go",
"grpc.go",
"metrics.go",
"topology.go",
],
importpath = "github.com/scionproto/scion/pkg/daemon",
visibility = ["//visibility:public"],
Expand All @@ -15,6 +16,7 @@ go_library(
"//pkg/daemon/internal/metrics:go_default_library",
"//pkg/drkey:go_default_library",
"//pkg/grpc:go_default_library",
"//pkg/log:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/private/ctrl/path_mgmt:go_default_library",
"//pkg/private/prom:go_default_library",
Expand All @@ -32,3 +34,16 @@ go_library(
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["topology_test.go"],
deps = [
":go_default_library",
"//pkg/addr:go_default_library",
"//pkg/daemon/mock_daemon:go_default_library",
"//pkg/snet:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
135 changes: 135 additions & 0 deletions pkg/daemon/topology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package daemon

import (
"context"
"net/netip"
"sync/atomic"
"time"

"github.com/scionproto/scion/pkg/log"
"github.com/scionproto/scion/pkg/private/serrors"
"github.com/scionproto/scion/pkg/snet"
)

// LoadTopology loads the local topology from the given connector. The topology
// information is loaded once and does not update automatically.
func LoadTopology(ctx context.Context, conn Connector) (snet.Topology, error) {
ia, err := conn.LocalIA(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading local ISD-AS", err)
}
start, end, err := conn.PortRange(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading port range", err)
}
interfaces, err := conn.Interfaces(ctx)
if err != nil {
return snet.Topology{}, serrors.Wrap("loading interfaces", err)
}

return snet.Topology{
LocalIA: ia,
PortRange: snet.TopologyPortRange{
Start: start,
End: end,
},
Interface: func(ifID uint16) (netip.AddrPort, bool) {
a, ok := interfaces[ifID]
return a, ok
},
}, nil
}

// ReloadingTopology is a topology that reloads the interface information
// periodically. It is safe for concurrent use.
type ReloadingTopology struct {
conn Connector
baseTopology snet.Topology
interfaces atomic.Pointer[map[uint16]netip.AddrPort]
}

// NewReloadingTopology creates a new ReloadingTopology that reloads the
// interface information periodically. The Run method must be called for
// interface information to be populated.
func NewReloadingTopology(ctx context.Context, conn Connector) (*ReloadingTopology, error) {
ia, err := conn.LocalIA(ctx)
if err != nil {
return nil, serrors.Wrap("loading local ISD-AS", err)
}
start, end, err := conn.PortRange(ctx)
if err != nil {
return nil, serrors.Wrap("loading port range", err)
}
t := &ReloadingTopology{
conn: conn,
baseTopology: snet.Topology{
LocalIA: ia,
PortRange: snet.TopologyPortRange{Start: start, End: end},
},
}
if err := t.loadInterfaces(ctx); err != nil {
return nil, err
}
return t, nil
}

func (t *ReloadingTopology) Topology() snet.Topology {
base := t.baseTopology
return snet.Topology{
LocalIA: base.LocalIA,
PortRange: base.PortRange,
Interface: func(ifID uint16) (netip.AddrPort, bool) {
m := t.interfaces.Load()
if m == nil {
return netip.AddrPort{}, false
}
a, ok := (*m)[ifID]
return a, ok
},
}
}

func (t *ReloadingTopology) Run(ctx context.Context, period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

reload := func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := t.loadInterfaces(ctx); err != nil {
log.FromCtx(ctx).Error("Failed to reload interfaces", "err", err)
}
}
reload()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
reload()
}
}
}

func (t *ReloadingTopology) loadInterfaces(ctx context.Context) error {
intfs, err := t.conn.Interfaces(ctx)
if err != nil {
return err
}
t.interfaces.Store(&intfs)
return nil
}
Loading

0 comments on commit 05dfe81

Please sign in to comment.