diff --git a/client/core/account.go b/client/core/account.go index 60794c9605..17fe52e13f 100644 --- a/client/core/account.go +++ b/client/core/account.go @@ -233,7 +233,7 @@ func (c *Core) UpdateCert(host string, cert []byte) error { } // Stop reconnect retry for previous dex connection first but leave it in - // the map so it remains listed incase we need it in the interim. + // the map so it remains listed in case we need it in the interim. if found { dc.connMaster.Disconnect() dc.acct.lock() diff --git a/client/core/bookie.go b/client/core/bookie.go index 7f9dea9750..74d81832ef 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -4,6 +4,7 @@ package core import ( + "encoding/json" "errors" "fmt" "sync" @@ -83,14 +84,14 @@ func (c *candleCache) init(in []*msgjson.Candle) { // addCandle adds the candle using candles.Cache.Add. It returns most recent candle // in cache. -func (c *candleCache) addCandle(msgCandle *msgjson.Candle) (recent msgjson.Candle, ok bool) { +func (c *candleCache) addCandle(msgCandle *msgjson.Candle) *msgjson.Candle { if atomic.LoadUint32(&c.on) == 0 { - return msgjson.Candle{}, false + return nil } c.candleMtx.Lock() defer c.candleMtx.Unlock() c.Add(msgCandle) - return *c.Last(), true + return c.Last() } // bookie is a BookFeed manager. bookie will maintain any number of order book @@ -185,36 +186,47 @@ func (b *bookie) logEpochReport(note *msgjson.EpochReportNote) error { marketID := marketName(b.base, b.quote) matchSummaries := b.AddRecentMatches(note.MatchSummary, note.EndStamp) if len(note.MatchSummary) > 0 { + encPayload, err := json.Marshal(matchSummaries) + if err != nil { + return fmt.Errorf("logEpochReport: Failed to marshal payload: %+v, err: %v", matchSummaries, err) + } b.send(&BookUpdate{ Action: EpochMatchSummary, MarketID: marketID, - Payload: matchSummaries, + Payload: encPayload, }) } for durStr, cache := range b.candleCaches { - c, ok := cache.addCandle(¬e.Candle) - if !ok { + c := cache.addCandle(¬e.Candle) + if c == nil { continue } + dur, _ := time.ParseDuration(durStr) + payload := CandleUpdate{ + Dur: durStr, + DurMilliSecs: uint64(dur.Milliseconds()), + Candle: c, + } + encPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("logEpochReport: Failed to marshal payload: %+v, err: %v", payload, err) + } b.send(&BookUpdate{ Action: CandleUpdateAction, Host: b.dc.acct.host, MarketID: marketID, - Payload: CandleUpdate{ - Dur: durStr, - DurMilliSecs: uint64(dur.Milliseconds()), - // Providing a copy of msgjson.Candle data here since it will be used concurrently. - Candle: &c, - }, + Payload: encPayload, }) } return nil } -// newFeed gets a new *bookFeed and cancels the close timer. feed must be called -// with the bookie.mtx locked. The feed is primed with the provided *BookUpdate. +// newFeed gets a new *bookFeed and cancels the close timer. The feed is primed +// with the provided *BookUpdate. +// newFeed must be called with dexConnection.booksMtx locked, see its description +// for details on why. func (b *bookie) newFeed(u *BookUpdate) *bookFeed { b.timerMtx.Lock() if b.closeTimer != nil { @@ -264,26 +276,37 @@ func (b *bookie) candles(durStr string, feedID uint32) error { if err != nil { return } + b.feedsMtx.RLock() defer b.feedsMtx.RUnlock() + f, ok := b.feeds[feedID] if !ok { // Feed must have been closed in another thread. return } + dur, _ := time.ParseDuration(durStr) cache.candleMtx.RLock() cdls := cache.CandlesCopy() cache.candleMtx.RUnlock() + + payload := CandlesPayload{ + Dur: durStr, + DurMilliSecs: uint64(dur.Milliseconds()), + Candles: cdls, + } + encPayload, encErr := json.Marshal(payload) + if encErr != nil { + err = fmt.Errorf("candles: Failed to marshal payload: %+v, err: %v", payload, encErr) + return + } + f.c <- &BookUpdate{ Action: FreshCandlesAction, Host: b.dc.acct.host, MarketID: marketName(b.base, b.quote), - Payload: &CandlesPayload{ - Dur: durStr, - DurMilliSecs: uint64(dur.Milliseconds()), - Candles: cdls, - }, + Payload: encPayload, } }() if atomic.LoadUint32(&cache.on) == 1 { @@ -395,6 +418,16 @@ func (dc *dexConnection) syncBook(base, quote uint32) (*orderbook.OrderBook, Boo cfg := dc.cfg dc.cfgMtx.RUnlock() + // bookie is reflecting client's point of view on the state of server order + // book. First bookie syncs order book, then it keeps order book state + // consistent with server's by applying sequential updates on top of the + // data it got during sync. bookie can't apply these order book updates + // while sync is in progress, and applying updates isn't enough - bookie + // must relay those updates through his feed to the interested consumers, + // we lock dc.booksMtx here to prevent those updates from being applied + // until bookie is ready, see more on that below. + // And we need to serialize map entry usage by concurrent actors here so + // that we won't be trying to create 2 bookies when we really need 2 feeds. dc.booksMtx.Lock() defer dc.booksMtx.Unlock() @@ -419,19 +452,31 @@ func (dc *dexConnection) syncBook(base, quote uint32) (*orderbook.OrderBook, Boo dc.books[mktID] = booky } - // Get the feed and the book under a single lock to make sure the first - // message is the book. + payload := MarketOrderBook{ + Base: base, + Quote: quote, + Book: booky.book(), + } + encPayload, err := json.Marshal(payload) + if err != nil { + return nil, nil, fmt.Errorf("syncBook: Failed to marshal payload: %+v, err: %v", payload, err) + } feed := booky.newFeed(&BookUpdate{ Action: FreshBookAction, Host: dc.acct.host, MarketID: mktID, - Payload: &MarketOrderBook{ - Base: base, - Quote: quote, - Book: booky.book(), - }, + Payload: encPayload, }) + // bookie is not considered initialized until it has at least 1 feed, if we + // release dc.booksMtx before that we will receive and apply those order book + // updates mentioned above on top of the order book state we got during initial + // bookie sync, but we won't be able to relay them as feed to the consumer who + // initiated this function in the first place. So, now the feed is initialized, + // it's safe to release this mutex (done with defer). + // Same reasoning applies when we are extending existing bookie with 2nd, 3rd ... + // feed. + return booky.OrderBook, feed, nil } @@ -615,11 +660,17 @@ func handleBookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error if err != nil { return err } + + payload := book.minifyOrder(note.OrderID, ¬e.TradeNote, 0) + encPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("handleBookOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err) + } book.send(&BookUpdate{ Action: BookOrderAction, Host: dc.acct.host, MarketID: note.MarketID, - Payload: book.minifyOrder(note.OrderID, ¬e.TradeNote, 0), + Payload: encPayload, }) return nil } @@ -743,16 +794,21 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } dc.tradeMtx.RUnlock() + payload := MarketOrderBook{ + Base: mkt.Base, + Quote: mkt.Quote, + Book: book.book(), // empty + } + encPayload, encErr := json.Marshal(payload) + if encErr != nil { + return fmt.Errorf("handleTradeSuspensionMsg: Failed to marshal payload: %+v, err: %v", payload, err) + } // Clear the book. book.send(&BookUpdate{ Action: FreshBookAction, Host: dc.acct.host, MarketID: sp.MarketID, - Payload: &MarketOrderBook{ - Base: mkt.Base, - Quote: mkt.Quote, - Book: book.book(), // empty - }, + Payload: encPayload, }) if len(updatedAssets) > 0 { @@ -1007,11 +1063,17 @@ func handleUnbookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro if err != nil { return err } + + payload := MiniOrder{Token: token(note.OrderID)} + encPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("handleUnbookOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err) + } book.send(&BookUpdate{ Action: UnbookOrderAction, Host: dc.acct.host, MarketID: note.MarketID, - Payload: &MiniOrder{Token: token(note.OrderID)}, + Payload: encPayload, }) return nil @@ -1035,15 +1097,21 @@ func handleUpdateRemainingMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) if err != nil { return err } + + payload := RemainderUpdate{ + Token: token(note.OrderID), + Qty: float64(note.Remaining) / float64(book.baseUnits.Conventional.ConversionFactor), + QtyAtomic: note.Remaining, + } + encPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("handleUpdateRemainingMsg: Failed to marshal payload: %+v, err: %v", payload, err) + } book.send(&BookUpdate{ Action: UpdateRemainingAction, Host: dc.acct.host, MarketID: note.MarketID, - Payload: &RemainderUpdate{ - Token: token(note.OrderID), - Qty: float64(note.Remaining) / float64(book.baseUnits.Conventional.ConversionFactor), - QtyAtomic: note.Remaining, - }, + Payload: encPayload, }) return nil } @@ -1087,12 +1155,17 @@ func handleEpochOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error return fmt.Errorf("failed to Enqueue epoch order: %w", err) } + payload := book.minifyOrder(note.OrderID, ¬e.TradeNote, note.Epoch) + encPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("handleEpochOrderMsg: Failed to marshal payload: %+v, err: %v", payload, err) + } // Send a MiniOrder for book updates. book.send(&BookUpdate{ Action: EpochOrderAction, Host: dc.acct.host, MarketID: note.MarketID, - Payload: book.minifyOrder(note.OrderID, ¬e.TradeNote, note.Epoch), + Payload: encPayload, }) return nil diff --git a/client/core/core.go b/client/core/core.go index 84b3222e46..10ca266daf 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -10,6 +10,7 @@ import ( "encoding/binary" "encoding/csv" "encoding/hex" + "encoding/json" "errors" "fmt" "math" @@ -134,6 +135,12 @@ type dexConnection struct { cfgMtx sync.RWMutex cfg *msgjson.ConfigResult + // booksMtx is used for both, protecting access to books map so that + // concurrent modifications are serialized and to synchronize certain + // actions performed on a particular bookie in that map (such as + // preventing applying any order book updates until order book snapshot + // is taken by another go-routine, so that it can get a feed on those + // changes to keep the original snapshot in sync). booksMtx sync.RWMutex books map[string]*bookie @@ -7894,8 +7901,14 @@ func (c *Core) handleReconnect(host string) { } resubMkt := func(mkt *market) { - // Locate any bookie for this market. - booky := dc.bookie(mkt.name) + // Locking on dc.booksMtx is currently the only way to ensure the "book" + // notification gets sent first, before we start applying any update + // notifications coming from server (that can potentially happen once + // booky.Reset func returns). + dc.booksMtx.Lock() + defer dc.booksMtx.Unlock() + + booky := dc.books[mkt.name] if booky == nil { // Was not previously subscribed with the server for this market. return @@ -7915,16 +7928,21 @@ func (c *Core) handleReconnect(host string) { c.log.Errorf("handleReconnect: Failed to Sync market %q order book snapshot: %v", mkt.name, err) } + payload := MarketOrderBook{ + Base: mkt.base, + Quote: mkt.quote, + Book: booky.book(), + } + encPayload, err := json.Marshal(payload) + if err != nil { + c.log.Errorf("handleReconnect: Failed to marshal payload: %+v, err: %v", payload, err) + } // Send a FreshBookAction to the subscribers. booky.send(&BookUpdate{ Action: FreshBookAction, Host: dc.acct.host, MarketID: mkt.name, - Payload: &MarketOrderBook{ - Base: mkt.base, - Quote: mkt.quote, - Book: booky.book(), - }, + Payload: encPayload, }) } diff --git a/client/core/types.go b/client/core/types.go index 990799b7c5..9d8bb00839 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -6,6 +6,7 @@ package core import ( "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "math" "strings" @@ -701,10 +702,10 @@ const ( // BookUpdate is an order book update. type BookUpdate struct { - Action string `json:"action"` - Host string `json:"host"` - MarketID string `json:"marketID"` - Payload interface{} `json:"payload"` + Action string `json:"action"` + Host string `json:"host"` + MarketID string `json:"marketID"` + Payload json.RawMessage `json:"payload"` } type CandlesPayload struct { diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index c111244844..5b8e0bb7be 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -224,7 +224,7 @@ func (ob *OrderBook) processCachedNotes() error { } // Sync updates a client tracked order book with an order book snapshot. It is -// an error if the the OrderBook is already synced. +// an error if the OrderBook is already synced. func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { if ob.isSynced() { return fmt.Errorf("order book is already synced") diff --git a/client/webserver/live_test.go b/client/webserver/live_test.go index d0ec6e0ec1..2e561f38d7 100644 --- a/client/webserver/live_test.go +++ b/client/webserver/live_test.go @@ -10,6 +10,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "math" "math/rand" @@ -981,11 +982,16 @@ func (c *TCore) SyncBook(dexAddr string, base, quote uint32) (core.BookFeed, err side = c.sells } side[ord.Token] = ord + payload := ord + encPayload, err := json.Marshal(payload) + if err != nil { + panic(fmt.Errorf("SyncBook: Failed to marshal payload: %+v, err: %v", payload, err)) + } epochOrder := &core.BookUpdate{ Action: msgjson.EpochOrderRoute, Host: c.dexAddr, MarketID: mktID, - Payload: ord, + Payload: encPayload, } c.trySend(epochOrder) c.epochOrders = append(c.epochOrders, epochOrder) @@ -1014,11 +1020,16 @@ func (c *TCore) SyncBook(dexAddr string, base, quote uint32) (core.BookFeed, err delete(side, tkn) c.orderMtx.Unlock() + payload := core.MiniOrder{Token: tkn} + encPayload, err := json.Marshal(payload) + if err != nil { + panic(fmt.Errorf("SyncBook: Failed to marshal payload: %+v, err: %v", payload, err)) + } c.trySend(&core.BookUpdate{ Action: msgjson.UnbookOrderRoute, Host: c.dexAddr, MarketID: mktID, - Payload: &core.MiniOrder{Token: tkn}, + Payload: encPayload, }) } @@ -1030,15 +1041,20 @@ func (c *TCore) SyncBook(dexAddr string, base, quote uint32) (core.BookFeed, err if dur == 0 { continue } + payload := core.CandleUpdate{ + Dur: durStr, + DurMilliSecs: uint64(dur.Milliseconds()), + Candle: candle(mkt, dur, time.Now()), + } + encPayload, err := json.Marshal(payload) + if err != nil { + panic(fmt.Errorf("SyncBook: Failed to marshal payload: %+v, err: %v", payload, err)) + } c.trySend(&core.BookUpdate{ Action: core.CandleUpdateAction, Host: dexAddr, MarketID: mktID, - Payload: &core.CandleUpdate{ - Dur: durStr, - DurMilliSecs: uint64(dur.Milliseconds()), - Candle: candle(mkt, dur, time.Now()), - }, + Payload: encPayload, }) case <-ctx.Done(): @@ -1048,15 +1064,20 @@ func (c *TCore) SyncBook(dexAddr string, base, quote uint32) (core.BookFeed, err } }() + payload := core.MarketOrderBook{ + Base: base, + Quote: quote, + Book: c.book(dexAddr, mktID), + } + encPayload, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("SyncBook: Failed to marshal payload: %+v, err: %v", payload, err) + } c.bookFeed.c <- &core.BookUpdate{ Action: core.FreshBookAction, Host: dexAddr, MarketID: mktID, - Payload: &core.MarketOrderBook{ - Base: base, - Quote: quote, - Book: c.book(dexAddr, mktID), - }, + Payload: encPayload, } return c.bookFeed, nil @@ -1128,15 +1149,20 @@ func (c *TCore) sendCandles(durStr string) { iStartTime = iStartTime.Add(dur) } + payload := core.CandlesPayload{ + Dur: durStr, + DurMilliSecs: uint64(dur.Milliseconds()), + Candles: candles, + } + encPayload, err := json.Marshal(payload) + if err != nil { + panic(fmt.Errorf("SyncBook: Failed to marshal payload: %+v, err: %v", payload, err)) + } c.bookFeed.c <- &core.BookUpdate{ Action: core.FreshCandlesAction, Host: dexAddr, MarketID: mktID, - Payload: &core.CandlesPayload{ - Dur: durStr, - DurMilliSecs: uint64(dur.Milliseconds()), - Candles: candles, - }, + Payload: encPayload, } } @@ -1805,15 +1831,19 @@ out: c.orderMtx.Lock() // Send limit orders as newly booked. for _, o := range c.epochOrders { - miniOrder := o.Payload.(*core.MiniOrder) + var miniOrder core.MiniOrder + err := json.Unmarshal(o.Payload, &miniOrder) + if err != nil { + panic(fmt.Errorf("runEpochs: Failed to marshal payload: %+v, err: %v", o.Payload, err)) + } if miniOrder.Rate > 0 { miniOrder.Epoch = 0 o.Action = msgjson.BookOrderRoute c.trySend(o) if miniOrder.Sell { - c.sells[miniOrder.Token] = miniOrder + c.sells[miniOrder.Token] = &miniOrder } else { - c.buys[miniOrder.Token] = miniOrder + c.buys[miniOrder.Token] = &miniOrder } } } diff --git a/dex/msgjson/types.go b/dex/msgjson/types.go index df21664872..d0ae90a337 100644 --- a/dex/msgjson/types.go +++ b/dex/msgjson/types.go @@ -365,7 +365,7 @@ func NewRequest(id uint64, route string, payload interface{}) (*Message, error) } return &Message{ Type: Request, - Payload: json.RawMessage(encoded), + Payload: encoded, Route: route, ID: id, }, nil @@ -390,7 +390,7 @@ func NewResponse(id uint64, result interface{}, rpcErr *Error) (*Message, error) } return &Message{ Type: Response, - Payload: json.RawMessage(encResp), + Payload: encResp, ID: id, }, nil } @@ -425,7 +425,7 @@ func NewNotification(route string, payload interface{}) (*Message, error) { return &Message{ Type: Notification, Route: route, - Payload: json.RawMessage(encPayload), + Payload: encPayload, }, nil } diff --git a/server/market/bookrouter.go b/server/market/bookrouter.go index c82b54b447..b296b24626 100644 --- a/server/market/bookrouter.go +++ b/server/market/bookrouter.go @@ -540,7 +540,7 @@ func (r *BookRouter) Book(mktName string) (*msgjson.OrderBook, error) { return msgOB, nil } -// sendBook encodes and sends the the entire order book to the specified client. +// sendBook encodes and sends the entire order book to the specified client. func (r *BookRouter) sendBook(conn comms.Link, book *msgBook, msgID uint64) { msgOB := r.msgOrderBook(book) if msgOB == nil { diff --git a/server/market/market.go b/server/market/market.go index ed8cfb3151..889c32db43 100644 --- a/server/market/market.go +++ b/server/market/market.go @@ -698,7 +698,7 @@ func (m *Market) OrderFeed() <-chan *updateSignal { // FeedDone informs the market that the caller is finished receiving from the // given channel, which should have been obtained from OrderFeed. If the channel // was a registered order feed channel from OrderFeed, it is closed and removed -// so that no further signals will be send on the channel. +// so that no further signals will be sent on the channel. func (m *Market) FeedDone(feed <-chan *updateSignal) bool { m.orderFeedMtx.Lock() defer m.orderFeedMtx.Unlock()