Skip to content

Commit

Permalink
Merge branch 'magic-dns-support' of https://github.com/juanfont/heads…
Browse files Browse the repository at this point in the history
…cale into magic-dns-support
  • Loading branch information
juanfont committed Oct 9, 2021
2 parents 6884798 + c4487b7 commit 9a0c976
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 167 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Build

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: "1.16.3"

- name: Install dependencies
run: |
go version
go install golang.org/x/lint/golint@latest
sudo apt update
sudo apt install -y make
- name: Run lint
run: make build

- uses: actions/upload-artifact@v2
with:
name: headscale-linux
path: headscale
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (h *Headscale) getMapResponse(mKey wgkey.Key, req tailcfg.MapRequest, m *Ma
log.Trace().
Str("func", "getMapResponse").
Str("machine", req.Hostinfo.Hostname).
Interface("payload", resp).
// Interface("payload", resp).
Msgf("Generated map response: %s", tailMapResponseToString(resp))

var respBody []byte
Expand Down
35 changes: 23 additions & 12 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -67,9 +68,6 @@ type Headscale struct {
aclPolicy *ACLPolicy
aclRules *[]tailcfg.FilterRule

clientsUpdateChannels sync.Map
clientsUpdateChannelMutex sync.Mutex

lastStateChange sync.Map
}

Expand Down Expand Up @@ -158,10 +156,9 @@ func (h *Headscale) expireEphemeralNodesWorker() {
if err != nil {
log.Error().Err(err).Str("machine", m.Name).Msg("🤮 Cannot delete ephemeral machine from the database")
}
updateRequestsFromNode.WithLabelValues("ephemeral-node-update").Inc()
h.notifyChangesToPeers(&m)
}
}
h.setLastStateChangeToNow(ns.Name)
}
}

Expand Down Expand Up @@ -264,14 +261,28 @@ func (h *Headscale) setLastStateChangeToNow(namespace string) {
h.lastStateChange.Store(namespace, now)
}

