Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller: fix a memory leak bug in the frontier #704

Merged
merged 5 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 62 additions & 89 deletions cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package frontier
import (
"bytes"
"fmt"
"math"
"strings"

"github.com/pingcap/ticdc/pkg/regionspan"
Expand All @@ -28,66 +29,41 @@ type Frontier interface {
String() string
}

type node struct {
start []byte
end []byte
ts uint64

// used by list
nexts []*node

// used by heap
left *node
right *node
children *node
parent *node
rank int
marked bool
}

func (s *node) String() string {
return fmt.Sprintf("[{%s %s} @ %d]", s.start, s.end, s.ts)
}

// spanFrontier tracks the minimum timestamp of a set of spans.
type spanFrontier struct {
list spanList
minHeap minTsHeap
spanList skipList
minTsHeap fibonacciHeap
}

// NewFrontier creates Froniter from the given spans.
func NewFrontier(spans ...regionspan.Span) Frontier {
func NewFrontier(checkpointTs uint64, spans ...regionspan.Span) Frontier {
// spanFrontier don't support use Nil as the maximum key of End range
// So we use set it as util.UpperBoundKey, the means the real use case *should not* have a
// End key bigger than util.UpperBoundKey
for i, span := range spans {
spans[i] = span.Hack()
}

s := &spanFrontier{}
s.list.init()

s := &spanFrontier{
spanList: *newSpanList(),
}
firstSpan := true
for _, span := range spans {
e := &node{
start: span.Start,
end: span.End,
ts: 0,
if firstSpan {
s.spanList.Insert(span.Start, s.minTsHeap.Insert(checkpointTs))
s.spanList.Insert(span.End, s.minTsHeap.Insert(math.MaxUint64))
firstSpan = false
continue
}

s.list.insert(e)
s.minHeap.insert(e)
s.insert(span, checkpointTs)
}

return s
}

// Frontier return the minimum tiemstamp.
func (s *spanFrontier) Frontier() uint64 {
min := s.minHeap.getMin()
if min == nil {
return 0
}
return min.ts
return s.minTsHeap.GetMinKey()
}

// Forward advances the timestamp for a span.
Expand All @@ -97,70 +73,67 @@ func (s *spanFrontier) Forward(span regionspan.Span, ts uint64) {
}

func (s *spanFrontier) insert(span regionspan.Span, ts uint64) {
n := s.list.seek(span.Start)

if n == nil || bytes.Compare(span.End, n.start) <= 0 {
// No overlapped spans.
return
}

if bytes.Compare(n.end, span.Start) <= 0 {
// Pointed span has no overlap, step to next.
n = n.next()
}

if n != nil && bytes.Compare(span.Start, n.start) > 0 {
e := &node{
start: n.start,
end: span.Start,
ts: n.ts,
seekRes := s.spanList.Seek(span.Start)

// if there is no change in the region span
// We just need to update the ts corresponding to the span in list
next := seekRes.Node().Next()
if next != nil {
cmpStart := bytes.Compare(seekRes.Node().Key(), span.Start)
cmpEnd := bytes.Compare(next.Key(), span.End)
if cmpStart == 0 && cmpEnd == 0 {
s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts)
return
}
n.start = span.Start
s.list.insert(e)
s.minHeap.insert(e)
}

for n != nil {
oldTs := n.ts
if ts != n.ts {
s.minHeap.updateTs(n, ts)
}

next := n.next()
if next != nil && bytes.Compare(next.start, span.End) < 0 {
n = next
// regions are merged or split, overwrite span into list
node := seekRes.Node()
lastNodeTs := uint64(math.MaxUint64)
shouldInsertStartNode := true
if node.Value() != nil {
lastNodeTs = node.Value().key
}
for ; node != nil; node = node.Next() {
cmpStart := bytes.Compare(node.Key(), span.Start)
if cmpStart < 0 {
continue
}

// no more overlapped spans.
if cmp := bytes.Compare(span.End, n.end); cmp < 0 {
e := &node{
start: span.End,
end: n.end,
ts: oldTs,
}
s.list.insert(e)
n.end = span.End
s.minHeap.insert(e)
if bytes.Compare(node.Key(), span.End) > 0 {
break
}
lastNodeTs = node.Value().key
if cmpStart == 0 {
s.minTsHeap.UpdateKey(node.Value(), ts)
shouldInsertStartNode = false
} else {
s.spanList.Remove(seekRes, node)
s.minTsHeap.Remove(node.Value())
}
return
}
if shouldInsertStartNode {
s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts))
seekRes.Next()
}
s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs))
}

// Entries visit all traced spans.
func (s *spanFrontier) Entries(fn func(start, end []byte, ts uint64)) {
for n := s.list.head.next(); n != nil; n = n.next() {
fn(n.start, n.end, n.ts)
}
func (s *spanFrontier) Entries(fn func(key []byte, ts uint64)) {
s.spanList.Entries(func(n *skipListNode) bool {
fn(n.Key(), n.Value().key)
return true
})
}

func (s *spanFrontier) String() string {
var buf strings.Builder
for n := s.list.head.next(); n != nil; n = n.next() {
if buf.Len() != 0 {
buf.WriteString(` `)
s.Entries(func(key []byte, ts uint64) {
if ts == math.MaxUint64 {
buf.WriteString(fmt.Sprintf("[%s @ Max] ", key))
} else {
buf.WriteString(fmt.Sprintf("[%s @ %d] ", key, ts))
}
buf.WriteString(n.String())
}
})
return buf.String()
}
4 changes: 2 additions & 2 deletions cdc/puller/frontier/frontier_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func BenchmarkSpanFrontier(b *testing.B) {
spans = append(spans, span)
}

f := NewFrontier(spans...)
f := NewFrontier(0, spans...)
amyangfei marked this conversation as resolved.
Show resolved Hide resolved

b.ResetTimer()

Expand Down Expand Up @@ -91,7 +91,7 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) {
})
}

f := NewFrontier(spans...)
f := NewFrontier(0, spans...)

b.ResetTimer()

Expand Down
Loading