diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index a7a9c659cfb..de8f61593b5 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -16,6 +16,7 @@ package frontier import ( "bytes" "fmt" + "math" "strings" "github.com/pingcap/ticdc/pkg/regionspan" @@ -28,35 +29,14 @@ type Frontier interface { String() string } -type node struct { - start []byte - end []byte - ts uint64 - - // used by list - nexts []*node - - // used by heap - left *node - right *node - children *node - parent *node - rank int - marked bool -} - -func (s *node) String() string { - return fmt.Sprintf("[{%s %s} @ %d]", s.start, s.end, s.ts) -} - // spanFrontier tracks the minimum timestamp of a set of spans. type spanFrontier struct { - list spanList - minHeap minTsHeap + spanList skipList + minTsHeap fibonacciHeap } // NewFrontier creates Froniter from the given spans. -func NewFrontier(spans ...regionspan.Span) Frontier { +func NewFrontier(checkpointTs uint64, spans ...regionspan.Span) Frontier { // spanFrontier don't support use Nil as the maximum key of End range // So we use set it as util.UpperBoundKey, the means the real use case *should not* have a // End key bigger than util.UpperBoundKey @@ -64,18 +44,18 @@ func NewFrontier(spans ...regionspan.Span) Frontier { spans[i] = span.Hack() } - s := &spanFrontier{} - s.list.init() - + s := &spanFrontier{ + spanList: *newSpanList(), + } + firstSpan := true for _, span := range spans { - e := &node{ - start: span.Start, - end: span.End, - ts: 0, + if firstSpan { + s.spanList.Insert(span.Start, s.minTsHeap.Insert(checkpointTs)) + s.spanList.Insert(span.End, s.minTsHeap.Insert(math.MaxUint64)) + firstSpan = false + continue } - - s.list.insert(e) - s.minHeap.insert(e) + s.insert(span, checkpointTs) } return s @@ -83,11 +63,7 @@ func NewFrontier(spans ...regionspan.Span) Frontier { // Frontier return the minimum tiemstamp. func (s *spanFrontier) Frontier() uint64 { - min := s.minHeap.getMin() - if min == nil { - return 0 - } - return min.ts + return s.minTsHeap.GetMinKey() } // Forward advances the timestamp for a span. @@ -97,70 +73,67 @@ func (s *spanFrontier) Forward(span regionspan.Span, ts uint64) { } func (s *spanFrontier) insert(span regionspan.Span, ts uint64) { - n := s.list.seek(span.Start) - - if n == nil || bytes.Compare(span.End, n.start) <= 0 { - // No overlapped spans. - return - } - - if bytes.Compare(n.end, span.Start) <= 0 { - // Pointed span has no overlap, step to next. - n = n.next() - } - - if n != nil && bytes.Compare(span.Start, n.start) > 0 { - e := &node{ - start: n.start, - end: span.Start, - ts: n.ts, + seekRes := s.spanList.Seek(span.Start) + + // if there is no change in the region span + // We just need to update the ts corresponding to the span in list + next := seekRes.Node().Next() + if next != nil { + cmpStart := bytes.Compare(seekRes.Node().Key(), span.Start) + cmpEnd := bytes.Compare(next.Key(), span.End) + if cmpStart == 0 && cmpEnd == 0 { + s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) + return } - n.start = span.Start - s.list.insert(e) - s.minHeap.insert(e) } - for n != nil { - oldTs := n.ts - if ts != n.ts { - s.minHeap.updateTs(n, ts) - } - - next := n.next() - if next != nil && bytes.Compare(next.start, span.End) < 0 { - n = next + // regions are merged or split, overwrite span into list + node := seekRes.Node() + lastNodeTs := uint64(math.MaxUint64) + shouldInsertStartNode := true + if node.Value() != nil { + lastNodeTs = node.Value().key + } + for ; node != nil; node = node.Next() { + cmpStart := bytes.Compare(node.Key(), span.Start) + if cmpStart < 0 { continue } - - // no more overlapped spans. - if cmp := bytes.Compare(span.End, n.end); cmp < 0 { - e := &node{ - start: span.End, - end: n.end, - ts: oldTs, - } - s.list.insert(e) - n.end = span.End - s.minHeap.insert(e) + if bytes.Compare(node.Key(), span.End) > 0 { + break + } + lastNodeTs = node.Value().key + if cmpStart == 0 { + s.minTsHeap.UpdateKey(node.Value(), ts) + shouldInsertStartNode = false + } else { + s.spanList.Remove(seekRes, node) + s.minTsHeap.Remove(node.Value()) } - return } + if shouldInsertStartNode { + s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) + seekRes.Next() + } + s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs)) } // Entries visit all traced spans. -func (s *spanFrontier) Entries(fn func(start, end []byte, ts uint64)) { - for n := s.list.head.next(); n != nil; n = n.next() { - fn(n.start, n.end, n.ts) - } +func (s *spanFrontier) Entries(fn func(key []byte, ts uint64)) { + s.spanList.Entries(func(n *skipListNode) bool { + fn(n.Key(), n.Value().key) + return true + }) } func (s *spanFrontier) String() string { var buf strings.Builder - for n := s.list.head.next(); n != nil; n = n.next() { - if buf.Len() != 0 { - buf.WriteString(` `) + s.Entries(func(key []byte, ts uint64) { + if ts == math.MaxUint64 { + buf.WriteString(fmt.Sprintf("[%s @ Max] ", key)) + } else { + buf.WriteString(fmt.Sprintf("[%s @ %d] ", key, ts)) } - buf.WriteString(n.String()) - } + }) return buf.String() } diff --git a/cdc/puller/frontier/frontier_bench_test.go b/cdc/puller/frontier/frontier_bench_test.go index 5c1ea1e433e..3a51eeb48c3 100644 --- a/cdc/puller/frontier/frontier_bench_test.go +++ b/cdc/puller/frontier/frontier_bench_test.go @@ -49,7 +49,7 @@ func BenchmarkSpanFrontier(b *testing.B) { spans = append(spans, span) } - f := NewFrontier(spans...) + f := NewFrontier(0, spans...) b.ResetTimer() @@ -91,7 +91,7 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) { }) } - f := NewFrontier(spans...) + f := NewFrontier(0, spans...) b.ResetTimer() diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 6ca46912e5d..f1d7ae5d0f1 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -14,8 +14,9 @@ package frontier import ( - "fmt" - "strings" + "bytes" + "math/rand" + "sort" "testing" "github.com/pingcap/check" @@ -28,18 +29,6 @@ func Test(t *testing.T) { check.TestingT(t) } var _ = check.Suite(&spanFrontierSuite{}) -func (s *spanFrontier) testStr() string { - var buf strings.Builder - s.Entries(func(start, end []byte, ts uint64) { - if buf.Len() != 0 { - buf.WriteString(` `) - } - fmt.Fprintf(&buf, `{%s %s}@%d`, start, end, ts) - }) - - return buf.String() -} - func (s *spanFrontierSuite) TestSpanFrontier(c *check.C) { keyA := []byte("a") keyB := []byte("b") @@ -53,18 +42,27 @@ func (s *spanFrontierSuite) TestSpanFrontier(c *check.C) { spBD := regionspan.Span{Start: keyB, End: keyD} spCD := regionspan.Span{Start: keyC, End: keyD} - f := NewFrontier(spAD).(*spanFrontier) + f := NewFrontier(5, spAD).(*spanFrontier) - c.Assert(f.Frontier(), check.Equals, uint64(0)) - c.Assert(f.testStr(), check.Equals, `{a d}@0`) + c.Assert(f.Frontier(), check.Equals, uint64(5)) + c.Assert(f.String(), check.Equals, `[a @ 5] [d @ Max] `) + checkFrontier(c, f) - // Untracked spans are ignored f.Forward( regionspan.Span{Start: []byte("d"), End: []byte("e")}, 100, ) - c.Assert(f.Frontier(), check.Equals, uint64(0)) - c.Assert(f.testStr(), check.Equals, `{a d}@0`) + c.Assert(f.Frontier(), check.Equals, uint64(5)) + c.Assert(f.String(), check.Equals, `[a @ 5] [d @ 100] [e @ Max] `) + checkFrontier(c, f) + + f.Forward( + regionspan.Span{Start: []byte("g"), End: []byte("h")}, + 200, + ) + c.Assert(f.Frontier(), check.Equals, uint64(5)) + c.Assert(f.String(), check.Equals, `[a @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward the tracked span space. f.Forward( @@ -72,68 +70,96 @@ func (s *spanFrontierSuite) TestSpanFrontier(c *check.C) { 1, ) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a d}@1`) + c.Assert(f.String(), check.Equals, `[a @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) - // Forward it again + // // Forward it again f.Forward( regionspan.Span{Start: []byte("a"), End: []byte("d")}, 2, ) c.Assert(f.Frontier(), check.Equals, uint64(2)) - c.Assert(f.testStr(), check.Equals, `{a d}@2`) + c.Assert(f.String(), check.Equals, `[a @ 2] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) - // Forward to smaller ts + // // Forward to smaller ts f.Forward( regionspan.Span{Start: []byte("a"), End: []byte("d")}, 1, ) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a d}@1`) + c.Assert(f.String(), check.Equals, `[a @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) - // Forward b-c + // // Forward b-c f.Forward(spBC, 3) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a b}@1 {b c}@3 {c d}@1`) + c.Assert(f.String(), check.Equals, `[a @ 1] [b @ 3] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward b-c more to be 4 f.Forward(spBC, 4) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a b}@1 {b c}@4 {c d}@1`) + c.Assert(f.String(), check.Equals, `[a @ 1] [b @ 4] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward all to at least 3 f.Forward(spAD, 3) c.Assert(f.Frontier(), check.Equals, uint64(3)) - c.Assert(f.testStr(), check.Equals, `{a b}@3 {b c}@3 {c d}@3`) + c.Assert(f.String(), check.Equals, `[a @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward AB and CD to be 5, keep BC at 4 f.Forward(spAB, 5) c.Assert(f.Frontier(), check.Equals, uint64(3)) + c.Assert(f.String(), check.Equals, `[a @ 5] [b @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) + f.Forward(spCD, 5) c.Assert(f.Frontier(), check.Equals, uint64(3)) - c.Assert(f.testStr(), check.Equals, `{a b}@5 {b c}@3 {c d}@5`) + c.Assert(f.String(), check.Equals, `[a @ 5] [b @ 3] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Catch BC to be 5 too f.Forward(spBC, 5) c.Assert(f.Frontier(), check.Equals, uint64(5)) - c.Assert(f.testStr(), check.Equals, `{a b}@5 {b c}@5 {c d}@5`) + c.Assert(f.String(), check.Equals, `[a @ 5] [b @ 5] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward all to be 6 f.Forward(spAD, 6) c.Assert(f.Frontier(), check.Equals, uint64(6)) - c.Assert(f.testStr(), check.Equals, `{a b}@6 {b c}@6 {c d}@6`) + c.Assert(f.String(), check.Equals, `[a @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) // Forward ac to 7 f.Forward(spAC, 7) c.Assert(f.Frontier(), check.Equals, uint64(6)) - c.Assert(f.testStr(), check.Equals, `{a b}@7 {b c}@7 {c d}@6`) + c.Assert(f.String(), check.Equals, `[a @ 7] [c @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) + // Forward bd to 8 f.Forward(spBD, 8) c.Assert(f.Frontier(), check.Equals, uint64(7)) - c.Assert(f.testStr(), check.Equals, `{a b}@7 {b c}@8 {c d}@8`) + c.Assert(f.String(), check.Equals, `[a @ 7] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) + // Forward ab to 8 f.Forward(spAB, 8) c.Assert(f.Frontier(), check.Equals, uint64(8)) - c.Assert(f.testStr(), check.Equals, `{a b}@8 {b c}@8 {c d}@8`) + c.Assert(f.String(), check.Equals, `[a @ 8] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `) + checkFrontier(c, f) + + f.Forward(regionspan.Span{Start: []byte("1"), End: []byte("g")}, 9) + c.Assert(f.Frontier(), check.Equals, uint64(9)) + c.Assert(f.String(), check.Equals, `[1 @ 9] [g @ 200] [h @ Max] `) + checkFrontier(c, f) + + f.Forward(regionspan.Span{Start: []byte("g"), End: []byte("i")}, 10) + c.Assert(f.Frontier(), check.Equals, uint64(9)) + c.Assert(f.String(), check.Equals, `[1 @ 9] [g @ 10] [i @ Max] `) + checkFrontier(c, f) + } func (s *spanFrontierSuite) TestMinMax(c *check.C) { @@ -145,21 +171,30 @@ func (s *spanFrontierSuite) TestMinMax(c *check.C) { spMidMax := regionspan.Span{Start: keyMid, End: keyMax} spMinMax := regionspan.Span{Start: keyMin, End: keyMax} - f := NewFrontier(spMinMax).(*spanFrontier) + f := NewFrontier(0, spMinMax) c.Assert(f.Frontier(), check.Equals, uint64(0)) - c.Assert(f.testStr(), check.Equals, "{ \xff\xff\xff\xff\xff}@0") + c.Assert(f.String(), check.Equals, "[ @ 0] [\xff\xff\xff\xff\xff @ Max] ") + checkFrontier(c, f) f.Forward(spMinMax, 1) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, "{ \xff\xff\xff\xff\xff}@1") + c.Assert(f.String(), check.Equals, "[ @ 1] [\xff\xff\xff\xff\xff @ Max] ") + checkFrontier(c, f) f.Forward(spMinMid, 2) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, "{ m}@2 {m \xff\xff\xff\xff\xff}@1") + c.Assert(f.String(), check.Equals, "[ @ 2] [m @ 1] [\xff\xff\xff\xff\xff @ Max] ") + checkFrontier(c, f) f.Forward(spMidMax, 2) c.Assert(f.Frontier(), check.Equals, uint64(2)) - c.Assert(f.testStr(), check.Equals, "{ m}@2 {m \xff\xff\xff\xff\xff}@2") + c.Assert(f.String(), check.Equals, "[ @ 2] [m @ 2] [\xff\xff\xff\xff\xff @ Max] ") + checkFrontier(c, f) + + f.Forward(spMinMax, 3) + c.Assert(f.Frontier(), check.Equals, uint64(3)) + c.Assert(f.String(), check.Equals, "[ @ 3] [\xff\xff\xff\xff\xff @ Max] ") + checkFrontier(c, f) } func (s *spanFrontierSuite) TestSpanFrontierDisjoinSpans(c *check.C) { @@ -180,40 +215,96 @@ func (s *spanFrontierSuite) TestSpanFrontierDisjoinSpans(c *check.C) { sp12 := regionspan.Span{Start: key1, End: key2} sp1F := regionspan.Span{Start: key1, End: keyF} - f := NewFrontier(spAB, spCE).(*spanFrontier) + f := NewFrontier(0, spAB, spCE) c.Assert(f.Frontier(), check.Equals, uint64(0)) - c.Assert(f.testStr(), check.Equals, `{a b}@0 {c e}@0`) + c.Assert(f.String(), check.Equals, `[a @ 0] [b @ Max] [c @ 0] [e @ Max] `) + checkFrontier(c, f) // Advance the tracked spans f.Forward(spAB, 1) c.Assert(f.Frontier(), check.Equals, uint64(0)) - c.Assert(f.testStr(), check.Equals, `{a b}@1 {c e}@0`) + c.Assert(f.String(), check.Equals, `[a @ 1] [b @ Max] [c @ 0] [e @ Max] `) + checkFrontier(c, f) f.Forward(spCE, 1) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a b}@1 {c e}@1`) + c.Assert(f.String(), check.Equals, `[a @ 1] [b @ Max] [c @ 1] [e @ Max] `) + checkFrontier(c, f) // Advance d-e split c-e to c-d and d-e f.Forward(spDE, 2) c.Assert(f.Frontier(), check.Equals, uint64(1)) - c.Assert(f.testStr(), check.Equals, `{a b}@1 {c d}@1 {d e}@2`) + c.Assert(f.String(), check.Equals, `[a @ 1] [b @ Max] [c @ 1] [d @ 2] [e @ Max] `) + checkFrontier(c, f) // Advance a-d cover a-b and c-d f.Forward(spAD, 3) c.Assert(f.Frontier(), check.Equals, uint64(2)) - c.Assert(f.testStr(), check.Equals, `{a b}@3 {c d}@3 {d e}@2`) + c.Assert(f.String(), check.Equals, `[a @ 3] [d @ 2] [e @ Max] `) + checkFrontier(c, f) // Advance one cover all 3 span f.Forward(spAE, 4) c.Assert(f.Frontier(), check.Equals, uint64(4)) - c.Assert(f.testStr(), check.Equals, `{a b}@4 {c d}@4 {d e}@4`) + c.Assert(f.String(), check.Equals, `[a @ 4] [e @ Max] `) + checkFrontier(c, f) // Advance all with a larger span f.Forward(sp1F, 5) c.Assert(f.Frontier(), check.Equals, uint64(5)) - c.Assert(f.testStr(), check.Equals, `{a b}@5 {c d}@5 {d e}@5`) + c.Assert(f.String(), check.Equals, `[1 @ 5] [f @ Max] `) + checkFrontier(c, f) // Advance span smaller than all tracked spans f.Forward(sp12, 6) c.Assert(f.Frontier(), check.Equals, uint64(5)) - c.Assert(f.testStr(), check.Equals, `{a b}@5 {c d}@5 {d e}@5`) + c.Assert(f.String(), check.Equals, `[1 @ 6] [2 @ 5] [f @ Max] `) + checkFrontier(c, f) +} + +func (s *spanFrontierSuite) TestSpanFrontierRandomly(c *check.C) { + var keyMin []byte + var keyMax []byte + spMinMax := regionspan.Span{Start: keyMin, End: keyMax} + f := NewFrontier(0, spMinMax) + + var spans []regionspan.Span + for len(spans) < 500000 { + span := regionspan.Span{ + Start: make([]byte, rand.Intn(32)+1), + End: make([]byte, rand.Intn(32)+1), + } + rand.Read(span.Start) + rand.Read(span.End) + cmp := bytes.Compare(span.Start, span.End) + if cmp == 0 { + continue + } else if cmp > 0 { + span.Start, span.End = span.End, span.Start + } + + spans = append(spans, span) + + ts := rand.Uint64() + + f.Forward(span, ts) + checkFrontier(c, f) + } +} + +func checkFrontier(c *check.C, f Frontier) { + sf := f.(*spanFrontier) + var tsInList, tsInHeap []uint64 + sf.spanList.Entries(func(n *skipListNode) bool { + tsInList = append(tsInList, n.Value().key) + return true + }) + sf.minTsHeap.Entries(func(n *fibonacciHeapNode) bool { + tsInHeap = append(tsInHeap, n.key) + return true + }) + c.Assert(len(tsInList), check.Equals, len(tsInHeap)) + sort.Slice(tsInList, func(i, j int) bool { return tsInList[i] < tsInList[j] }) + sort.Slice(tsInHeap, func(i, j int) bool { return tsInHeap[i] < tsInHeap[j] }) + c.Assert(tsInList, check.DeepEquals, tsInHeap) + c.Assert(f.Frontier(), check.Equals, tsInList[0]) } diff --git a/cdc/puller/frontier/heap.go b/cdc/puller/frontier/heap.go index 8a1f8ba9874..9c5cbebca87 100644 --- a/cdc/puller/frontier/heap.go +++ b/cdc/puller/frontier/heap.go @@ -13,45 +13,91 @@ package frontier -type minTsHeap struct { - root *node - min *node +type fibonacciHeapNode struct { + key uint64 + + left *fibonacciHeapNode + right *fibonacciHeapNode + children *fibonacciHeapNode + parent *fibonacciHeapNode + rank int + marked bool +} + +type fibonacciHeap struct { + root *fibonacciHeapNode + min *fibonacciHeapNode dirty bool } -func (h *minTsHeap) getMin() *node { +// GetMinKey returns the minimum key in the heap +func (h *fibonacciHeap) GetMinKey() uint64 { if h.dirty { h.consolidate() h.dirty = false } - return h.min + return h.min.key } -func (h *minTsHeap) insert(x *node) { +// Insert inserts a new node into the heap and returns this new node +func (h *fibonacciHeap) Insert(key uint64) *fibonacciHeapNode { + x := &fibonacciHeapNode{key: key} h.addToRoot(x) - if h.min == nil || h.min.ts > x.ts { + if h.min == nil || h.min.key > x.key { h.min = x } + return x } -func (h *minTsHeap) updateTs(x *node, ts uint64) { +// Remove removes a node from the heap +func (h *fibonacciHeap) Remove(x *fibonacciHeapNode) { + if x == h.min { + h.dirty = true + } + + child := x.children + isLast := child == nil + + for !isLast { + next := child.right + isLast = next == x.children + h.removeChildren(x, child) + h.addToRoot(child) + child = next + } + + parent := x.parent + if parent != nil { + h.removeChildren(parent, x) + if parent.marked { + h.cascadingCut(parent) + } else { + parent.marked = true + } + } else { + h.cutFromRoot(x) + } +} + +// UpdateKey updates the key of the node in the heap +func (h *fibonacciHeap) UpdateKey(x *fibonacciHeapNode, key uint64) { switch { - case x.ts == ts: + case x.key == key: return - case x.ts > ts: - h.decreaseTs(x, ts) - case x.ts < ts: - h.increaseTs(x, ts) + case x.key > key: + h.decreaseKey(x, key) + case x.key < key: + h.increaseKey(x, key) } } -func (h *minTsHeap) increaseTs(x *node, ts uint64) { +func (h *fibonacciHeap) increaseKey(x *fibonacciHeapNode, key uint64) { if x == h.min { h.dirty = true } - x.ts = ts + x.key = key child := x.children cascadingCut := false isLast := child == nil @@ -60,7 +106,7 @@ func (h *minTsHeap) increaseTs(x *node, ts uint64) { next := child.right isLast = next == x.children - if child.ts < x.ts { + if child.key < x.key { h.removeChildren(x, child) h.addToRoot(child) if x.marked { @@ -77,10 +123,10 @@ func (h *minTsHeap) increaseTs(x *node, ts uint64) { } } -func (h *minTsHeap) decreaseTs(x *node, ts uint64) { - x.ts = ts +func (h *fibonacciHeap) decreaseKey(x *fibonacciHeapNode, key uint64) { + x.key = key parent := x.parent - if parent != nil && parent.ts > x.ts { + if parent != nil && parent.key > x.key { h.removeChildren(parent, x) h.addToRoot(x) if parent.marked { @@ -89,12 +135,12 @@ func (h *minTsHeap) decreaseTs(x *node, ts uint64) { parent.marked = true } } - if x.parent == nil && h.min.ts > ts { + if x.parent == nil && h.min.key > key { h.min = x } } -func (h *minTsHeap) cascadingCut(x *node) { +func (h *fibonacciHeap) cascadingCut(x *fibonacciHeapNode) { x.marked = false for p := x.parent; p != nil; p = p.parent { h.removeChildren(p, x) @@ -113,8 +159,8 @@ func (h *minTsHeap) cascadingCut(x *node) { // we can know the size of consolidate table is around 46.09545510610244. const consolidateTableSize = 47 -func (h *minTsHeap) consolidate() { - var table [consolidateTableSize]*node +func (h *fibonacciHeap) consolidate() { + var table [consolidateTableSize]*fibonacciHeapNode x := h.root maxOrder := 0 h.min = h.root @@ -125,7 +171,7 @@ func (h *minTsHeap) consolidate() { z := table[y.rank] for z != nil { table[y.rank] = nil - if y.ts > z.ts { + if y.key > z.key { y, z = z, y } h.addChildren(y, z) @@ -146,30 +192,34 @@ func (h *minTsHeap) consolidate() { if n == nil { continue } - if h.min.ts > n.ts { + if h.min.key > n.key { h.min = n } h.addToRoot(n) } } -func (h *minTsHeap) addChildren(root, x *node) { +func (h *fibonacciHeap) addChildren(root, x *fibonacciHeapNode) { root.children = h.insertInto(root.children, x) root.rank++ x.parent = root } -func (h *minTsHeap) removeChildren(root, x *node) { +func (h *fibonacciHeap) removeChildren(root, x *fibonacciHeapNode) { root.children = h.cutFrom(root.children, x) root.rank-- x.parent = nil } -func (h *minTsHeap) addToRoot(x *node) { +func (h *fibonacciHeap) addToRoot(x *fibonacciHeapNode) { h.root = h.insertInto(h.root, x) } -func (h *minTsHeap) insertInto(head *node, x *node) *node { +func (h *fibonacciHeap) cutFromRoot(x *fibonacciHeapNode) { + h.root = h.cutFrom(h.root, x) +} + +func (h *fibonacciHeap) insertInto(head *fibonacciHeapNode, x *fibonacciHeapNode) *fibonacciHeapNode { if head == nil { x.left = x x.right = x @@ -182,7 +232,7 @@ func (h *minTsHeap) insertInto(head *node, x *node) *node { return x } -func (h *minTsHeap) cutFrom(head *node, x *node) *node { +func (h *fibonacciHeap) cutFrom(head *fibonacciHeapNode, x *fibonacciHeapNode) *fibonacciHeapNode { if x.right == x { x.right = nil x.left = nil diff --git a/cdc/puller/frontier/heap_test.go b/cdc/puller/frontier/heap_test.go index 5bb05fa4704..839ed2f8c6e 100644 --- a/cdc/puller/frontier/heap_test.go +++ b/cdc/puller/frontier/heap_test.go @@ -24,44 +24,89 @@ type tsHeapSuite struct{} var _ = check.Suite(&tsHeapSuite{}) -func (s *tsHeapSuite) insertIntoHeap(h *minTsHeap, ts uint64) *node { - n := &node{ts: ts} - h.insert(n) - return n -} - func (s *tsHeapSuite) TestInsert(c *check.C) { - var heap minTsHeap + var heap fibonacciHeap target := uint64(15000) for i := 0; i < 5000; i++ { - s.insertIntoHeap(&heap, uint64(10001)+target+1) + heap.Insert(uint64(10001) + target + 1) } - s.insertIntoHeap(&heap, target) + heap.Insert(target) - c.Assert(heap.getMin().ts, check.Equals, target) + c.Assert(heap.GetMinKey(), check.Equals, target) } func (s *tsHeapSuite) TestUpdateTs(c *check.C) { rand.Seed(0xdeadbeaf) - var heap minTsHeap - nodes := make([]*node, 50000) + var heap fibonacciHeap + nodes := make([]*fibonacciHeapNode, 50000) for i := range nodes { - nodes[i] = s.insertIntoHeap(&heap, 10000+uint64(rand.Intn(len(nodes)/2))) + nodes[i] = heap.Insert(10000 + uint64(rand.Intn(len(nodes)/2))) } for i := range nodes { - min := heap.getMin().ts + min := heap.GetMinKey() expectedMin := uint64(math.MaxUint64) for _, n := range nodes { - if expectedMin > n.ts { - expectedMin = n.ts + if expectedMin > n.key { + expectedMin = n.key } } c.Assert(min, check.Equals, expectedMin) if rand.Intn(2) == 0 { - heap.updateTs(nodes[i], nodes[i].ts+uint64(10000)) + heap.UpdateKey(nodes[i], nodes[i].key+uint64(10000)) } else { - heap.updateTs(nodes[i], nodes[i].ts-uint64(10000)) + heap.UpdateKey(nodes[i], nodes[i].key-uint64(10000)) + } + } +} + +func (s *tsHeapSuite) TestRemoveNode(c *check.C) { + rand.Seed(0xdeadbeaf) + var heap fibonacciHeap + nodes := make([]*fibonacciHeapNode, 50000) + for i := range nodes { + nodes[i] = heap.Insert(10000 + uint64(rand.Intn(len(nodes)/2))) + } + + for i := range nodes { + min := heap.GetMinKey() + expectedMin := uint64(math.MaxUint64) + for _, n := range nodes { + if isRemoved(n) { + continue + } + if expectedMin > n.key { + expectedMin = n.key + } + } + c.Assert(min, check.Equals, expectedMin) + heap.Remove(nodes[i]) + } + for _, n := range nodes { + if !isRemoved(n) { + c.Fatal("all of the node shoule be removed") + } + } +} + +func isRemoved(n *fibonacciHeapNode) bool { + return n.left == nil && n.right == nil && n.children == nil && n.parent == nil +} + +func (x *fibonacciHeap) Entries(fn func(n *fibonacciHeapNode) bool) { + heapNodeIterator(x.root, fn) +} + +func heapNodeIterator(n *fibonacciHeapNode, fn func(n *fibonacciHeapNode) bool) { + firstStep := true + + for next := n; next != nil && (next != n || firstStep); next = next.right { + firstStep = false + if !fn(next) { + return + } + if next.children != nil { + heapNodeIterator(next.children, fn) } } } diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 598dbd00e1c..0fc6be761e6 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -15,7 +15,10 @@ package frontier import ( "bytes" + "fmt" + "log" "math" + "strings" _ "unsafe" // required by go:linkname ) @@ -24,45 +27,58 @@ const ( maxHeight = 12 ) -type spanList struct { - head node - height int +type skipListNode struct { + key []byte + value *fibonacciHeapNode + + nexts []*skipListNode } -func (n *node) next() *node { - return n.nexts[0] +// Key is the key of the node +func (s *skipListNode) Key() []byte { + return s.key } -func (l *spanList) init() { - l.head.nexts = make([]*node, maxHeight) +// Value is the value of the node +func (s *skipListNode) Value() *fibonacciHeapNode { + return s.value } -func (l *spanList) insert(n *node) { - head := &l.head - lsHeight := l.height - var prev [maxHeight + 1]*node - var next [maxHeight + 1]*node - prev[lsHeight] = head +// Next returns the next node in the list +func (s *skipListNode) Next() *skipListNode { + return s.nexts[0] +} - for i := lsHeight - 1; i >= 0; i-- { - prev[i], next[i] = l.findSpliceForLevel(n.start, prev[i+1], i) - } - height := l.randomHeight() - n.nexts = make([]*node, height) - if height > l.height { - l.height = height +type seekResult []*skipListNode + +// Next points to the next seek result +func (s seekResult) Next() { + next := s.Node().Next() + for i := range next.nexts { + s[i] = next } +} - for i := 0; i < height; i++ { - n.nexts[i] = next[i] - if prev[i] == nil { - prev[i] = &l.head - } - prev[i].nexts[i] = n +// Node returns the node point by the seek result +func (s seekResult) Node() *skipListNode { + if len(s) == 0 { + return nil } + return s[0] } -func (l *spanList) randomHeight() int { +type skipList struct { + head skipListNode + height int +} + +func newSpanList() *skipList { + l := new(skipList) + l.head.nexts = make([]*skipListNode, maxHeight) + return l +} + +func (l *skipList) randomHeight() int { h := 1 for h < maxHeight && fastrand() < uint32(math.MaxUint32)/4 { h++ @@ -73,51 +89,131 @@ func (l *spanList) randomHeight() int { //go:linkname fastrand runtime.fastrand func fastrand() uint32 -func (l *spanList) findSpliceForLevel(key []byte, prev *node, level int) (*node, *node) { - for { - next := prev.nexts[level] - if next == nil { - return prev, nil - } - cmp := bytes.Compare(next.start, key) - if cmp >= 0 { - return prev, next +// Seek returns the seek result +// the seek result is a slice of nodes, +// Each element in the slice represents the nearest(left) node to the target value at each level of the skip list. +func (l *skipList) Seek(key []byte) seekResult { + head := &l.head + current := head + result := make(seekResult, maxHeight) + +LevelLoop: + for level := l.height - 1; level >= 0; level-- { + for { + next := current.nexts[level] + if next == nil { + result[level] = current + continue LevelLoop + } + cmp := bytes.Compare(key, next.key) + if cmp < 0 { + result[level] = current + continue LevelLoop + } + if cmp == 0 { + for ; level >= 0; level-- { + result[level] = next + } + return result + } + current = next } - prev = next } + + return result } -func (l *spanList) seek(start []byte) *node { - if l.head.next() == nil { - return nil +// InsertNextToNode insert the specified node after the seek result +func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) { + if seekR.Node() != nil && !nextTo(seekR.Node(), key) { + log.Fatal("the InsertNextToNode function can only append node to the seek result.") + } + height := l.randomHeight() + if l.height < height { + l.height = height + } + n := &skipListNode{ + key: key, + value: value, + nexts: make([]*skipListNode, height), } - head := &l.head - prev := head - level := l.height - 1 - - for { - next := prev.nexts[level] - if next != nil { - cmp := bytes.Compare(start, next.start) - if cmp > 0 { - prev = next - continue - } - if cmp == 0 { - return next - } + for level := 0; level < height; level++ { + + prev := seekR[level] + if prev == nil { + prev = &l.head } - if level > 0 { - level-- - continue + n.nexts[level] = prev.nexts[level] + prev.nexts[level] = n + } +} + +// Insert inserts the specified node +func (l *skipList) Insert(key []byte, value *fibonacciHeapNode) { + seekR := l.Seek(key) + l.InsertNextToNode(seekR, key, value) +} + +// Remove removes the specified node after the seek result +func (l *skipList) Remove(seekR seekResult, toRemove *skipListNode) { + seekCurrent := seekR.Node() + if seekCurrent == nil || seekCurrent.Next() != toRemove { + log.Fatal("the Remove function can only remove node right next to the seek result.") + } + for i := range toRemove.nexts { + seekR[i].nexts[i] = toRemove.nexts[i] + } +} + +// First returns the first node in the list +func (l *skipList) First() *skipListNode { + return l.head.Next() +} + +// Entries visit all the nodes in the list +func (l *skipList) Entries(fn func(*skipListNode) bool) { + for node := l.First(); node != nil; node = node.Next() { + if cont := fn(node); !cont { + return } - break } +} + +// String implements fmt.Stringer interface. +func (l *skipList) String() string { + var buf strings.Builder + l.Entries(func(node *skipListNode) bool { + buf.WriteString(fmt.Sprintf("[%s] ", node.key)) + return true + }) + return buf.String() +} - // The seek key is smaller than all keys in the list. - if prev == head { - return head.next() +// nextTo check if the key is right next to the node in list. +// the specified node is a node in the list. +// the specified key is a bytes to check position. +// return true if the key is right next to the node. +func nextTo(node *skipListNode, key []byte) bool { + cmp := bytes.Compare(node.key, key) + switch { + case cmp == 0: + return true + case cmp > 0: + return false + } + // cmp must be less than 0 here + next := node.nexts[0] + if next == nil { + // the node is the last node in the list + // we can insert the key after the last node. + return true + } + if bytes.Compare(next.key, key) <= 0 { + // the key of next node is less or equal to the specified key, + // this specified key should be inserted after the next node. + return false } - return prev + // the key is between with the node and the next node. + return true } diff --git a/cdc/puller/frontier/list_test.go b/cdc/puller/frontier/list_test.go index ac92ca561e8..35425c2dfac 100644 --- a/cdc/puller/frontier/list_test.go +++ b/cdc/puller/frontier/list_test.go @@ -14,6 +14,7 @@ package frontier import ( + "bytes" "math/rand" "github.com/pingcap/check" @@ -23,28 +24,71 @@ type spanListSuite struct{} var _ = check.Suite(&spanListSuite{}) -func (s *spanListSuite) insertIntoList(l *spanList, keys ...[]byte) { +func (s *spanListSuite) insertIntoList(l *skipList, keys ...[]byte) { for _, k := range keys { - l.insert(&node{ - start: k, - }) + l.Insert(k, nil) } } -func (s *spanListSuite) TestInsert(c *check.C) { +func (s *spanListSuite) TestInsertAndRemove(c *check.C) { + list := newSpanList() var keys [][]byte - for i := 0; i < 10000; i++ { - key := make([]byte, rand.Intn(19)+1) + for i := 0; i < 100000; i++ { + key := make([]byte, rand.Intn(128)+1) rand.Read(key) keys = append(keys, key) + list.Insert(key, nil) } - var list spanList - list.init() - s.insertIntoList(&list, keys...) - + // check all the keys are exist in list for _, k := range keys { - c.Assert(list.seek(k).start, check.BytesEquals, k) + a := list.Seek(k).Node().Key() + cmp := bytes.Compare(a, k) + c.Assert(cmp, check.Equals, 0) + } + checkList(c, list) + + for i := 0; i < 10000; i++ { + indexToRemove := rand.Intn(10000) + seekRes := list.Seek(keys[indexToRemove]) + if seekRes.Node().Next() == nil { + break + } + removedKey := seekRes.Node().Next().Key() + list.Remove(seekRes, seekRes.Node().Next()) + + // check the node is already removed + a := list.Seek(removedKey).Node().Key() + cmp := bytes.Compare(a, removedKey) + c.Assert(cmp, check.LessEqual, 0) + } + checkList(c, list) +} + +func checkList(c *check.C, list *skipList) { + // check the order of the keys in list + var lastKey []byte + var nodeNum int + for node := list.First(); node != nil; node = node.Next() { + if len(lastKey) != 0 { + cmp := bytes.Compare(lastKey, node.Key()) + c.Assert(cmp, check.LessEqual, 0) + } + lastKey = node.Key() + nodeNum++ + } + + // check all the pointers are valid + prevs := make([]*skipListNode, list.height) + for i := range prevs { + prevs[i] = list.head.nexts[i] + } + + for node := list.First(); node != nil; node = node.Next() { + for i := 0; i < len(node.nexts); i++ { + c.Assert(prevs[i], check.Equals, node) + prevs[i] = node.nexts[i] + } } } @@ -60,31 +104,56 @@ func (s *spanListSuite) TestSeek(c *check.C) { keyH := []byte("h5") keyZ := []byte("z5") - var list spanList - list.init() + list := newSpanList() - c.Assert(list.seek(keyA), check.IsNil) + c.Assert(list.Seek(keyA).Node(), check.IsNil) - s.insertIntoList(&list, keyA, keyB, keyC, keyD, keyE, keyF, keyG, keyH) + // insert keyA to keyH + s.insertIntoList(list, keyC, keyF, keyE, keyH, keyG, keyD, keyA, keyB) // Point to the first node, if seek key is smaller than the first key in list. - c.Assert(list.seek(key1).start, check.BytesEquals, keyA) + c.Assert(list.Seek(key1).Node().Key(), check.IsNil) // Point to the last node with key smaller than seek key. - c.Assert(list.seek(keyH).start, check.BytesEquals, keyH) + c.Assert(list.Seek(keyH).Node().key, check.BytesEquals, keyH) // Point to itself. - c.Assert(list.seek(keyG).start, check.BytesEquals, keyG) + c.Assert(list.Seek(keyG).Node().key, check.BytesEquals, keyG) // Ensure there is no problem to seek a larger key. - c.Assert(list.seek(keyZ).start, check.BytesEquals, keyH) - - c.Assert(list.seek([]byte("b0")).start, check.BytesEquals, keyA) - c.Assert(list.seek([]byte("c0")).start, check.BytesEquals, keyB) - c.Assert(list.seek([]byte("d0")).start, check.BytesEquals, keyC) - c.Assert(list.seek([]byte("e0")).start, check.BytesEquals, keyD) - c.Assert(list.seek([]byte("f0")).start, check.BytesEquals, keyE) - c.Assert(list.seek([]byte("g0")).start, check.BytesEquals, keyF) - c.Assert(list.seek([]byte("h0")).start, check.BytesEquals, keyG) - c.Assert(list.seek([]byte("i0")).start, check.BytesEquals, keyH) + c.Assert(list.Seek(keyZ).Node().key, check.BytesEquals, keyH) + + c.Assert(list.Seek([]byte("b0")).Node().key, check.BytesEquals, keyA) + c.Assert(list.Seek([]byte("c0")).Node().key, check.BytesEquals, keyB) + c.Assert(list.Seek([]byte("d0")).Node().key, check.BytesEquals, keyC) + c.Assert(list.Seek([]byte("e0")).Node().key, check.BytesEquals, keyD) + c.Assert(list.Seek([]byte("f0")).Node().key, check.BytesEquals, keyE) + c.Assert(list.Seek([]byte("g0")).Node().key, check.BytesEquals, keyF) + c.Assert(list.Seek([]byte("h0")).Node().key, check.BytesEquals, keyG) + c.Assert(list.Seek([]byte("i0")).Node().key, check.BytesEquals, keyH) + c.Assert(list.String(), check.Equals, "[a5] [b5] [c5] [d5] [e5] [f5] [g5] [h5] ") + checkList(c, list) + + // remove c5 + seekRes := list.Seek([]byte("c0")) + list.Remove(seekRes, seekRes.Node().Next()) + c.Assert(list.Seek([]byte("c0")).Node().key, check.BytesEquals, keyB) + c.Assert(list.Seek([]byte("d0")).Node().key, check.BytesEquals, keyB) + c.Assert(list.Seek([]byte("e0")).Node().key, check.BytesEquals, keyD) + c.Assert(list.String(), check.Equals, "[a5] [b5] [d5] [e5] [f5] [g5] [h5] ") + checkList(c, list) + + // remove d5 + list.Remove(seekRes, seekRes.Node().Next()) + c.Assert(list.Seek([]byte("d0")).Node().key, check.BytesEquals, keyB) + c.Assert(list.Seek([]byte("e0")).Node().key, check.BytesEquals, keyB) + c.Assert(list.Seek([]byte("f0")).Node().key, check.BytesEquals, keyE) + c.Assert(list.String(), check.Equals, "[a5] [b5] [e5] [f5] [g5] [h5] ") + checkList(c, list) + + // remove the first node + seekRes = list.Seek([]byte("10")) + list.Remove(seekRes, seekRes.Node().Next()) + c.Assert(list.String(), check.Equals, "[b5] [e5] [f5] [g5] [h5] ") + checkList(c, list) } diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 5e9b2909c32..84e13592373 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -80,7 +80,7 @@ func NewPuller( spans: spans, buffer: makeMemBuffer(limitter), outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), - tsTracker: frontier.NewFrontier(spans...), + tsTracker: frontier.NewFrontier(checkpointTs, spans...), needEncode: needEncode, resolvedTs: checkpointTs, }