Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/msgBook: atomic seq updates with respect to order book contents #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 50 additions & 49 deletions server/market/bookrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/msgjson"
Expand Down Expand Up @@ -150,7 +151,6 @@ type BookSource interface {
type subscribers struct {
mtx sync.RWMutex
conns map[uint64]comms.Link
seq uint64
}

// add adds a new subscriber.
Expand All @@ -171,37 +171,33 @@ func (s *subscribers) remove(id uint64) bool {
return true
}

// nextSeq gets the next sequence number by incrementing the counter. This
// should be used when the book and orders are modified. Currently this applies
// to the routes: book_order, unbook_order, update_remaining, and epoch_order,
// plus suspend if the book is also being purged (persist=false).
func (s *subscribers) nextSeq() uint64 {
s.mtx.Lock()
defer s.mtx.Unlock()
s.seq++
return s.seq
}

// lastSeq gets the last retrieved sequence number.
func (s *subscribers) lastSeq() uint64 {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.seq
}

// msgBook is a local copy of the order book information. The orders are saved
// as msgjson.BookOrderNote structures.
type msgBook struct {
name string
// mtx guards orders and epochIdx
mtx sync.RWMutex
running bool
running atomic.Bool // whether msgBook is initialized
name string
subs *subscribers
source BookSource
baseID uint32
quoteID uint32

// mtx ensures orders, epochIdx and seq are changed atomically with respect
// to each other.
mtx sync.RWMutex
// seq is tracking current(latest) order book version. See nextSeq for info
// on what version change actually means.
seq uint64
orders map[order.OrderID]*msgjson.BookOrderNote
epochIdx int64
subs *subscribers
source BookSource
baseID uint32
quoteID uint32
}

// nextSeq gets the next sequence number by incrementing the latest book version
// counter. This should be used when the book and orders are modified. Currently,
// this applies to these routes: book_order, unbook_order, update_remaining, and
// epoch_order, plus suspend if the book is also being purged (persist=false).
func (book *msgBook) nextSeq() uint64 {
book.seq++
return book.seq
}

func (book *msgBook) setEpoch(idx int64) {
Expand All @@ -219,7 +215,7 @@ func (book *msgBook) epoch() int64 {
// insert adds the information for a new order into the order book. If the order
// is already found, it is inserted, but an error is logged since update should
// be used in that case.
func (book *msgBook) insert(lo *order.LimitOrder) *msgjson.BookOrderNote {
func (book *msgBook) insert(lo *order.LimitOrder) (note *msgjson.BookOrderNote) {
msgOrder := limitOrderToMsgOrder(lo, book.name)
book.mtx.Lock()
defer book.mtx.Unlock()
Expand All @@ -229,6 +225,7 @@ func (book *msgBook) insert(lo *order.LimitOrder) *msgjson.BookOrderNote {
//panic("bad insert")
}
book.orders[lo.ID()] = msgOrder
msgOrder.Seq = book.nextSeq()
return msgOrder
}

Expand All @@ -245,14 +242,16 @@ func (book *msgBook) update(lo *order.LimitOrder) *msgjson.BookOrderNote {
//panic("bad update")
}
book.orders[lo.ID()] = msgOrder
msgOrder.Seq = book.nextSeq()
return msgOrder
}

// Remove the order from the order book.
func (book *msgBook) remove(lo *order.LimitOrder) {
func (book *msgBook) remove(lo *order.LimitOrder) (nextSeq uint64) {
book.mtx.Lock()
defer book.mtx.Unlock()
delete(book.orders, lo.ID())
return book.nextSeq()
}

// addBulkOrders adds the lists of orders to the order book, and records the
Expand All @@ -261,6 +260,7 @@ func (book *msgBook) addBulkOrders(epoch int64, orderSets ...[]*order.LimitOrder
book.mtx.Lock()
defer book.mtx.Unlock()
book.epochIdx = epoch
// book.seq starts with 0 here.
for _, set := range orderSets {
for _, lo := range set {
book.orders[lo.ID()] = limitOrderToMsgOrder(lo, book.name)
Expand Down Expand Up @@ -333,20 +333,17 @@ func (r *BookRouter) runBook(ctx context.Context, book *msgBook) {
// Get the initial book.
feed := book.source.OrderFeed()
book.addBulkOrders(book.source.Book())
book.running.Store(true) // can serve order book to clients now
subs := book.subs

defer func() {
book.running.Store(false) // can stop serving order book to clients now
book.mtx.Lock()
book.running = false
book.orders = make(map[order.OrderID]*msgjson.BookOrderNote)
book.mtx.Unlock()
log.Infof("Book router terminating for market %q", book.name)
}()

book.mtx.Lock()
book.running = true
book.mtx.Unlock()

out:
for {
select {
Expand Down Expand Up @@ -376,20 +373,18 @@ out:
if !ok {
panic("non-limit order received with bookAction")
}
n := book.insert(lo)
n.Seq = subs.nextSeq()
note = n
note = book.insert(lo)

case sigDataUnbookedOrder:
route = msgjson.UnbookOrderRoute
lo, ok := sigData.order.(*order.LimitOrder)
if !ok {
panic("non-limit order received with unbookAction")
}
book.remove(lo)
nextSeq := book.remove(lo)
oid := sigData.order.ID()
note = &msgjson.UnbookOrderNote{
Seq: subs.nextSeq(),
Seq: nextSeq,
MarketID: book.name,
OrderID: oid[:],
}
Expand All @@ -405,7 +400,6 @@ out:
OrderNote: bookNote.OrderNote,
Remaining: lo.Remaining(),
}
n.Seq = subs.nextSeq()
note = n

case sigDataEpochReport:
Expand Down Expand Up @@ -449,7 +443,9 @@ out:
epochNote.TargetID = o.TargetOrderID[:]
}

epochNote.Seq = subs.nextSeq()
book.mtx.Lock() // book snapshot can be taken concurrently
epochNote.Seq = book.nextSeq()
book.mtx.Unlock()
epochNote.MarketID = book.name
epochNote.Epoch = uint64(sigData.epochIdx)
c := sigData.order.Commitment()
Expand Down Expand Up @@ -490,9 +486,9 @@ out:
}
// Only set Seq if there is a book update.
if !sigData.persistBook {
susp.Seq = subs.nextSeq() // book purge
book.mtx.Lock()
book.orders = make(map[order.OrderID]*msgjson.BookOrderNote)
book.mtx.Lock() // book snapshot can be taken concurrently
book.seq = book.nextSeq()
book.orders = make(map[order.OrderID]*msgjson.BookOrderNote) // book purge
book.mtx.Unlock()
// The router is "running" although the market is suspended.
}
Expand Down Expand Up @@ -559,21 +555,26 @@ func (r *BookRouter) sendBook(conn comms.Link, book *msgBook, msgID uint64) {
}
}

// msgOrderBook returns current (latest) order book snapshot.
func (r *BookRouter) msgOrderBook(book *msgBook) *msgjson.OrderBook {
book.mtx.RLock() // book.orders and book.running
if !book.running {
book.mtx.RUnlock()
// Don't want to block client requests for too long, so return fast if order book
// isn't initialized yet instead of proceeding to wait on mutex below for who
// knows how long.
if !book.running.Load() {
return nil
}

book.mtx.RLock()
defer book.mtx.RUnlock()

ords := make([]*msgjson.BookOrderNote, 0, len(book.orders))
for _, o := range book.orders {
ords = append(ords, o)
}
epochIdx := book.epochIdx // instead of book.epoch() while already locked
book.mtx.RUnlock()

return &msgjson.OrderBook{
Seq: book.subs.lastSeq(),
Seq: book.seq,
MarketID: book.name,
Epoch: uint64(epochIdx),
Orders: ords,
Expand Down