Skip to content

Commit

Permalink
auth,book,market: unbook mia user orders
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Oct 7, 2020
1 parent fe1898f commit 10f42f1
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 48 deletions.
77 changes: 50 additions & 27 deletions server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,25 @@ func (client *clientInfo) respHandler(id uint64) *respHandler {
// signing messages with the DEX's private key. AuthManager manages requests to
// the 'connect' route.
type AuthManager struct {
anarchy bool
freeCancels bool
banScore uint32
cancelThresh float64
storage Storage
signer Signer
regFee uint64
checkFee FeeChecker
feeConfs int64
anarchy bool
freeCancels bool
banScore uint32
cancelThresh float64
storage Storage
signer Signer
regFee uint64
checkFee FeeChecker
feeConfs int64
miaUserTimeout time.Duration
unbookFun func(account.AccountID)

// latencyQ is a queue for fee coin waiters to deal with latency.
latencyQ *wait.TickerQueue

connMtx sync.RWMutex
users map[account.AccountID]*clientInfo
conns map[uint64]*clientInfo
connMtx sync.RWMutex
users map[account.AccountID]*clientInfo
conns map[uint64]*clientInfo
unbookers map[account.AccountID]*time.Timer

violationMtx sync.Mutex
matchOutcomes map[account.AccountID]*latest
Expand Down Expand Up @@ -257,16 +260,6 @@ func (v Violation) String() string {
return violations[v].desc
}

type violationCounter map[Violation]uint32

func (vc violationCounter) score() int32 {
var score int32
for step, count := range vc {
score += step.Score() * int32(count)
}
return score
}

// NoActionStep is the action that the user failed to take. This is used to
// define valid inputs to the Inaction method.
type NoActionStep uint8
Expand Down Expand Up @@ -318,6 +311,11 @@ type Config struct {
FeeConfs int64
// FeeChecker is a method for getting the registration fee output info.
FeeChecker FeeChecker
// UserUnbooker is a function for unbooking all of a user's orders.
UserUnbooker func(account.AccountID)
// MiaUserTimeout is how after a user disconnects until UserUnbooker is
// called for that user.
MiaUserTimeout time.Duration

CancelThreshold float64
Anarchy bool
Expand All @@ -339,6 +337,9 @@ func NewAuthManager(cfg *Config) *AuthManager {
banScore: banScore,
users: make(map[account.AccountID]*clientInfo),
conns: make(map[uint64]*clientInfo),
unbookers: make(map[account.AccountID]*time.Timer),
miaUserTimeout: cfg.MiaUserTimeout,
unbookFun: cfg.UserUnbooker,
storage: cfg.Storage,
signer: cfg.Signer,
regFee: cfg.RegistrationFee,
Expand Down Expand Up @@ -416,6 +417,12 @@ func (auth *AuthManager) recordOrderDone(user account.AccountID, oid order.Order
func (auth *AuthManager) Run(ctx context.Context) {
go auth.latencyQ.Run(ctx)
<-ctx.Done()
auth.connMtx.Lock()
defer auth.connMtx.Unlock()
for user, ub := range auth.unbookers {
ub.Stop()
delete(auth.unbookers, user)
}
// TODO: wait for latencyQ and running comms route handlers (handleRegister, handleNotifyFee)!
}

Expand Down Expand Up @@ -784,24 +791,40 @@ func (auth *AuthManager) conn(conn comms.Link) *clientInfo {
return auth.conns[conn.ID()]
}

// addClient adds the client to the users and conns maps.
// addClient adds the client to the users and conns maps, and stops any unbook
// timers started when they last disconnected.
func (auth *AuthManager) addClient(client *clientInfo) {
auth.connMtx.Lock()
defer auth.connMtx.Unlock()
oldClient := auth.users[client.acct.ID]
user := client.acct.ID
if unbookTimer, found := auth.unbookers[user]; found {
unbookTimer.Stop()
delete(auth.unbookers, user)
}
oldClient := auth.users[user]
if oldClient != nil {
oldClient.conn.Disconnect()
}
auth.users[client.acct.ID] = client
auth.users[user] = client
auth.conns[client.conn.ID()] = client
}

// removeClient removes the client from the users and conns map.
// removeClient removes the client from the users and conns map, and sets a
// timer to unbook all of the user's orders if they do not return within a
// certain time.
func (auth *AuthManager) removeClient(client *clientInfo) {
auth.connMtx.Lock()
defer auth.connMtx.Unlock()
delete(auth.users, client.acct.ID)
user := client.acct.ID
delete(auth.users, user)
delete(auth.conns, client.conn.ID())
auth.unbookers[user] = time.AfterFunc(auth.miaUserTimeout, func() {
log.Infof("Unbooking all orders for MIA user %v", user)
auth.unbookFun(user)
auth.connMtx.Lock()
delete(auth.unbookers, user)
auth.connMtx.Unlock()
})
}

// loadUserScore computes the user's current score from order and swap data
Expand Down
2 changes: 2 additions & 0 deletions server/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func TestMain(m *testing.M) {
RegistrationFee: tRegFee,
FeeConfs: tCheckFeeConfs,
FeeChecker: tCheckFee,
UserUnbooker: func(account.AccountID) {},
MiaUserTimeout: 90 * time.Second, // TODO: test
CancelThreshold: 0.9,
})
go authMgr.Run(ctx)
Expand Down
6 changes: 6 additions & 0 deletions server/book/book.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (b *Book) Remove(oid order.OrderID) (*order.LimitOrder, bool) {
return nil, false
}

// RemoveUserOrders removes all ords from the book that belong to a user. The
// OrderIDs of the removed buy and sell orders are returned.
func (b *Book) RemoveUserOrders(user account.AccountID) (removedBuys, removedSells []*order.LimitOrder) {
return b.buys.RemoveUserOrders(user), b.sells.RemoveUserOrders(user)
}

// HaveOrder checks if an order is in either the buy or sell side of the book.
func (b *Book) HaveOrder(oid order.OrderID) bool {
b.mtx.RLock()
Expand Down
18 changes: 18 additions & 0 deletions server/book/orderpq.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,24 @@ func (pq *OrderPQ) RemoveOrderID(oid order.OrderID) (*order.LimitOrder, bool) {
return pq.removeOrder(pq.orders[oid])
}

// RemoveUserOrders removes all orders from the queue that belong to a user.
func (pq *OrderPQ) RemoveUserOrders(user account.AccountID) (removed []*order.LimitOrder) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
uos, found := pq.userOrders[user]
if !found {
return
}
delete(pq.userOrders, user) // save some work from removeOrder=>Pop

removed = make([]*order.LimitOrder, 0, len(uos))
for oid, lo := range uos {
pq.removeOrder(pq.orders[oid])
removed = append(removed, lo)
}
return
}

// HaveOrder indicates if an order is in the queue.
func (pq *OrderPQ) HaveOrder(oid order.OrderID) bool {
return pq.Order(oid) != nil
Expand Down
91 changes: 86 additions & 5 deletions server/book/orderpq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/server/account"
)

type Order = order.LimitOrder
Expand All @@ -21,6 +22,11 @@ var (
}
)

func randomAccount() (user account.AccountID) {
rand.Read(user[:])
return
}

func newFakeAddr() string {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, 35)
Expand All @@ -35,9 +41,6 @@ func genBigList(listSize int) {
if bigList != nil {
return
}
// var b [8]byte
// crand.Read(b[:])
// seed := int64(binary.LittleEndian.Uint64(b[:]))
seed := int64(-3405439173988651889)
rand.Seed(seed)

Expand Down Expand Up @@ -563,6 +566,86 @@ func TestOrderPriorityQueue_Remove(t *testing.T) {
}
}

func TestOrderPriorityQueue_RemoveUserOrders(t *testing.T) {
startLogger()

pq := NewMaxOrderPQ(6)

ok := pq.Insert(orders[0])
if !ok {
t.Errorf("Failed to insert order %v", orders[0])
}

ok = pq.Insert(orders[1])
if !ok {
t.Errorf("Failed to insert order %v", orders[1])
}

user0 := orders[0].AccountID

user1 := randomAccount()
other := newLimitOrder(false, 42000000, 2, order.StandingTiF, 0)
other.AccountID = user1
ok = pq.Insert(other)
if !ok {
t.Errorf("Failed to insert order %v", other)
}

amt, count := pq.UserOrderTotals(user0)
wantAmt := orders[0].Remaining() + orders[1].Remaining()
if amt != wantAmt {
t.Errorf("wanted %d remaining, got %d", wantAmt, amt)
}
if count != 2 {
t.Errorf("wanted %d orders, got %d", 2, count)
}

amt, count = pq.UserOrderTotals(user1)
wantAmt = other.Remaining()
if amt != wantAmt {
t.Errorf("wanted %d remaining, got %d", wantAmt, amt)
}
if count != 1 {
t.Errorf("wanted %d orders, got %d", 1, count)
}

removed := pq.RemoveUserOrders(user0)
if pq.Len() != 1 {
t.Errorf("Queue length expected %d, got %d", 1, pq.Len())
}
if len(removed) != 2 {
t.Fatalf("removed %d orders, expected %d", len(removed), 2)
}
for _, oid := range []order.OrderID{orders[0].ID(), orders[1].ID()} {
var found bool
for i := range removed {
if oid == removed[i].ID() {
found = true
break
}
}
if !found {
t.Errorf("didn't remove order %v", oid)
}
}

remainingID := pq.PeekBest().ID()
if remainingID != other.ID() {
t.Errorf("Remaining element expected %s, got %s", other.ID(),
remainingID)
}
removed = pq.RemoveUserOrders(user1)
if remain := pq.Len(); remain != 0 {
t.Errorf("didn't remove all orders, still have %d", remain)
}
if len(removed) != 1 {
t.Fatalf("removed %d orders, expected %d", len(removed), 1)
}
if removed[0].ID() != other.ID() {
t.Errorf("removed order %v, expected %v", removed[0], other.ID())
}
}

func TestOrderPQMin_Worst(t *testing.T) {
startLogger()

Expand Down Expand Up @@ -643,8 +726,6 @@ func TestOrderPQMax_Worst(t *testing.T) {
return bigList[i].Price() < bigList[j].Price()
})

//t.Log(bigList[0].Price(), bigList[len(bigList)-1].Price(), pq.PeekBest().Price())

// Worst for a min queue is highest rate.
worst := pq.Worst()
if worst.UID() != bigList[len(bigList)-1].UID() {
Expand Down
4 changes: 4 additions & 0 deletions server/cmd/dcrdex/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
defaultRegFeeConfirms = 4
defaultRegFeeAmount = 1e8
defaultBroadcastTimeout = 5 * time.Minute
defaultMiaUserTimeout = 90 * time.Second
)

var (
Expand Down Expand Up @@ -85,6 +86,7 @@ type dexConf struct {
FreeCancels bool
MaxUserCancels uint32
BanScore uint32
MiaUserTimeout time.Duration
DEXPrivKeyPath string
RPCCert string
RPCKey string
Expand Down Expand Up @@ -129,6 +131,7 @@ type flagsData struct {
FreeCancels bool `long:"freecancels" description:"No cancellation rate enforcement (unlimited cancel orders). Implied by --anarchy."`
MaxUserCancels uint32 `long:"maxepochcancels" description:"The maximum number of cancel orders allowed for a user in a given epoch."`
BanScore uint32 `long:"banscore" description:"The accumulated penalty score at which when an account gets closed."`
MiaUserTimeout time.Duration `long:"miatimeout" description:"How long until a user's orders are automatically unbooked after they are known to be disconnected."`
DEXPrivKeyPath string `long:"dexprivkeypath" description:"The path to a file containing the DEX private key for message signing."`

HTTPProfile bool `long:"httpprof" short:"p" description:"Start HTTP profiler."`
Expand Down Expand Up @@ -308,6 +311,7 @@ func loadConfig() (*dexConf, *procOpts, error) {
CancelThreshold: defaultCancelThresh,
MaxUserCancels: defaultMaxUserCancels,
BanScore: defaultBanScore,
MiaUserTimeout: defaultMiaUserTimeout,
}

// Pre-parse the command line options to see if an alternative config file
Expand Down
1 change: 1 addition & 0 deletions server/cmd/dcrdex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func mainCore(ctx context.Context) error {
Anarchy: cfg.Anarchy,
FreeCancels: cfg.FreeCancels,
BanScore: cfg.BanScore,
MiaUserTimeout: cfg.MiaUserTimeout,
DEXPrivKey: privKey,
CommsCfg: &dexsrv.RPCConfig{
RPCCert: cfg.RPCCert,
Expand Down
Loading

0 comments on commit 10f42f1

Please sign in to comment.