Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/dex: BookUpdate notification races #2203

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/core/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (c *Core) UpdateCert(host string, cert []byte) error {
}

// Stop reconnect retry for previous dex connection first but leave it in
// the map so it remains listed incase we need it in the interim.
// the map so it remains listed in case we need it in the interim.
if found {
dc.connMaster.Disconnect()
dc.acct.lock()
Expand Down
151 changes: 112 additions & 39 deletions client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package core

import (
"encoding/json"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -83,14 +84,14 @@ func (c *candleCache) init(in []*msgjson.Candle) {

// addCandle adds the candle using candles.Cache.Add. It returns most recent candle
// in cache.
func (c *candleCache) addCandle(msgCandle *msgjson.Candle) (recent msgjson.Candle, ok bool) {
func (c *candleCache) addCandle(msgCandle *msgjson.Candle) *msgjson.Candle {
if atomic.LoadUint32(&c.on) == 0 {
return msgjson.Candle{}, false
return nil
}
c.candleMtx.Lock()
defer c.candleMtx.Unlock()
c.Add(msgCandle)
return *c.Last(), true
return c.Last()
chappjc marked this conversation as resolved.
Show resolved Hide resolved
}

// bookie is a BookFeed manager. bookie will maintain any number of order book
Expand Down Expand Up @@ -185,36 +186,47 @@ func (b *bookie) logEpochReport(note *msgjson.EpochReportNote) error {
marketID := marketName(b.base, b.quote)
matchSummaries := b.AddRecentMatches(note.MatchSummary, note.EndStamp)
if len(note.MatchSummary) > 0 {
encPayload, err := json.Marshal(matchSummaries)
if err != nil {
return fmt.Errorf("logEpochReport: Failed to marshal payload: %+v, err: %v", matchSummaries, err)
}
b.send(&BookUpdate{
Action: EpochMatchSummary,
MarketID: marketID,
Payload: matchSummaries,
Payload: encPayload,
})
}
for durStr, cache := range b.candleCaches {
c, ok := cache.addCandle(&note.Candle)
if !ok {
c := cache.addCandle(&note.Candle)
if c == nil {
continue
}

dur, _ := time.ParseDuration(durStr)
payload := CandleUpdate{
Dur: durStr,
DurMilliSecs: uint64(dur.Milliseconds()),
Candle: c,
}
encPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("logEpochReport: Failed to marshal payload: %+v, err: %v", payload, err)
}
b.send(&BookUpdate{
Action: CandleUpdateAction,
Host: b.dc.acct.host,
MarketID: marketID,
Payload: CandleUpdate{
Dur: durStr,
DurMilliSecs: uint64(dur.Milliseconds()),
// Providing a copy of msgjson.Candle data here since it will be used concurrently.
Candle: &c,
},
Payload: encPayload,
})
}

return nil
}

// newFeed gets a new *bookFeed and cancels the close timer. feed must be called
// with the bookie.mtx locked. The feed is primed with the provided *BookUpdate.
// newFeed gets a new *bookFeed and cancels the close timer. The feed is primed
// with the provided *BookUpdate.
// newFeed must be called with dexConnection.booksMtx locked, see its description
// for details on why.
func (b *bookie) newFeed(u *BookUpdate) *bookFeed {
b.timerMtx.Lock()
if b.closeTimer != nil {
Expand Down Expand Up @@ -264,26 +276,37 @@ func (b *bookie) candles(durStr string, feedID uint32) error {
if err != nil {
return
}

b.feedsMtx.RLock()
defer b.feedsMtx.RUnlock()

f, ok := b.feeds[feedID]
if !ok {
// Feed must have been closed in another thread.
return
}

dur, _ := time.ParseDuration(durStr)
cache.candleMtx.RLock()
cdls := cache.CandlesCopy()
cache.candleMtx.RUnlock()

payload := CandlesPayload{
Dur: durStr,
DurMilliSecs: uint64(dur.Milliseconds()),
Candles: cdls,
}
encPayload, encErr := json.Marshal(payload)
if encErr != nil {
err = fmt.Errorf("candles: Failed to marshal payload: %+v, err: %v", payload, encErr)
return
}

f.c <- &BookUpdate{
Action: FreshCandlesAction,
Host: b.dc.acct.host,
MarketID: marketName(b.base, b.quote),
Payload: &CandlesPayload{
Dur: durStr,
DurMilliSecs: uint64(dur.Milliseconds()),
Candles: cdls,
},
Payload: encPayload,
}
}()
if atomic.LoadUint32(&cache.on) == 1 {
Expand Down Expand Up @@ -395,6 +418,16 @@ func (dc *dexConnection) syncBook(base, quote uint32) (*orderbook.OrderBook, Boo
cfg := dc.cfg
dc.cfgMtx.RUnlock()

// bookie is reflecting client's point of view on the state of server order
// book. First bookie syncs order book, then it keeps order book state
// consistent with server's by applying sequential updates on top of the
// data it got during sync. bookie can't apply these order book updates
// while sync is in progress, and applying updates isn't enough - bookie
// must relay those updates through his feed to the interested consumers,
// we lock dc.booksMtx here to prevent those updates from being applied
// until bookie is ready, see more on that below.
// And we need to serialize map entry usage by concurrent actors here so
// that we won't be trying to create 2 bookies when we really need 2 feeds.
dc.booksMtx.Lock()
defer dc.booksMtx.Unlock()

Expand All @@ -419,19 +452,31 @@ func (dc *dexConnection) syncBook(base, quote uint32) (*orderbook.OrderBook, Boo
dc.books[mktID] = booky
}

// Get the feed and the book under a single lock to make sure the first
// message is the book.
payload := MarketOrderBook{
Base: base,
Quote: quote,
Book: booky.book(),
}
encPayload, err := json.Marshal(payload)
if err != nil {
return nil, nil, fmt.Errorf("syncBook: Failed to marshal payload: %+v, err: %v", payload, err)
}
feed := booky.newFeed(&BookUpdate{
Action: FreshBookAction,
Host: dc.acct.host,
MarketID: mktID,
Payload: &MarketOrderBook{
Base: base,
Quote: quote,
Book: booky.book(),
},
Payload: encPayload,
})

// bookie is not considered initialized until it has at least 1 feed, if we
// release dc.booksMtx before that we will receive and apply those order book
// updates mentioned above on top of the order book state we got during initial
// bookie sync, but we won't be able to relay them as feed to the consumer who
// initiated this function in the first place. So, now the feed is initialized,
// it's safe to release this mutex (done with defer).
// Same reasoning applies when we are extending existing bookie with 2nd, 3rd ...
// feed.

return booky.OrderBook, feed, nil
}

Expand Down Expand Up @@ -615,11 +660,17 @@ func handleBookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error
if err != nil {
return err
}

payload := book.minifyOrder(note.OrderID, &note.TradeNote, 0)
encPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("handleBookOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err)
}
book.send(&BookUpdate{
Action: BookOrderAction,
Host: dc.acct.host,
MarketID: note.MarketID,
Payload: book.minifyOrder(note.OrderID, &note.TradeNote, 0),
Payload: encPayload,
})
return nil
}
Expand Down Expand Up @@ -743,16 +794,21 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
}
dc.tradeMtx.RUnlock()

payload := MarketOrderBook{
Base: mkt.Base,
Quote: mkt.Quote,
Book: book.book(), // empty
}
encPayload, encErr := json.Marshal(payload)
if encErr != nil {
return fmt.Errorf("handleTradeSuspensionMsg: Failed to marshal payload: %+v, err: %v", payload, err)
}
// Clear the book.
book.send(&BookUpdate{
Action: FreshBookAction,
Host: dc.acct.host,
MarketID: sp.MarketID,
Payload: &MarketOrderBook{
Base: mkt.Base,
Quote: mkt.Quote,
Book: book.book(), // empty
},
Payload: encPayload,
})

if len(updatedAssets) > 0 {
Expand Down Expand Up @@ -1007,11 +1063,17 @@ func handleUnbookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
if err != nil {
return err
}

payload := MiniOrder{Token: token(note.OrderID)}
encPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("handleUnbookOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err)
}
book.send(&BookUpdate{
Action: UnbookOrderAction,
Host: dc.acct.host,
MarketID: note.MarketID,
Payload: &MiniOrder{Token: token(note.OrderID)},
Payload: encPayload,
})

return nil
Expand All @@ -1035,15 +1097,21 @@ func handleUpdateRemainingMsg(_ *Core, dc *dexConnection, msg *msgjson.Message)
if err != nil {
return err
}

payload := RemainderUpdate{
Token: token(note.OrderID),
Qty: float64(note.Remaining) / float64(book.baseUnits.Conventional.ConversionFactor),
QtyAtomic: note.Remaining,
}
encPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("handleUpdateRemainingMsg: Failed to marshal payload: %+v, err: %v", payload, err)
}
book.send(&BookUpdate{
Action: UpdateRemainingAction,
Host: dc.acct.host,
MarketID: note.MarketID,
Payload: &RemainderUpdate{
Token: token(note.OrderID),
Qty: float64(note.Remaining) / float64(book.baseUnits.Conventional.ConversionFactor),
QtyAtomic: note.Remaining,
},
Payload: encPayload,
})
return nil
}
Expand Down Expand Up @@ -1087,12 +1155,17 @@ func handleEpochOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error
return fmt.Errorf("failed to Enqueue epoch order: %w", err)
}

payload := book.minifyOrder(note.OrderID, &note.TradeNote, note.Epoch)
encPayload, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("handleEpochOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err)
}
// Send a MiniOrder for book updates.
book.send(&BookUpdate{
Action: EpochOrderAction,
Host: dc.acct.host,
MarketID: note.MarketID,
Payload: book.minifyOrder(note.OrderID, &note.TradeNote, note.Epoch),
Payload: encPayload,
})

return nil
Expand Down
32 changes: 25 additions & 7 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/binary"
"encoding/csv"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -134,6 +135,12 @@ type dexConnection struct {
cfgMtx sync.RWMutex
cfg *msgjson.ConfigResult

// booksMtx is used for both, protecting access to books map so that
// concurrent modifications are serialized and to synchronize certain
// actions performed on a particular bookie in that map (such as
// preventing applying any order book updates until order book snapshot
// is taken by another go-routine, so that it can get a feed on those
// changes to keep the original snapshot in sync).
booksMtx sync.RWMutex
books map[string]*bookie

Expand Down Expand Up @@ -7894,8 +7901,14 @@ func (c *Core) handleReconnect(host string) {
}

resubMkt := func(mkt *market) {
// Locate any bookie for this market.
booky := dc.bookie(mkt.name)
// Locking on dc.booksMtx is currently the only way to ensure the "book"
// notification gets sent first, before we start applying any update
// notifications coming from server (that can potentially happen once
// booky.Reset func returns).
dc.booksMtx.Lock()
defer dc.booksMtx.Unlock()

booky := dc.books[mkt.name]
if booky == nil {
// Was not previously subscribed with the server for this market.
return
Expand All @@ -7915,16 +7928,21 @@ func (c *Core) handleReconnect(host string) {
c.log.Errorf("handleReconnect: Failed to Sync market %q order book snapshot: %v", mkt.name, err)
}

payload := MarketOrderBook{
Base: mkt.base,
Quote: mkt.quote,
Book: booky.book(),
}
encPayload, err := json.Marshal(payload)
if err != nil {
c.log.Errorf("handleReconnect: Failed to marshal payload: %+v, err: %v", payload, err)
}
// Send a FreshBookAction to the subscribers.
booky.send(&BookUpdate{
Action: FreshBookAction,
Host: dc.acct.host,
MarketID: mkt.name,
Payload: &MarketOrderBook{
Base: mkt.base,
Quote: mkt.quote,
Book: booky.book(),
},
Payload: encPayload,
})
}

Expand Down
9 changes: 5 additions & 4 deletions client/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package core
import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -701,10 +702,10 @@ const (

// BookUpdate is an order book update.
type BookUpdate struct {
Action string `json:"action"`
Host string `json:"host"`
MarketID string `json:"marketID"`
Payload interface{} `json:"payload"`
Action string `json:"action"`
Host string `json:"host"`
MarketID string `json:"marketID"`
Payload json.RawMessage `json:"payload"`
}

type CandlesPayload struct {
Expand Down
Loading