From 2491579f54601f035c2840b5f2520ab674531f5d Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 15:52:40 +0400 Subject: [PATCH 1/4] Include preloading of a bunch of transactions. During 'recurse' loading all of the transactions to be done one-by-one is actually rather expensive. Instead we can load them ahead of time, and even allow the database to load them in whatever order is optimal for the db. --- txn/flusher.go | 28 +++++++++++++++++++++++----- txn/txn.go | 26 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index 64b06c3ec..e2a0ec1df 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -62,7 +62,8 @@ func (f *flusher) run() (err error) { f.debugf("Processing %s", f.goal) seen := make(map[bson.ObjectId]*transaction) - if err := f.recurse(f.goal, seen); err != nil { + preloaded := make(map[bson.ObjectId]*transaction) + if err := f.recurse(f.goal, seen, preloaded); err != nil { return err } if f.goal.done() { @@ -154,23 +155,40 @@ func (f *flusher) run() (err error) { return nil } -func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error { +func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { + toPreload := make(map[bson.ObjectId]struct{}) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } - qt, err := f.load(id) + toPreload[id] = struct{}{} + } + if len(toPreload) > 0 { + err := f.loadMulti(toPreload, preloaded) if err != nil { return err } - err = f.recurse(qt, seen) + } + for _, dtt := range f.queue[dkey] { + id := dtt.id() + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) if err != nil { return err } diff --git a/txn/txn.go b/txn/txn.go index 204b3cf1d..79a99b483 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -459,6 +459,32 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } +func (r *Runner) loadMulti(toLoad map[bson.ObjectId]struct{}, preloaded map[bson.ObjectId]*transaction) error { + ids := make([]bson.ObjectId, len(toLoad)) + i := 0 + for id, _ := range toLoad { + ids[i] = id + i++ + } + txns := make([]transaction, 0, len(ids)) + + query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) + // Not sure that this actually has much of an effect when using All() + query.Batch(len(ids)) + err := query.All(&txns) + if err == mgo.ErrNotFound { + return fmt.Errorf("could not find a transaction in batch: %v", ids) + } else if err != nil { + return err + } + for i := range txns { + t := &txns[i] + preloaded[t.Id] = t + } + return nil +} + + type typeNature int const ( From 924d95b4737601abcb01e3df8242b6fa1c5dab91 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 16:13:29 +0400 Subject: [PATCH 2/4] Batch the preload into chunks. When dealing with some forms of 'setup', the existing preload loads too much data and causes a different O(N^2) behavior. So instead, we cap the number of transactions we will preload, which gives an upper bound on how much we'll over-load. --- txn/flusher.go | 43 +++++++++++++++++++++++++++---------------- txn/txn.go | 8 +------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index e2a0ec1df..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -155,43 +155,54 @@ func (f *flusher) run() (err error) { return nil } +const preloadBatchSize = 100 + func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { - toPreload := make(map[bson.ObjectId]struct{}) + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } toPreload[id] = struct{}{} + remaining = append(remaining, id) } - if len(toPreload) > 0 { - err := f.loadMulti(toPreload, preloaded) + // done with this map + toPreload = nil + for len(remaining) > 0 { + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] + } + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) if err != nil { return err } - } - for _, dtt := range f.queue[dkey] { - id := dtt.id() - if seen[id] != nil { - continue - } - qt, ok := preloaded[id] - if !ok { - qt, err = f.load(id) + for _, id := range batch { + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) if err != nil { return err } } - err = f.recurse(qt, seen, preloaded) - if err != nil { - return err - } } } return nil diff --git a/txn/txn.go b/txn/txn.go index 79a99b483..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -459,13 +459,7 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } -func (r *Runner) loadMulti(toLoad map[bson.ObjectId]struct{}, preloaded map[bson.ObjectId]*transaction) error { - ids := make([]bson.ObjectId, len(toLoad)) - i := 0 - for id, _ := range toLoad { - ids[i] = id - i++ - } +func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error { txns := make([]transaction, 0, len(ids)) query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) From a3e83d631d6acb2e091dd70c3eb04eaef2aafb4c Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 17:13:34 +0400 Subject: [PATCH 3/4] try to reuse the info.Queue conversion has a negative performance effect --- txn/flusher.go | 60 ++++++++++++++++++++++++++++++++++++-------------- txn/txn.go | 9 +++++--- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index d668da8d7..eb9449003 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -159,33 +159,41 @@ const preloadBatchSize = 100 func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + // we shouldn't need this one anymore because we are processing it now delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } + toPreload := make([]bson.ObjectId, 0) for _, dkey := range t.docKeys() { - remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) - toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) + queue := f.queue[dkey] + remaining := make([]bson.ObjectId, 0, len(queue)) for _, dtt := range f.queue[dkey] { id := dtt.id() - if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { + if seen[id] != nil { continue } - toPreload[id] = struct{}{} remaining = append(remaining, id) } - // done with this map - toPreload = nil + for len(remaining) > 0 { - batch := remaining - if len(batch) > preloadBatchSize { - batch = remaining[:preloadBatchSize] + toPreload = toPreload[:0] + batchSize := preloadBatchSize + if batchSize > len(remaining) { + batchSize = len(remaining) } - remaining = remaining[len(batch):] - err := f.loadMulti(batch, preloaded) - if err != nil { - return err + batch := remaining[:batchSize] + remaining = remaining[batchSize:] + for _, id := range batch { + if preloaded[id] == nil { + toPreload = append(toPreload, id) + } + } + if len(toPreload) > 0 { + if err := f.loadMulti(toPreload, preloaded); err != nil { + return err + } } for _, id := range batch { if seen[id] != nil { @@ -302,6 +310,8 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno + // We updated the Q, so this should force refresh + // TODO: We could *just* add the new txn-queue entry/reuse existing tokens f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc @@ -363,8 +373,14 @@ NextDoc: } else { f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } - revno[dkey] = info.Revno - f.queue[dkey] = tokensWithIds(info.Queue) + existRevno, rok := revno[dkey] + existQ, qok := f.queue[dkey] + if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) { + // We've already loaded this doc, no need to load it again + } else { + revno[dkey] = info.Revno + f.queue[dkey] = tokensWithIds(info.Queue) + } continue NextDoc } } @@ -498,7 +514,9 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) goto RetryDoc } revno[dkey] = info.Revno - + // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2) + // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different + // structure (map) to do faster lookups to see if the tokens are already present. found := false for _, id := range info.Queue { if id == tt { @@ -506,7 +524,15 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = tokensWithIds(info.Queue) + // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue) + existQ, qok := f.queue[dkey] + if qok && len(existQ) == len(info.Queue) { + // we could check that info.Q matches existQ.tt + } else { + if len(existQ) != len(info.Queue) { + } + f.queue[dkey] = tokensWithIds(info.Queue) + } if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: diff --git a/txn/txn.go b/txn/txn.go index 3bc6e640f..9aefdeb7a 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -136,8 +136,10 @@ func newNonce() string { type token string -func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } -func (tt token) nonce() string { return string(tt[25:]) } +func (tt token) id() bson.ObjectId { + return bson.ObjectIdHex(string(tt[:24])) +} +func (tt token) nonce() string { return string(tt[25:]) } // Op represents an operation to a single document that may be // applied as part of a transaction with other operations. @@ -330,6 +332,8 @@ func (r *Runner) ResumeAll() (err error) { panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) } } + // TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with + // any error it might encounter (db connection closed, etc.) return nil } @@ -478,7 +482,6 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra return nil } - type typeNature int const ( From b5ff82716196cde52f71ebf48c317378030a3fb7 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 17:13:38 +0400 Subject: [PATCH 4/4] Revert "try to reuse the info.Queue conversion has a negative performance effect" This reverts commit 2ecd4fc22d369a17cb6e5b2e5240afdf87f19476. --- txn/flusher.go | 60 ++++++++++++++------------------------------------ txn/txn.go | 9 +++----- 2 files changed, 20 insertions(+), 49 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index eb9449003..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -159,41 +159,33 @@ const preloadBatchSize = 100 func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t - // we shouldn't need this one anymore because we are processing it now delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } - toPreload := make([]bson.ObjectId, 0) for _, dkey := range t.docKeys() { - queue := f.queue[dkey] - remaining := make([]bson.ObjectId, 0, len(queue)) + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } + toPreload[id] = struct{}{} remaining = append(remaining, id) } - + // done with this map + toPreload = nil for len(remaining) > 0 { - toPreload = toPreload[:0] - batchSize := preloadBatchSize - if batchSize > len(remaining) { - batchSize = len(remaining) - } - batch := remaining[:batchSize] - remaining = remaining[batchSize:] - for _, id := range batch { - if preloaded[id] == nil { - toPreload = append(toPreload, id) - } + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] } - if len(toPreload) > 0 { - if err := f.loadMulti(toPreload, preloaded); err != nil { - return err - } + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) + if err != nil { + return err } for _, id := range batch { if seen[id] != nil { @@ -310,8 +302,6 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - // We updated the Q, so this should force refresh - // TODO: We could *just* add the new txn-queue entry/reuse existing tokens f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc @@ -373,14 +363,8 @@ NextDoc: } else { f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } - existRevno, rok := revno[dkey] - existQ, qok := f.queue[dkey] - if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) { - // We've already loaded this doc, no need to load it again - } else { - revno[dkey] = info.Revno - f.queue[dkey] = tokensWithIds(info.Queue) - } + revno[dkey] = info.Revno + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -514,9 +498,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) goto RetryDoc } revno[dkey] = info.Revno - // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2) - // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different - // structure (map) to do faster lookups to see if the tokens are already present. + found := false for _, id := range info.Queue { if id == tt { @@ -524,15 +506,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue) - existQ, qok := f.queue[dkey] - if qok && len(existQ) == len(info.Queue) { - // we could check that info.Q matches existQ.tt - } else { - if len(existQ) != len(info.Queue) { - } - f.queue[dkey] = tokensWithIds(info.Queue) - } + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: diff --git a/txn/txn.go b/txn/txn.go index 9aefdeb7a..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -136,10 +136,8 @@ func newNonce() string { type token string -func (tt token) id() bson.ObjectId { - return bson.ObjectIdHex(string(tt[:24])) -} -func (tt token) nonce() string { return string(tt[25:]) } +func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } +func (tt token) nonce() string { return string(tt[25:]) } // Op represents an operation to a single document that may be // applied as part of a transaction with other operations. @@ -332,8 +330,6 @@ func (r *Runner) ResumeAll() (err error) { panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) } } - // TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with - // any error it might encounter (db connection closed, etc.) return nil } @@ -482,6 +478,7 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra return nil } + type typeNature int const (