Skip to content

Commit

Permalink
return full metric endpoint, allow all locally generated traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
OmarElawady committed Oct 11, 2021
1 parent 42436ec commit 6a12a73
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 61 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW
github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=
github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/dave/jennifer v1.3.0 h1:p3tl41zjjCZTNBytMwrUuiAnherNUZktlhPTKoF/sEk=
github.com/dave/jennifer v1.3.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
1 change: 0 additions & 1 deletion pkg/gridtypes/zos/qsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,4 @@ func (q QuantumSafeFS) Capacity() (gridtypes.Capacity, error) {
type QuatumSafeFSResult struct {
Path string `json:"path"`
MetricsEndpoint string `json:"metrics_endpoint"`
MetricsPort int `json:"metrics_port"`
}
4 changes: 2 additions & 2 deletions pkg/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ type Networker interface {

// QSFSPrepare creates a network namespace with a macvlan interface into it
// to allow qsfs container to reach the internet but not be reachable itself
// it return the name of the network namespace created.
// it return the name of the network namespace created, and the ygg ip.
// the id should be unique.
QSFSPrepare(id string) (string, error)
QSFSPrepare(id string) (string, string, error)

// QSFSDestroy rewind setup by QSFSPrepare
QSFSDestroy(id string) error
Expand Down
72 changes: 52 additions & 20 deletions pkg/network/qsfs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package network

import (
"bytes"
"context"
"fmt"
"net"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog/log"
Expand All @@ -12,8 +14,6 @@ import (
"github.com/threefoldtech/zos/pkg/network/nft"
)

var qsfsNFTTmpl *template.Template = template.Must(template.New("").Parse(_nft))

var _nft = `
flush ruleset
Expand All @@ -31,9 +31,9 @@ table inet filter {
jump base_checks
# port for prometheus
tcp dport 9100 iifname ygg0 accept
# accept only locally generated packets to zdbs
tcp dport 9900 iifname lo accept
ip6 nexthdr icmpv6 accept
# accept only locally generated packets
meta iif lo ct state new accept
ip6 nexthdr icmpv6 accept
}
chain forward {
Expand All @@ -46,46 +46,78 @@ table inet filter {
`

func applyQSFSFirewall(netns string) error {
buf := bytes.Buffer{}

if err := qsfsNFTTmpl.Execute(&buf, nil); err != nil {
return errors.Wrap(err, "failed to build nft rule set")
}

if err := nft.Apply(&buf, netns); err != nil {
if err := nft.Apply(strings.NewReader(_nft), netns); err != nil {
return errors.Wrap(err, "failed to apply nft rule set")
}

return nil
}

func (n *networker) waitYggIPs(netns string) (string, error) {
var yggNet = net.IPNet{
IP: net.ParseIP("200::"),
Mask: net.CIDRMask(7, 128),
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
isYgg := func(ip net.IP) bool {
return yggNet.Contains(ip)
}

ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
ips, _, err := n.Addrs("ygg0", netns)
if err != nil {
return "", errors.Wrap(err, "failed to get ygg0 address")
}
for _, ip := range ips {
if isYgg(ip) {
return net.IP(ip).String(), nil
}
}
case <-ctx.Done():
return "", fmt.Errorf("waiting for ygg ips timedout: context cancelled")
}
}
}

func (n networker) QSFSNamespace(id string) string {
netId := "qsfs:" + id
hw := ifaceutil.HardwareAddrFromInputBytes([]byte(netId))
return qsfsNamespacePrefix + strings.Replace(hw.String(), ":", "", -1)
}

func (n networker) QSFSPrepare(id string) (string, error) {
func (n networker) QSFSPrepare(id string) (string, string, error) {
netId := "qsfs:" + id
netNSName := n.QSFSNamespace(id)
netNs, err := createNetNS(netNSName)
if err != nil {
return "", err
return "", "", err
}
defer netNs.Close()
if err := n.ndmz.AttachNR(netId, netNSName, n.ipamLeaseDir); err != nil {
return "", errors.Wrap(err, "failed to prepare qsfs namespace")
return "", "", errors.Wrap(err, "failed to prepare qsfs namespace")
}

if err := applyQSFSFirewall(netNSName); err != nil {
return "", err
return "", "", err
}

if n.ygg == nil {
return netNSName, nil
return netNSName, "", nil
}
err = n.attachYgg(id, netNs)
if err != nil {
return "", "", err
}
ip, err := n.waitYggIPs(netNSName)
if err != nil {
return "", "", err
}

return netNSName, n.attachYgg(id, netNs)
return netNSName, ip, err
}

func (n networker) QSFSDestroy(id string) error {
Expand Down
1 change: 0 additions & 1 deletion pkg/primitives/qsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func (p *Primitives) qsfsProvision(ctx context.Context, wl *gridtypes.WorkloadWi
}
result.Path = info.Path
result.MetricsEndpoint = info.MetricsEndpoint
result.MetricsPort = info.MetricsPort
return result, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/qsfsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type QSFSMetrics struct {
type QSFSInfo struct {
Path string
MetricsEndpoint string
MetricsPort int
}

func (q *QSFSMetrics) Nu(wlID string) (result uint64) {
Expand Down
37 changes: 3 additions & 34 deletions pkg/qsfsd/qsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -85,7 +84,7 @@ func (q *QSFS) Mount(wlID string, cfg zos.QuantumSafeFS) (info pkg.QSFSInfo, err

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
netns, err := networkd.QSFSPrepare(ctx, wlID)
netns, yggIP, err := networkd.QSFSPrepare(ctx, wlID)
if err != nil {
return info, errors.Wrap(err, "failed to prepare qsfs")
}
Expand Down Expand Up @@ -135,41 +134,11 @@ func (q *QSFS) Mount(wlID string, cfg zos.QuantumSafeFS) (info pkg.QSFSInfo, err
return
}
info.Path = mountPath
info.MetricsPort = 9100
info.MetricsEndpoint, err = q.waitYggIPs(ctx, networkd, netns)
info.MetricsEndpoint = fmt.Sprintf("http://[%s]:%d/metrics", yggIP, zstorMetricsPort)

return
}

func (q *QSFS) waitYggIPs(ctx context.Context, networkd *stubs.NetworkerStub, netns string) (string, error) {
var yggNet = net.IPNet{
IP: net.ParseIP("200::"),
Mask: net.CIDRMask(7, 128),
}

isYgg := func(ip net.IP) bool {
return yggNet.Contains(ip)
}

ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
ips, _, err := networkd.Addrs(ctx, "ygg0", netns)
if err != nil {
return "", errors.Wrap(err, "failed to get ygg0 address")
}
for _, ip := range ips {
if isYgg(ip) {
return net.IP(ip).String(), nil
}
}
case <-ctx.Done():
return "", fmt.Errorf("waiting for ygg ips timedout: context cancelled")
}
}
}

func (f *QSFS) waitUntilMounted(ctx context.Context, path string) error {
ticker := time.NewTicker(1 * time.Second)
for {
Expand Down Expand Up @@ -250,7 +219,7 @@ func (q *QSFS) mountPath(wlID string) string {
}

func (q *QSFS) prepareMountPath(path string) error {
if err := os.Mkdir(path, 0644); err != nil {
if err := os.MkdirAll(path, 0644); err != nil {
return err
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/stubs/network_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *NetworkerStub) QSFSNamespace(ctx context.Context, arg0 string) (ret0 st
return
}

func (s *NetworkerStub) QSFSPrepare(ctx context.Context, arg0 string) (ret0 string, ret1 error) {
func (s *NetworkerStub) QSFSPrepare(ctx context.Context, arg0 string) (ret0 string, ret1 string, ret2 error) {
args := []interface{}{arg0}
result, err := s.client.RequestContext(ctx, s.module, s.object, "QSFSPrepare", args...)
if err != nil {
Expand All @@ -307,10 +307,13 @@ func (s *NetworkerStub) QSFSPrepare(ctx context.Context, arg0 string) (ret0 stri
if err := result.Unmarshal(0, &ret0); err != nil {
panic(err)
}
ret1 = new(zbus.RemoteError)
if err := result.Unmarshal(1, &ret1); err != nil {
panic(err)
}
ret2 = new(zbus.RemoteError)
if err := result.Unmarshal(2, &ret2); err != nil {
panic(err)
}
return
}

Expand Down

0 comments on commit 6a12a73

Please sign in to comment.