Skip to content

Commit

Permalink
Order mixclient and syncer shutdown
Browse files Browse the repository at this point in the history
During clean shutdown, signal the mixing client to shutdown first, waiting for
it to finish running, before closing the RPC and SPV syncers.

Updates the mixing module to a version that supports continuing active mixes
before terminating the mixing client.
  • Loading branch information
jrick committed Feb 20, 2025
1 parent 447644e commit 11f9b24
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 54 deletions.
40 changes: 27 additions & 13 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

params := s.wallet.ChainParams()

ntfnCtx, ntfnCtxCancel := context.WithCancel(context.Background())
defer ntfnCtxCancel()
s.notifier = &notifier{
syncer: s,
ctx: ctx,
ctx: ntfnCtx,
closed: make(chan struct{}),
}
addr, err := normalizeAddress(s.opts.Address, s.opts.DefaultPort)
Expand Down Expand Up @@ -589,12 +591,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
opts = append(opts, wsrpc.WithTLSConfig(tc))
}
client, err := wsrpc.Dial(ctx, addr, opts...)
wsClient, err := wsrpc.Dial(ctx, addr, opts...)
if err != nil {
return err
}
defer client.Close()
s.rpc = dcrd.New(client)
defer wsClient.Close()
s.rpc = dcrd.New(wsClient)

// Verify that the server is running on the expected network.
var netID wire.CurrencyNet
Expand Down Expand Up @@ -723,10 +725,27 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return err
}

defer func() {
ntfnCtxCancel()

select {
case <-ctx.Done():
wsClient.Close()
default:
}

// Wait for notifications to finish before returning
<-s.notifier.closed
}()

// Ensure wallet.Run cleanly finishes/is canceled first when outer
// context is canceled.
walletCtx, walletCtxCancel := context.WithCancel(context.Background())
defer walletCtxCancel()
g.Go(func() error {
// Run wallet background goroutines (currently, this just runs
// mixclient).
return s.wallet.Run(ctx)
return s.wallet.Run(walletCtx)
})

// Request notifications for mixing messages.
Expand All @@ -739,18 +758,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

log.Infof("Blockchain sync completed, wallet ready for general usage.")

// Wait for notifications to finish before returning
defer func() {
<-s.notifier.closed
}()

g.Go(func() error {
select {
case <-ctx.Done():
client.Close()
walletCtxCancel()
return ctx.Err()
case <-client.Done():
return client.Err()
case <-wsClient.Done():
return wsClient.Err()
}
})
return g.Wait()
Expand Down
9 changes: 7 additions & 2 deletions dcrwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,10 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) {
for {
err := syncer.Run(ctx)
if done(ctx) {
loggers.SyncLog.Infof("SPV synchronization stopped")
return
}
log.Errorf("SPV synchronization ended: %v", err)
loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err)
}
}

