Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

stop using goprocess for shutdown #38

Merged
merged 3 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ go 1.16

require (
github.com/ipfs/go-log/v2 v2.0.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-nat v0.0.5
github.com/libp2p/go-nat v0.1.0
)
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@ github.com/ipfs/go-log/v2 v2.0.3 h1:Q2gXcBoCALyLN/pUQlz1qgu0x3uFV6FzP9oXhpfyJpc=
github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ=
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk=
github.com/libp2p/go-nat v0.0.5 h1:qxnwkco8RLKqVh1NmjQ+tJ8p8khNLFxuElYG/TwqW4Q=
github.com/libp2p/go-nat v0.0.5/go.mod h1:B7NxsVNPZmRLvMOwiEO1scOSyjA56zxYAGv1yQgRkEU=
github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg=
github.com/libp2p/go-nat v0.1.0/go.mod h1:X7teVkwRHNInVNWQiO/tAiAVRwSr5zoRz4YSTC3uRBM=
github.com/libp2p/go-netroute v0.1.2 h1:UHhB35chwgvcRI392znJA3RCBtZ3MpE3ahNCN5MR4Xg=
github.com/libp2p/go-netroute v0.1.2/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk=
github.com/libp2p/go-sockaddr v0.0.2 h1:tCuXfpA9rq7llM/v834RKc/Xvovy/AqM9kHvTV/jY/Q=
Expand Down
6 changes: 2 additions & 4 deletions mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"net"
"sync"
"time"

"github.com/jbenet/goprocess"
)

// Mapping represents a port mapping in a NAT.
Expand Down Expand Up @@ -42,7 +40,6 @@ type mapping struct {
proto string
intport int
extport int
proc goprocess.Process

cached net.IP
cacheTime time.Time
Expand Down Expand Up @@ -117,5 +114,6 @@ func (m *mapping) ExternalAddr() (net.Addr, error) {
}

func (m *mapping) Close() error {
return m.proc.Close()
m.nat.removeMapping(m)
return nil
}
120 changes: 56 additions & 64 deletions nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
goprocess "github.com/jbenet/goprocess"
periodic "github.com/jbenet/goprocess/periodic"
nat "github.com/libp2p/go-nat"
)

var (
// ErrNoMapping signals no mapping exists for an address
ErrNoMapping = errors.New("mapping not established")
"github.com/libp2p/go-nat"
)

// ErrNoMapping signals no mapping exists for an address
var ErrNoMapping = errors.New("mapping not established")

var log = logging.Logger("nat")

// MappingDuration is a default port mapping duration.
Expand All @@ -30,24 +27,7 @@ const CacheTime = time.Second * 15
// DiscoverNAT looks for a NAT device in the network and
// returns an object that can manage port mappings.
func DiscoverNAT(ctx context.Context) (*NAT, error) {
var (
natInstance nat.NAT
err error
)

done := make(chan struct{})
go func() {
defer close(done)
// This will abort in 10 seconds anyways.
natInstance, err = nat.DiscoverGateway()
}()

select {
case <-done:
case <-ctx.Done():
return nil, ctx.Err()
}

natInstance, err := nat.DiscoverGateway(ctx)
if err != nil {
return nil, err
}
Expand All @@ -70,29 +50,35 @@ func DiscoverNAT(ctx context.Context) (*NAT, error) {
type NAT struct {
natmu sync.Mutex
nat nat.NAT
proc goprocess.Process

refCount sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc

mappingmu sync.RWMutex // guards mappings
closed bool
mappings map[*mapping]struct{}
}

func newNAT(realNAT nat.NAT) *NAT {
ctx, cancel := context.WithCancel(context.Background())
return &NAT{
nat: realNAT,
proc: goprocess.WithParent(goprocess.Background()),
mappings: make(map[*mapping]struct{}),
nat: realNAT,
mappings: make(map[*mapping]struct{}),
ctx: ctx,
ctxCancel: cancel,
}
}

// Close shuts down all port mappings. NAT can no longer be used.
func (nat *NAT) Close() error {
return nat.proc.Close()
}
nat.mappingmu.Lock()
nat.closed = true
nat.mappingmu.Unlock()

// Process returns the nat's life-cycle manager, for making it listen
// to close signals.
func (nat *NAT) Process() goprocess.Process {
return nat.proc
nat.ctxCancel()
nat.refCount.Wait()
return nil
}

// Mappings returns a slice of all NAT mappings
Expand All @@ -106,21 +92,6 @@ func (nat *NAT) Mappings() []Mapping {
return maps2
}

func (nat *NAT) addMapping(m *mapping) {
// make mapping automatically close when nat is closed.
nat.proc.AddChild(m.proc)

nat.mappingmu.Lock()
nat.mappings[m] = struct{}{}
nat.mappingmu.Unlock()
}

func (nat *NAT) rmMapping(m *mapping) {
nat.mappingmu.Lock()
delete(nat.mappings, m)
nat.mappingmu.Unlock()
}

// NewMapping attempts to construct a mapping on protocol and internal port
// It will also periodically renew the mapping until the returned Mapping
// -- or its parent NAT -- is Closed.
Expand All @@ -146,31 +117,52 @@ func (nat *NAT) NewMapping(protocol string, port int) (Mapping, error) {
proto: protocol,
}

m.proc = goprocess.WithTeardown(func() error {
nat.rmMapping(m)
nat.natmu.Lock()
defer nat.natmu.Unlock()
nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort())
return nil
})

nat.addMapping(m)

m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) {
nat.establishMapping(m)
}))
nat.mappingmu.Lock()
if nat.closed {
nat.mappingmu.Unlock()
return nil, errors.New("closed")
}
nat.mappings[m] = struct{}{}
nat.refCount.Add(1)
nat.mappingmu.Unlock()
go nat.refreshMappings(m)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not too difficult, it would be nice to have a single worker loop to handle refreshing mappings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, it's not that easy, because of the way that mapping and nat are coupled. Not sure I want to make such a change in a repo where we don't have any tests...


// do it once synchronously, so first mapping is done right away, and before exiting,
// allowing users -- in the optimistic case -- to use results right after.
nat.establishMapping(m)
return m, nil
}

func (nat *NAT) removeMapping(m *mapping) {
nat.mappingmu.Lock()
delete(nat.mappings, m)
nat.mappingmu.Unlock()
nat.natmu.Lock()
nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort())
nat.natmu.Unlock()
}

func (nat *NAT) refreshMappings(m *mapping) {
defer nat.refCount.Done()
t := time.NewTicker(MappingDuration / 3)
defer t.Stop()

for {
select {
case <-t.C:
nat.establishMapping(m)
case <-nat.ctx.Done():
m.Close()
return
}
}
}

func (nat *NAT) establishMapping(m *mapping) {
oldport := m.ExternalPort()

log.Debugf("Attempting port map: %s/%d", m.Protocol(), m.InternalPort())
comment := "libp2p"
const comment = "libp2p"

nat.natmu.Lock()
newport, err := nat.nat.AddPortMapping(m.Protocol(), m.InternalPort(), comment, MappingDuration)
Expand Down