Skip to content

Commit

Permalink
try to reuse the info.Queue conversion has a negative performance effect
Browse files Browse the repository at this point in the history
  • Loading branch information
jameinel committed Jun 6, 2017
1 parent 1edd031 commit 2ecd4fc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
60 changes: 43 additions & 17 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,33 +159,41 @@ const preloadBatchSize = 100

func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error {
seen[t.Id] = t
// we shouldn't need this one anymore because we are processing it now
delete(preloaded, t.Id)
err := f.advance(t, nil, false)
if err != errPreReqs {
return err
}
toPreload := make([]bson.ObjectId, 0)
for _, dkey := range t.docKeys() {
remaining := make([]bson.ObjectId, 0, len(f.queue[dkey]))
toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey]))
queue := f.queue[dkey]
remaining := make([]bson.ObjectId, 0, len(queue))
for _, dtt := range f.queue[dkey] {
id := dtt.id()
if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil {
if seen[id] != nil {
continue
}
toPreload[id] = struct{}{}
remaining = append(remaining, id)
}
// done with this map
toPreload = nil

for len(remaining) > 0 {
batch := remaining
if len(batch) > preloadBatchSize {
batch = remaining[:preloadBatchSize]
toPreload = toPreload[:0]
batchSize := preloadBatchSize
if batchSize > len(remaining) {
batchSize = len(remaining)
}
remaining = remaining[len(batch):]
err := f.loadMulti(batch, preloaded)
if err != nil {
return err
batch := remaining[:batchSize]
remaining = remaining[batchSize:]
for _, id := range batch {
if preloaded[id] == nil {
toPreload = append(toPreload, id)
}
}
if len(toPreload) > 0 {
if err := f.loadMulti(toPreload, preloaded); err != nil {
return err
}
}
for _, id := range batch {
if seen[id] != nil {
Expand Down Expand Up @@ -302,6 +310,8 @@ NextDoc:
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
// We updated the Q, so this should force refresh
// TODO: We could *just* add the new txn-queue entry/reuse existing tokens
f.queue[dkey] = tokensWithIds(info.Queue)
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
Expand Down Expand Up @@ -363,8 +373,14 @@ NextDoc:
} else {
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = tokensWithIds(info.Queue)
existRevno, rok := revno[dkey]
existQ, qok := f.queue[dkey]
if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) {
// We've already loaded this doc, no need to load it again
} else {
revno[dkey] = info.Revno
f.queue[dkey] = tokensWithIds(info.Queue)
}
continue NextDoc
}
}
Expand Down Expand Up @@ -498,15 +514,25 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
goto RetryDoc
}
revno[dkey] = info.Revno

// TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2)
// if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different
// structure (map) to do faster lookups to see if the tokens are already present.
found := false
for _, id := range info.Queue {
if id == tt {
found = true
break
}
}
f.queue[dkey] = tokensWithIds(info.Queue)
// f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue)
existQ, qok := f.queue[dkey]
if qok && len(existQ) == len(info.Queue) {
// we could check that info.Q matches existQ.tt
} else {
if len(existQ) != len(info.Queue) {
}
f.queue[dkey] = tokensWithIds(info.Queue)
}
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
Expand Down
9 changes: 6 additions & 3 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ func newNonce() string {

type token string

func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) }
func (tt token) nonce() string { return string(tt[25:]) }
// id returns the ObjectId obtained by passing the first 24 bytes of the
// token to bson.ObjectIdHex. The slice expression panics if the token is
// shorter than 24 bytes.
func (tt token) id() bson.ObjectId {
return bson.ObjectIdHex(string(tt[:24]))
}
// nonce returns everything in the token from byte offset 25 onwards. The
// byte at offset 24 is skipped (presumably a separator between the id and
// the nonce -- TODO confirm against the code that builds tokens). The slice
// expression panics if the token is shorter than 25 bytes.
func (tt token) nonce() string { return string(tt[25:]) }

// Op represents an operation to a single document that may be
// applied as part of a transaction with other operations.
Expand Down Expand Up @@ -330,6 +332,8 @@ func (r *Runner) ResumeAll() (err error) {
panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
}
}
// TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with
// any error it might encounter (db connection closed, etc.)
return nil
}

Expand Down Expand Up @@ -478,7 +482,6 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra
return nil
}


type typeNature int

const (
Expand Down

0 comments on commit 2ecd4fc

Please sign in to comment.