diff --git a/go.mod b/go.mod index ffc301c..3cfa593 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/RobinUS2/golang-moving-average v1.0.0 github.com/jedisct1/dlog v0.0.0-20190909160351-692385b00b84 - github.com/jedisct1/go-clocksmith v0.0.0-20190707124905-73e087c7979c golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) diff --git a/repique/behaviors/fswatcher_windows.go b/repique/behaviors/fswatcher_windows.go index 748bc82..926b60a 100644 --- a/repique/behaviors/fswatcher_windows.go +++ b/repique/behaviors/fswatcher_windows.go @@ -153,38 +153,35 @@ func fswatcher_init () { panic(err) } go fswatcher_loop() - for { - select { - case reg := <- register: - if path0, err := filepath.Abs(reg.filename); err != nil { + for reg := range register { + if path0, err := filepath.Abs(reg.filename); err != nil { + panic("fswatcher_init failed:" + err.Error()) + } else { + dir := fixLongPath(path.Dir(path0)) + var fs0 fs + if info, err := os.Stat(path0); err != nil { panic("fswatcher_init failed:" + err.Error()) } else { - dir := fixLongPath(path.Dir(path0)) - var fs0 fs - if info, err := os.Stat(path0); err != nil { + fs0 = fs{filepath:path0, lastinfo:&atomic.Value{},callback:reg.callback} + fs0.lastinfo.Store(info) + } + var found bool + for _, w := range fswatcher { + if w.folder == dir { + found = true + w.files = append(w.files, fs0) + break + } + } + if !found { + path_ptr, _ := syscall.UTF16PtrFromString(dir) + if handle, err := findFirstChangeNotification(path_ptr, false, syscall.FILE_NOTIFY_CHANGE_LAST_WRITE); err != nil { panic("fswatcher_init failed:" + err.Error()) } else { - fs0 = fs{filepath:path0, lastinfo:&atomic.Value{},callback:reg.callback} - fs0.lastinfo.Store(info) - } - var found bool - for _, w := range fswatcher { - if w.folder == dir { - found = true - w.files = append(w.files, fs0) - break - } - } - if !found { - path_ptr, _ := syscall.UTF16PtrFromString(dir) - if handle, err := findFirstChangeNotification(path_ptr, false, syscall.FILE_NOTIFY_CHANGE_LAST_WRITE); err != nil { - panic("fswatcher_init failed:" + err.Error()) - } else { - w := watcher{folder:dir, files:[]fs{fs0}, win_handle:handle} - fswatcher = append(fswatcher, w) - if !setEvent(reset) { - panic("fswatcher_init failed to call SetEvent") - } + w := watcher{folder:dir, files:[]fs{fs0}, win_handle:handle} + fswatcher = append(fswatcher, w) + if !setEvent(reset) { + panic("fswatcher_init failed to call SetEvent") } } } diff --git a/repique/features/dns/nodes/dnscrypt.go b/repique/features/dns/nodes/dnscrypt.go index 41147f6..8c57098 100644 --- a/repique/features/dns/nodes/dnscrypt.go +++ b/repique/features/dns/nodes/dnscrypt.go @@ -23,12 +23,12 @@ type dnscryptnode struct { bs_relays []*common.Endpoint } -func (n *dnscryptnode) boost(o *node) *uint32 { +func (n *dnscryptnode) boost(o *node) interface{} { if o.status&status_outdated == status_outdated || n.GetDefaultExpiration().Before(time.Now()) { relays := n.bs_relays if _, err := dnscrypt.RetrieveServicesInfo(false, n.Resolver, n.dailFn, n.Network, n.ipaddr, &relays); err == nil { - n.bs2epring() - expired := uint32(n.GetDefaultExpiration().Unix()) + n.bs2epring(relays) + expired := n.GetDefaultExpiration() return &expired } else { dlog.Debugf("dnscrypt boost failed, %v", err) @@ -37,8 +37,8 @@ func (n *dnscryptnode) boost(o *node) *uint32 { return nil } -func (n *dnscryptnode) bs2epring() { - if epring := common.LinkEPRing(n.bs_relays...); epring != nil { +func (n *dnscryptnode) bs2epring(eps []*common.Endpoint) { + if epring := common.LinkEPRing(eps...); epring != nil { epring.Do(func(v interface{}){ dlog.Infof("relay [%s*%s]=%s", *n.Name, v.(*common.EPRing).Order(), v.(*common.EPRing).String()) }) @@ -75,7 +75,7 @@ func (n *dnscryptnode) unmarshal(ss *struct{c uint8; v string}) *time.Time { } else { n.V2_Services = append(n.V2_Services, s) } - n.bs2epring() + n.bs2epring(n.bs_relays) to := time.Unix(int64(s.DtTo), 0) return &to } diff --git a/repique/features/dns/nodes/materials.go b/repique/features/dns/nodes/materials.go index cc07880..4f50e54 100644 --- a/repique/features/dns/nodes/materials.go +++ b/repique/features/dns/nodes/materials.go @@ -89,12 +89,13 @@ func (m *materials) open(path string, identity []byte) { } } -func (m *materials) unmarshalto(items []marshalable) (updated []marshalable) { +func (m *materials) unmarshalto(items []marshalable) (updated []marshalable, dts []*time.Time) { for _, item := range items { if s, found := m.values[*item.name()]; found { if material := item.material(); material != nil { if dt := material.unmarshal(s); dt == nil || dt.After(time.Now()) { updated = append(updated, item) + dts = append(dts, dt) } } } diff --git a/repique/features/dns/nodes/node_manager.go b/repique/features/dns/nodes/node_manager.go index a6696c1..6d89764 100644 --- a/repique/features/dns/nodes/node_manager.go +++ b/repique/features/dns/nodes/node_manager.go @@ -23,11 +23,9 @@ import ( "github.com/AZ-X/pique/repique/features/dns/channels" "github.com/AZ-X/pique/repique/features/dns/nodes/metrics" "github.com/AZ-X/pique/repique/services" - stamps "github.com/AZ-X/pique/repique/unclassified/stammel" - smith "github.com/jedisct1/go-clocksmith" - "github.com/jedisct1/dlog" + "github.com/jedisct1/dlog" "github.com/AZ-X/dns" ) @@ -52,7 +50,7 @@ const ( ) type connectivity interface { - boost(*node) *uint32 + boost(*node) interface{} // *uint32 or *time.Time } type _DNSInterface interface { @@ -101,10 +99,9 @@ func (n *node) awaitresolution() bool { } func (n *node) awaitboost() bool { - return n.status&( + return n.status&status_broken == 0 && n.status&( status_outdated| - status_broken | - status_bootstrapping) == status_outdated|status_bootstrapping + status_bootstrapping) != 0 } func (n *node) dnssec() bool { @@ -168,10 +165,22 @@ func (n *node) evaluate() { } } +type boostrunnable struct { + n *node + mgr *NodesMgr +} + +func (r *boostrunnable) run() { + dlog.Debugf("reboost %s", *r.n.name()) + r.n.status|=status_outdated + r.mgr.fetchmaterials(r.n.name()) +} + // A simple node manager across all servers type NodesMgr struct { *conceptions.SemaGroup *materials + *smith metrics.RTT nodes map[string]*node q2nodesFunc *[]func(*string, uint8)[]_DNSService @@ -352,7 +361,7 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte, } newDnscryptNode := func(svr *common.RegisteredServer) (node *dnscryptnode, c node_capable) { hasDnscrypt = true - c = status_bootstrapping|status_outdated + c = status_outdated if cfg.DefaultUnavailable { c|=status_unusable } @@ -412,6 +421,7 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte, if hasDnscrypt { dlog.Noticef("dnscrypt-protocol bind to %s", network2) } + mgr.smith = &smith{} if len(cfg.ExportCredentialPath) > 0 { mgr.materials = &materials{} mgr.open(cfg.ExportCredentialPath, sum) @@ -423,9 +433,24 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte, nodes = append(nodes, n) } } - for _, n := range mgr.unmarshalto(nodes) { + updates, dts := mgr.unmarshalto(nodes) + for i, n := range updates { dlog.Debugf("exported material to %s", *n.name()) - n.(*node).status&^=status_outdated|status_bootstrapping + node := n.(*node) + node.status&^=status_outdated|status_bootstrapping + f := func(){ + node.status|=status_outdated + } + if cfg.FetchInterval > 0 { + r := &boostrunnable{node, mgr,} + f = r.run + } + if dt := dts[i]; dt != nil { + dlog.Debugf("next expiration for %s on %v", *n.name(), dt) + mgr.addevent(dt, 0, f) + } else if cfg.FetchInterval > 0 { + mgr.addevent(nil, uint32(cfg.FetchInterval)*60 - 5, f) + } } } } @@ -434,16 +459,19 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte, go func(interval time.Duration, least2 bool) { <-mgr.Ready close(mgr.Ready) - for { - delay := interval + var f func() + f = func () { mgr.fetchmaterials() + delay := interval lives, total := mgr.available() if least2 && lives <= 1 && total != lives { - delay = 100 * time.Millisecond * time.Duration(total - lives) + delay = time.Duration(total - lives) * time.Second } debug.FreeOSMemory() - smith.Sleep(delay) + mgr.addevent(nil, uint32(delay.Seconds()), f) } + f() + mgr.pilot() }(time.Duration(common.Max(60, cfg.FetchInterval)) * time.Minute, cfg.FetchAtLeastTwo) } } @@ -461,7 +489,7 @@ func (mgr *NodesMgr) available() (c int, t int) { const _DNSRoot = "." // booster for DoH & DoT -func (mgr *NodesMgr) boost(n *node) *uint32 { +func (mgr *NodesMgr) boost(n *node) interface{} { var node *tlsnode var bs_ips *tls_bs_ips switch n.proto() { @@ -522,34 +550,41 @@ func (mgr *NodesMgr) boost(n *node) *uint32 { return ttl } -func (mgr *NodesMgr) fetchmaterials() { - if !mgr.BeginExclusive() { - dlog.Warn("semi-refresh occurs") - return +func (mgr *NodesMgr) fetchmaterials(opts ...*string) { + if len(opts) == 0 { + if !mgr.BeginExclusive() { + dlog.Warn("semi-refresh occurs") + return + } + mgr.proveResolution() + mgr.associate() + mgr.EndExclusive() } - mgr.proveResolution() - mgr.associate() - mgr.EndExclusive() - nodes, rts := make([]*node, 0), make([]chan *uint32, 0) - for _, n := range mgr.nodes { - if n.awaitboost() { + nodes, rts := make([]*node, 0), make([]chan interface{}, 0) + for key, n := range mgr.nodes { + if (len(opts) == 0 || key == *opts[0]) && n.awaitboost() { nodes = append(nodes, n) - rt := make(chan *uint32) + rt := make(chan interface{}) rts = append(rts, rt) - go func(n1 *node, r chan<- *uint32) { + go func(n1 *node, r chan<- interface{}) { dlog.Debugf("ready to boost %s", *n1.name()) r <- n1.boost(n1) close(r) }(n, rt) } } - updates, rtdt := make([]*node, 0), make([]*uint32, 0) + updates := make([]*node, 0) for c := len(rts) -1; c >= 0; c-- { rt := <- rts[c] if rt != nil { updates = append(updates, nodes[c]) - rtdt = append(rtdt, rt) + r := &boostrunnable{nodes[c], mgr,} + if dt, ok := rt.(*time.Time); ok { + mgr.addevent(dt, 0, r.run) + } else { + mgr.addevent(nil, *rt.(*uint32) + uint32(c), r.run) + } } else { dlog.Debugf("can not boost %s", *nodes[c].name()) } @@ -572,7 +607,9 @@ func (mgr *NodesMgr) fetchmaterials() { mgr.savepoint() } } - mgr.proveResolution() + if len(opts) == 0 { + mgr.proveResolution() + } mgr.associate() } @@ -698,10 +735,7 @@ func (mgr *NodesMgr) associate() { func (mgr *NodesMgr) pick(s *channels.Session) _DNSService { if s.ServerName != nil && *s.ServerName != channels.NonSvrName { - if node := mgr.nodes[*s.ServerName]; node.applicable() { - return node._DNSService - } - return nil + return mgr.nodes[*s.ServerName]._DNSService } var candidates []_DNSService diff --git a/repique/features/dns/nodes/tasks.go b/repique/features/dns/nodes/tasks.go new file mode 100644 index 0000000..ffbf5bf --- /dev/null +++ b/repique/features/dns/nodes/tasks.go @@ -0,0 +1,69 @@ +package nodes + +import ( + "sort" + "time" +) + +type smith struct { + scheduler *time.Timer + events []*struct{base *time.Time; off uint32; task func()} +} + +func (t *smith) addevent(base *time.Time, off uint32, task func()) { + if base == nil { + now := time.Now() + base = &now + } + event := &struct{base *time.Time; off uint32; task func()}{base:base, off:off, task:task,} + t.events = append(t.events, event) + if t.scheduler != nil && len(t.events) > 1 { + if t.duration(len(t.events) - 1) < t.duration(len(t.events) - 2) { + if !t.scheduler.Reset(1 * time.Second) { + t.scheduler.Stop() + } + t.scheduler = nil + return + } + } + sort.Slice(t.events, func(i, j int) bool { + return t.duration(i) > t.duration(j) + }) +} + +func (t *smith) duration(idx int) time.Duration { + event := t.events[idx] + return time.Duration(event.off) * time.Second + event.base.Sub(time.Now()) +} + +func (t *smith) pilot() { + var f func() + f = func() { + const cascade = 5 * time.Minute // serial within partition goroutine + if len(t.events) > 0 { + tail := len(t.events) - 1 + d := t.duration(tail) + if t.scheduler != nil { + task := t.events[tail].task + t.events = t.events[:tail] + task() + for { + tail = len(t.events) - 1 + if tail < 0 { + return + } + if d = t.duration(tail); d <= cascade { + time.Sleep(d) + task = t.events[tail].task + t.events = t.events[:tail] + task() + } else { + break + } + } + } + t.scheduler = time.AfterFunc(d, f) + } + } + f() +} \ No newline at end of file diff --git a/vendor/github.com/jedisct1/go-clocksmith/.gitignore b/vendor/github.com/jedisct1/go-clocksmith/.gitignore deleted file mode 100644 index a1338d6..0000000 --- a/vendor/github.com/jedisct1/go-clocksmith/.gitignore +++ /dev/null @@ -1,14 +0,0 @@ -# Binaries for programs and plugins -*.exe -*.dll -*.so -*.dylib - -# Test binary, build with `go test -c` -*.test - -# Output of the go coverage tool, specifically when used with LiteIDE -*.out - -# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 -.glide/ diff --git a/vendor/github.com/jedisct1/go-clocksmith/LICENSE b/vendor/github.com/jedisct1/go-clocksmith/LICENSE deleted file mode 100644 index 245c4d7..0000000 --- a/vendor/github.com/jedisct1/go-clocksmith/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018-2019 Frank Denis - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/jedisct1/go-clocksmith/README.md b/vendor/github.com/jedisct1/go-clocksmith/README.md deleted file mode 100644 index 194829a..0000000 --- a/vendor/github.com/jedisct1/go-clocksmith/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# clocksmith - -A sleep-aware sleep() function, that doesn't pause (for too long) if -the system goes to hibernation. diff --git a/vendor/github.com/jedisct1/go-clocksmith/clocksmith.go b/vendor/github.com/jedisct1/go-clocksmith/clocksmith.go deleted file mode 100644 index 952b1b0..0000000 --- a/vendor/github.com/jedisct1/go-clocksmith/clocksmith.go +++ /dev/null @@ -1,34 +0,0 @@ -package clocksmith - -import "time" - -const ( - // DefaultGranularity - Maximum duration of actual time.Sleep() calls - DefaultGranularity = 5 * time.Second -) - -// SleepWithGranularity - sleeps for the given amount of time, with the given granularity; -// doesn't pause if the system goes to hibernation -func SleepWithGranularity(duration time.Duration, granularity time.Duration) { - if duration <= granularity { - time.Sleep(duration) - return - } - start := time.Now().Unix() - for { - time.Sleep(granularity) - elapsed := time.Duration(time.Now().Unix()-start) * time.Second - if elapsed < 0 || elapsed > duration { - break - } else if elapsed > duration-granularity { - time.Sleep(duration - elapsed) - break - } - } -} - -// Sleep - sleeps for the given amount of time, with the default granularity; -// doesn't pause if the system goes to hibernation -func Sleep(duration time.Duration) { - SleepWithGranularity(duration, DefaultGranularity) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3e5cec2..d53f861 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -11,9 +11,6 @@ github.com/RobinUS2/golang-moving-average ## explicit github.com/jedisct1/dlog # github.com/jedisct1/dlog => ./mod/dboy -# github.com/jedisct1/go-clocksmith v0.0.0-20190707124905-73e087c7979c -## explicit -github.com/jedisct1/go-clocksmith ## explicit github.com/jedisct1/xsecretbox # golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83