func (h *Headscale) getLastStateChange(namespace string) time.Time {
if wrapped, ok := h.lastStateChange.Load(namespace); ok {
lastChange, _ := wrapped.(time.Time)
return lastChange
func (h *Headscale) getLastStateChange(namespaces ...string) time.Time {
times := []time.Time{}

for _, namespace := range namespaces {
if wrapped, ok := h.lastStateChange.Load(namespace); ok {
lastChange, _ := wrapped.(time.Time)

times = append(times, lastChange)
}

}

now := time.Now().UTC()
h.lastStateChange.Store(namespace, now)
return now
sort.Slice(times, func(i, j int) bool {
return times[i].After(times[j])
})

log.Trace().Msgf("Latest times %#v", times)

if len(times) == 0 {
return time.Now().UTC()

} else {
return times[0]
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/docker/cli v20.10.8+incompatible // indirect
github.com/docker/docker v20.10.8+incompatible // indirect
github.com/efekarakus/termcolor v1.0.1
github.com/fatih/set v0.2.1 // indirect
github.com/gin-gonic/gin v1.7.4
github.com/gofrs/uuid v4.0.0+incompatible
github.com/google/go-github v17.0.0+incompatible // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ github.com/fanliao/go-promise v0.0.0-20141029170127-1890db352a72/go.mod h1:Pjfxu
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA=
github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
Expand Down
129 changes: 21 additions & 108 deletions machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package headscale

import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/fatih/set"
"github.com/rs/zerolog/log"

"gorm.io/datatypes"
Expand Down Expand Up @@ -66,7 +66,7 @@ func (h *Headscale) getDirectPeers(m *Machine) (Machines, error) {
if err := h.db.Preload("Namespace").Where("namespace_id = ? AND machine_key <> ? AND registered",
m.NamespaceID, m.MachineKey).Find(&machines).Error; err != nil {
log.Error().Err(err).Msg("Error accessing db")
return nil, err
return Machines{}, err
}

sort.Slice(machines, func(i, j int) bool { return machines[i].ID < machines[j].ID })
Expand All @@ -88,7 +88,7 @@ func (h *Headscale) getShared(m *Machine) (Machines, error) {
sharedMachines := []SharedMachine{}
if err := h.db.Preload("Namespace").Preload("Machine").Where("namespace_id = ?",
m.NamespaceID).Find(&sharedMachines).Error; err != nil {
return nil, err
return Machines{}, err
}

peers := make(Machines, 0)
Expand All @@ -112,7 +112,7 @@ func (h *Headscale) getPeers(m *Machine) (Machines, error) {
Str("func", "getPeers").
Err(err).
Msg("Cannot fetch peers")
return nil, err
return Machines{}, err
}

shared, err := h.getShared(m)
Expand All @@ -121,7 +121,7 @@ func (h *Headscale) getPeers(m *Machine) (Machines, error) {
Str("func", "getDirectPeers").
Err(err).
Msg("Cannot fetch peers")
return nil, err
return Machines{}, err
}

peers := append(direct, shared...)
Expand Down Expand Up @@ -214,118 +214,31 @@ func (m *Machine) GetHostInfo() (*tailcfg.Hostinfo, error) {
return &hostinfo, nil
}

func (h *Headscale) notifyChangesToPeers(m *Machine) {
peers, err := h.getPeers(m)
func (h *Headscale) isOutdated(m *Machine) bool {
err := h.UpdateMachine(m)
if err != nil {
log.Error().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Msgf("Error getting peers: %s", err)
return
}
for _, peer := range peers {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notifying peer %s (%s)", peer.Name, peer.IPAddress)
err := h.sendRequestOnUpdateChannel(&peer)
if err != nil {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Msgf("Peer %s does not have an open update client, skipping.", peer.Name)
continue
}
log.Trace().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notified peer %s (%s)", peer.Name, peer.IPAddress)
}
}

func (h *Headscale) getOrOpenUpdateChannel(m *Machine) <-chan struct{} {
var updateChan chan struct{}
if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
updateChan = unwrapped
} else {
log.Error().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Failed to convert update channel to struct{}")
}
} else {
log.Debug().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Update channel not found, creating")

updateChan = make(chan struct{})
h.clientsUpdateChannels.Store(m.ID, updateChan)
// It does not seem meaningful to propagate this error as the end result
// will have to be that the machine has to be considered outdated.
return true
}
return updateChan
}

func (h *Headscale) closeUpdateChannel(m *Machine) {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()
sharedMachines, _ := h.getShared(m)

if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
close(unwrapped)
}
}
h.clientsUpdateChannels.Delete(m.ID)
}
namespaceSet := set.New(set.ThreadSafe)
namespaceSet.Add(m.Namespace.Name)

func (h *Headscale) sendRequestOnUpdateChannel(m *Machine) error {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()

pUp, ok := h.clientsUpdateChannels.Load(uint64(m.ID))
if ok {
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notifying peer %s", m.Name)

if update, ok := pUp.(chan struct{}); ok {
log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Update channel is %#v", update)

updateRequestsToNode.Inc()
update <- struct{}{}

log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notified machine %s", m.Name)
}
} else {
err := errors.New("machine does not have an open update channel")
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Machine %s does not have an open update channel", m.Name)
return err
// Check if any of our shared namespaces has updates that we have
// not propagated.
for _, sharedMachine := range sharedMachines {
namespaceSet.Add(sharedMachine.Namespace.Name)
}
return nil
}

func (h *Headscale) isOutdated(m *Machine) bool {
err := h.UpdateMachine(m)
if err != nil {
return true
namespaces := make([]string, namespaceSet.Size())
for index, namespace := range namespaceSet.List() {
namespaces[index] = namespace.(string)
}

lastChange := h.getLastStateChange(m.Namespace.Name)
lastChange := h.getLastStateChange(namespaces...)
log.Trace().
Str("func", "keepAlive").
Str("machine", m.Name).
Expand Down
10 changes: 5 additions & 5 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ var (
Namespace: prometheusNamespace,
Name: "update_request_from_node_total",
Help: "The number of updates requested by a node/update function",
}, []string{"state"})
updateRequestsToNode = promauto.NewCounter(prometheus.CounterOpts{
}, []string{"namespace", "machine", "state"})
updateRequestsSentToNode = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "update_request_to_node_total",
Name: "update_request_sent_to_node_total",
Help: "The number of calls/messages issued on a specific nodes update channel",
})
}, []string{"namespace", "machine", "status"})
//TODO(kradalby): This is very debugging, we might want to remove it.
updateRequestsReceivedOnChannel = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "update_request_received_on_channel_total",
Help: "The number of update requests received on an update channel",
}, []string{"machine"})
}, []string{"namespace", "machine"})
)
19 changes: 6 additions & 13 deletions namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,17 @@ func (h *Headscale) checkForNamespacesPendingUpdates() {
return
}

names := []string{}
err = json.Unmarshal([]byte(v), &names)
namespaces := []string{}
err = json.Unmarshal([]byte(v), &namespaces)
if err != nil {
return
}
for _, name := range names {
for _, namespace := range namespaces {
log.Trace().
Str("func", "RequestMapUpdates").
Str("machine", name).
Msg("Sending updates to nodes in namespace")
machines, err := h.ListMachinesInNamespace(name)
if err != nil {
continue
}
for _, m := range *machines {
updateRequestsFromNode.WithLabelValues("namespace-update").Inc()
h.notifyChangesToPeers(&m)
}
Str("machine", namespace).
Msg("Sending updates to nodes in namespacespace")
h.setLastStateChangeToNow(namespace)
}
newV, err := h.getValue("namespaces_pending_updates")
if err != nil {
Expand Down
Loading

0 comments on commit 9a0c976

Please sign in to comment.