Expand Down Expand Up @@ -571,7 +572,11 @@ func rpcSyncLoop(ctx context.Context, w *wallet.Wallet) {
syncer := chain.NewSyncer(w, rpcOptions)
err := syncer.Run(ctx)
if err != nil {
loggers.SyncLog.Errorf("Wallet synchronization stopped: %v", err)
if errors.Is(err, context.Canceled) || ctx.Err() != nil {
loggers.SyncLog.Infof("RPC synchronization stopped")
return
}
loggers.SyncLog.Errorf("RPC synchronization stopped: %v", err)
select {
case <-ctx.Done():
return
Expand Down
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module decred.org/dcrwallet/v5
go 1.23

require (
decred.org/cspp/v2 v2.3.0
decred.org/cspp/v2 v2.4.0
github.com/decred/dcrd/addrmgr/v2 v2.0.4
github.com/decred/dcrd/blockchain/stake/v5 v5.0.1
github.com/decred/dcrd/blockchain/standalone/v2 v2.2.1
Expand All @@ -13,15 +13,15 @@ require (
github.com/decred/dcrd/chaincfg/v3 v3.2.1
github.com/decred/dcrd/connmgr/v3 v3.1.2
github.com/decred/dcrd/crypto/blake256 v1.1.0
github.com/decred/dcrd/crypto/rand v1.0.0
github.com/decred/dcrd/crypto/rand v1.0.1
github.com/decred/dcrd/crypto/ripemd160 v1.0.2
github.com/decred/dcrd/dcrec v1.0.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0
github.com/decred/dcrd/dcrjson/v4 v4.1.0
github.com/decred/dcrd/dcrutil/v4 v4.0.2
github.com/decred/dcrd/gcs/v4 v4.1.0
github.com/decred/dcrd/hdkeychain/v3 v3.1.2
github.com/decred/dcrd/mixing v0.4.2
github.com/decred/dcrd/mixing v0.5.0
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0
github.com/decred/dcrd/rpcclient/v8 v8.0.1
github.com/decred/dcrd/txscript/v4 v4.1.1
Expand All @@ -36,9 +36,9 @@ require (
github.com/jrick/logrotate v1.0.0
github.com/jrick/wsrpc/v2 v2.3.8
go.etcd.io/bbolt v1.3.11
golang.org/x/crypto v0.31.0
golang.org/x/sync v0.10.0
golang.org/x/term v0.27.0
golang.org/x/crypto v0.33.0
golang.org/x/sync v0.11.0
golang.org/x/term v0.29.0
google.golang.org/grpc v1.67.3
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1
google.golang.org/protobuf v1.36.1
Expand All @@ -54,8 +54,8 @@ require (
github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
32 changes: 16 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
decred.org/cspp/v2 v2.3.0 h1:GC8emJnLbOVAkgBTHK/1wy6o/m0AVsN1r4m1ZnZZWjo=
decred.org/cspp/v2 v2.3.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA=
decred.org/cspp/v2 v2.4.0 h1:whb0YW+UELHJS/UfT5MBXSJXrKUVw5omhgKNhjzYix4=
decred.org/cspp/v2 v2.4.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0=
Expand Down Expand Up @@ -30,8 +30,8 @@ github.com/decred/dcrd/container/lru v1.0.0 h1:7foQymtbu18aQWYiY9RnNIeE+kvpiN+fi
github.com/decred/dcrd/container/lru v1.0.0/go.mod h1:vlPwj0l+IzAHhQSsbgQnJgO5Cte78+yI065V+Mc5PRQ=
github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8=
github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/crypto/rand v1.0.0 h1:Ah9Asl36OZt09sGSMbJZuL1HfwGdlC38q/ZUeLDVKRg=
github.com/decred/dcrd/crypto/rand v1.0.0/go.mod h1:coa7BbxSTiKH6esi257plGfMFYuGL4MTbQlLYnOdzpE=
github.com/decred/dcrd/crypto/rand v1.0.1 h1:pYMgDRmRv1z1RNgAAs8izJstm4B+fLFiqGD5btOt2Wg=
github.com/decred/dcrd/crypto/rand v1.0.1/go.mod h1:MsA2XySk/4KpCOYW6vsNYTGuOYRK1wpvulaWCuW7RyI=
github.com/decred/dcrd/crypto/ripemd160 v1.0.2 h1:TvGTmUBHDU75OHro9ojPLK+Yv7gDl2hnUvRocRCjsys=
github.com/decred/dcrd/crypto/ripemd160 v1.0.2/go.mod h1:uGfjDyePSpa75cSQLzNdVmWlbQMBuiJkvXw/MNKRY4M=
github.com/decred/dcrd/database/v3 v3.0.2 h1:rgP7XNZemTs8ZC7bnTKO8JO79Woj5nq+yQYmB9ry7yM=
Expand All @@ -50,8 +50,8 @@ github.com/decred/dcrd/gcs/v4 v4.1.0 h1:tpW7JW53yJZlgNwl/n2NL1b8NxHaIPRUyNuLMkB/
github.com/decred/dcrd/gcs/v4 v4.1.0/go.mod h1:nPTbGM/I3Ihe5KFvUmxZEqQP/jDZQjQ63+WEi/f4lqU=
github.com/decred/dcrd/hdkeychain/v3 v3.1.2 h1:x25WuuE7zM/20EynuVMyOhL0K8BwGBBsexGq8xTiHFA=
github.com/decred/dcrd/hdkeychain/v3 v3.1.2/go.mod h1:FnNJmZ7jqUDeAo6/c/xkQi5cuxh3EWtJeMmW6/Z8lcc=
github.com/decred/dcrd/mixing v0.4.2 h1:mpt2pNIFTI6L1hXrieAWJTQJv5t9WzHcNnhI+tnAG90=
github.com/decred/dcrd/mixing v0.4.2/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ=
github.com/decred/dcrd/mixing v0.5.0 h1:KEWr6ZKuUcnAMsuWyrwpdCuL48OrCkIZbKn5B1V+wCY=
github.com/decred/dcrd/mixing v0.5.0/go.mod h1:264YZ7KgKsjQGwart40E1QiVzPvLiaKkd/T0c8jtzNI=
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 h1:l0DnCcILTNrpy8APF3FLN312ChpkQaAuW30aC/RgBaw=
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0/go.mod h1:j+kkRPXPJB5S9VFOsx8SQLcU7PTFkPKRc1aCHN4ENzA=
github.com/decred/dcrd/rpcclient/v8 v8.0.1 h1:hd81e4w1KSqvPcozJlnz6XJfWKDNuahgooH/N5E8vOU=
Expand Down Expand Up @@ -92,20 +92,20 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU=
golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=
Expand Down
49 changes: 34 additions & 15 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *Syncer) setRequiredHeight(tipHeight int32) {
}

// Run synchronizes the wallet, returning when synchronization fails or the
// context is cancelled.
// context is canceled.
func (s *Syncer) Run(ctx context.Context) (err error) {
s.doneMu.Lock()
s.done = make(chan struct{})
Expand Down Expand Up @@ -367,32 +367,43 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// Start background handlers to read received messages from remote peers
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return s.receiveGetData(ctx) })
g.Go(func() error { return s.receiveInv(ctx) })
g.Go(func() error { return s.receiveHeadersAnnouncements(ctx) })
g.Go(func() error { return s.receiveMixMsgs(ctx) })
g, gctx := errgroup.WithContext(context.Background())
g.Go(func() error { return s.receiveGetData(gctx) })
g.Go(func() error { return s.receiveInv(gctx) })
g.Go(func() error { return s.receiveHeadersAnnouncements(gctx) })
g.Go(func() error { return s.receiveMixMsgs(gctx) })
s.lp.AddHandledMessages(p2p.MaskGetData | p2p.MaskInv)

if len(s.persistentPeers) != 0 {
for i := range s.persistentPeers {
raddr := s.persistentPeers[i]
g.Go(func() error { return s.connectToPersistent(ctx, raddr) })
g.Go(func() error { return s.connectToPersistent(gctx, raddr) })
}
} else {
g.Go(func() error { return s.connectToCandidates(ctx) })
g.Go(func() error { return s.connectToCandidates(gctx) })
}

g.Go(func() error { return s.handleMempool(ctx) })
g.Go(func() error { return s.handleMempool(gctx) })

s.wallet.SetNetworkBackend(s)
defer s.wallet.SetNetworkBackend(nil)

// Ensure initial sync and wallet.Run cleanly finish/are canceled
// first when outer context is canceled.
walletCtx, walletCtxCancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
case <-gctx.Done():
}
walletCtxCancel()
}()

// Perform the initial startup sync.
g.Go(func() error {
// First step: fetch missing CFilters.
progress := make(chan wallet.MissingCFilterProgress, 1)
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s, progress)
go s.wallet.FetchMissingCFiltersWithProgress(walletCtx, s, progress)

log.Debugf("Fetching missing CFilters...")
s.fetchMissingCfiltersStart()
Expand All @@ -408,14 +419,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// Next: fetch headers and cfilters up to mainchain tip.
s.fetchHeadersStart()
log.Debugf("Fetching headers and CFilters...")
err = s.initialSyncHeaders(ctx)
err = s.initialSyncHeaders(walletCtx)
if err != nil {
return err
}
s.fetchHeadersFinished()

// Finally: Perform the initial rescan over the received blocks.
err = s.initialSyncRescan(ctx)
err = s.initialSyncRescan(walletCtx)
if err != nil {
return err
}
Expand All @@ -425,10 +436,18 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return nil
})

// Run wallet background goroutines (currently, this just runs
// mixclient).
g.Go(func() error {
return s.wallet.Run(ctx)
// Run wallet background goroutines (currently, this just runs
// mixclient).
err := s.wallet.Run(walletCtx)
if err != nil {
return err
}

// If gctx has not yet been canceled, do so here now.
// walletCtx is canceled after either ctx or gctx is canceled.
<-walletCtx.Done()
return walletCtx.Err()
})

// Wait until cancellation or a handler errors.
Expand Down

0 comments on commit 11f9b24

Please sign in to comment.