From f1b7a194f83cec05480f1758fff1b2de95a0e0ca Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 19 Sep 2021 10:44:38 +0100 Subject: [PATCH] stop using goprocess for shutdown --- go.mod | 1 - go.sum | 4 -- mapping.go | 15 ++++---- nat.go | 107 +++++++++++++++++++++++++++++------------------------ 4 files changed, 65 insertions(+), 62 deletions(-) diff --git a/go.mod b/go.mod index 2173e62..2db8d88 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,5 @@ go 1.16 require ( github.com/ipfs/go-log/v2 v2.0.3 - github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-nat v0.0.6-0.20210919090249-d0d64ea585d6 ) diff --git a/go.sum b/go.sum index 450faf2..7faaf25 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,6 @@ github.com/ipfs/go-log/v2 v2.0.3 h1:Q2gXcBoCALyLN/pUQlz1qgu0x3uFV6FzP9oXhpfyJpc= github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= -github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= -github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ= diff --git a/mapping.go b/mapping.go index 1835ed9..4ab636d 100644 --- a/mapping.go +++ b/mapping.go @@ -5,8 +5,6 @@ import ( "net" "sync" "time" - - "github.com/jbenet/goprocess" ) // Mapping represents a port mapping in a NAT. @@ -38,11 +36,11 @@ type Mapping interface { type mapping struct { sync.Mutex // guards all fields - nat *NAT - proto string - intport int - extport int - proc goprocess.Process + nat *NAT + proto string + intport int + extport int + teardown func(*mapping) cached net.IP cacheTime time.Time @@ -117,5 +115,6 @@ func (m *mapping) ExternalAddr() (net.Addr, error) { } func (m *mapping) Close() error { - return m.proc.Close() + m.teardown(m) + return nil } diff --git a/nat.go b/nat.go index a0e00b7..cad873c 100644 --- a/nat.go +++ b/nat.go @@ -8,16 +8,12 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-nat" - "github.com/jbenet/goprocess" - periodic "github.com/jbenet/goprocess/periodic" + "github.com/libp2p/go-nat" ) -var ( - // ErrNoMapping signals no mapping exists for an address - ErrNoMapping = errors.New("mapping not established") -) +// ErrNoMapping signals no mapping exists for an address +var ErrNoMapping = errors.New("mapping not established") var log = logging.Logger("nat") @@ -54,29 +50,35 @@ func DiscoverNAT(ctx context.Context) (*NAT, error) { type NAT struct { natmu sync.Mutex nat nat.NAT - proc goprocess.Process + + refCount sync.WaitGroup + ctx context.Context + ctxCancel context.CancelFunc mappingmu sync.RWMutex // guards mappings + closed bool mappings map[*mapping]struct{} } func newNAT(realNAT nat.NAT) *NAT { + ctx, cancel := context.WithCancel(context.Background()) return &NAT{ - nat: realNAT, - proc: goprocess.WithParent(goprocess.Background()), - mappings: make(map[*mapping]struct{}), + nat: realNAT, + mappings: make(map[*mapping]struct{}), + ctx: ctx, + ctxCancel: cancel, } } // Close shuts down all port mappings. NAT can no longer be used. func (nat *NAT) Close() error { - return nat.proc.Close() -} + nat.mappingmu.Lock() + nat.closed = true + nat.mappingmu.Unlock() -// Process returns the nat's life-cycle manager, for making it listen -// to close signals. -func (nat *NAT) Process() goprocess.Process { - return nat.proc + nat.ctxCancel() + nat.refCount.Wait() + return nil } // Mappings returns a slice of all NAT mappings @@ -90,21 +92,6 @@ func (nat *NAT) Mappings() []Mapping { return maps2 } -func (nat *NAT) addMapping(m *mapping) { - // make mapping automatically close when nat is closed. - nat.proc.AddChild(m.proc) - - nat.mappingmu.Lock() - nat.mappings[m] = struct{}{} - nat.mappingmu.Unlock() -} - -func (nat *NAT) rmMapping(m *mapping) { - nat.mappingmu.Lock() - delete(nat.mappings, m) - nat.mappingmu.Unlock() -} - // NewMapping attempts to construct a mapping on protocol and internal port // It will also periodically renew the mapping until the returned Mapping // -- or its parent NAT -- is Closed. @@ -125,24 +112,21 @@ func (nat *NAT) NewMapping(protocol string, port int) (Mapping, error) { } m := &mapping{ - intport: port, - nat: nat, - proto: protocol, + intport: port, + nat: nat, + proto: protocol, + teardown: nat.removeMapping, } - m.proc = goprocess.WithTeardown(func() error { - nat.rmMapping(m) - nat.natmu.Lock() - defer nat.natmu.Unlock() - nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort()) - return nil - }) - - nat.addMapping(m) - - m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) { - nat.establishMapping(m) - })) + nat.mappingmu.Lock() + if nat.closed { + nat.mappingmu.Unlock() + return nil, errors.New("closed") + } + nat.mappings[m] = struct{}{} + nat.refCount.Add(1) + nat.mappingmu.Unlock() + go nat.refreshMappings(m) // do it once synchronously, so first mapping is done right away, and before exiting, // allowing users -- in the optimistic case -- to use results right after. @@ -150,11 +134,36 @@ func (nat *NAT) NewMapping(protocol string, port int) (Mapping, error) { return m, nil } +func (nat *NAT) removeMapping(m *mapping) { + nat.mappingmu.Lock() + delete(nat.mappings, m) + nat.mappingmu.Unlock() + nat.natmu.Lock() + nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort()) + nat.natmu.Unlock() +} + +func (nat *NAT) refreshMappings(m *mapping) { + defer nat.refCount.Done() + t := time.NewTicker(MappingDuration / 3) + defer t.Stop() + + for { + select { + case <-t.C: + nat.establishMapping(m) + case <-nat.ctx.Done(): + m.Close() + return + } + } +} + func (nat *NAT) establishMapping(m *mapping) { oldport := m.ExternalPort() log.Debugf("Attempting port map: %s/%d", m.Protocol(), m.InternalPort()) - comment := "libp2p" + const comment = "libp2p" nat.natmu.Lock() newport, err := nat.nat.AddPortMapping(m.Protocol(), m.InternalPort(), comment, MappingDuration)