Skip to content

Commit

Permalink
puller: fix a memory leak bug in the frontier (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Jul 3, 2020
1 parent 0057782 commit 39bb362
Show file tree
Hide file tree
Showing 8 changed files with 605 additions and 281 deletions.
151 changes: 62 additions & 89 deletions cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package frontier
import (
"bytes"
"fmt"
"math"
"strings"

"github.com/pingcap/ticdc/pkg/regionspan"
Expand All @@ -28,66 +29,41 @@ type Frontier interface {
String() string
}

type node struct {
start []byte
end []byte
ts uint64

// used by list
nexts []*node

// used by heap
left *node
right *node
children *node
parent *node
rank int
marked bool
}

func (s *node) String() string {
return fmt.Sprintf("[{%s %s} @ %d]", s.start, s.end, s.ts)
}

// spanFrontier tracks the minimum timestamp of a set of spans.
type spanFrontier struct {
list spanList
minHeap minTsHeap
spanList skipList
minTsHeap fibonacciHeap
}

// NewFrontier creates Froniter from the given spans.
func NewFrontier(spans ...regionspan.Span) Frontier {
func NewFrontier(checkpointTs uint64, spans ...regionspan.Span) Frontier {
// spanFrontier don't support use Nil as the maximum key of End range
// So we use set it as util.UpperBoundKey, the means the real use case *should not* have a
// End key bigger than util.UpperBoundKey
for i, span := range spans {
spans[i] = span.Hack()
}

s := &spanFrontier{}
s.list.init()

s := &spanFrontier{
spanList: *newSpanList(),
}
firstSpan := true
for _, span := range spans {
e := &node{
start: span.Start,
end: span.End,
ts: 0,
if firstSpan {
s.spanList.Insert(span.Start, s.minTsHeap.Insert(checkpointTs))
s.spanList.Insert(span.End, s.minTsHeap.Insert(math.MaxUint64))
firstSpan = false
continue
}

s.list.insert(e)
s.minHeap.insert(e)
s.insert(span, checkpointTs)
}

return s
}

// Frontier return the minimum tiemstamp.
func (s *spanFrontier) Frontier() uint64 {
min := s.minHeap.getMin()
if min == nil {
return 0
}
return min.ts
return s.minTsHeap.GetMinKey()
}

// Forward advances the timestamp for a span.
Expand All @@ -97,70 +73,67 @@ func (s *spanFrontier) Forward(span regionspan.Span, ts uint64) {
}

func (s *spanFrontier) insert(span regionspan.Span, ts uint64) {
n := s.list.seek(span.Start)

if n == nil || bytes.Compare(span.End, n.start) <= 0 {
// No overlapped spans.
return
}

if bytes.Compare(n.end, span.Start) <= 0 {
// Pointed span has no overlap, step to next.
n = n.next()
}

if n != nil && bytes.Compare(span.Start, n.start) > 0 {
e := &node{
start: n.start,
end: span.Start,
ts: n.ts,
seekRes := s.spanList.Seek(span.Start)

// if there is no change in the region span
// We just need to update the ts corresponding to the span in list
next := seekRes.Node().Next()
if next != nil {
cmpStart := bytes.Compare(seekRes.Node().Key(), span.Start)
cmpEnd := bytes.Compare(next.Key(), span.End)
if cmpStart == 0 && cmpEnd == 0 {
s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts)
return
}
n.start = span.Start
s.list.insert(e)
s.minHeap.insert(e)
}

for n != nil {
oldTs := n.ts
if ts != n.ts {
s.minHeap.updateTs(n, ts)
}

next := n.next()
if next != nil && bytes.Compare(next.start, span.End) < 0 {
n = next
// regions are merged or split, overwrite span into list
node := seekRes.Node()
lastNodeTs := uint64(math.MaxUint64)
shouldInsertStartNode := true
if node.Value() != nil {
lastNodeTs = node.Value().key
}
for ; node != nil; node = node.Next() {
cmpStart := bytes.Compare(node.Key(), span.Start)
if cmpStart < 0 {
continue
}

// no more overlapped spans.
if cmp := bytes.Compare(span.End, n.end); cmp < 0 {
e := &node{
start: span.End,
end: n.end,
ts: oldTs,
}
s.list.insert(e)
n.end = span.End
s.minHeap.insert(e)
if bytes.Compare(node.Key(), span.End) > 0 {
break
}
lastNodeTs = node.Value().key
if cmpStart == 0 {
s.minTsHeap.UpdateKey(node.Value(), ts)
shouldInsertStartNode = false
} else {
s.spanList.Remove(seekRes, node)
s.minTsHeap.Remove(node.Value())
}
return
}
if shouldInsertStartNode {
s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts))
seekRes.Next()
}
s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs))
}

// Entries visit all traced spans.
func (s *spanFrontier) Entries(fn func(start, end []byte, ts uint64)) {
for n := s.list.head.next(); n != nil; n = n.next() {
fn(n.start, n.end, n.ts)
}
func (s *spanFrontier) Entries(fn func(key []byte, ts uint64)) {
s.spanList.Entries(func(n *skipListNode) bool {
fn(n.Key(), n.Value().key)
return true
})
}

func (s *spanFrontier) String() string {
var buf strings.Builder
for n := s.list.head.next(); n != nil; n = n.next() {
if buf.Len() != 0 {
buf.WriteString(` `)
s.Entries(func(key []byte, ts uint64) {
if ts == math.MaxUint64 {
buf.WriteString(fmt.Sprintf("[%s @ Max] ", key))
} else {
buf.WriteString(fmt.Sprintf("[%s @ %d] ", key, ts))
}
buf.WriteString(n.String())
}
})
return buf.String()
}
4 changes: 2 additions & 2 deletions cdc/puller/frontier/frontier_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func BenchmarkSpanFrontier(b *testing.B) {
spans = append(spans, span)
}

f := NewFrontier(spans...)
f := NewFrontier(0, spans...)

b.ResetTimer()

Expand Down Expand Up @@ -91,7 +91,7 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) {
})
}

f := NewFrontier(spans...)
f := NewFrontier(0, spans...)

b.ResetTimer()

Expand Down
Loading

0 comments on commit 39bb362

Please sign in to comment.