-
Notifications
You must be signed in to change notification settings - Fork 682
/
merge_plan.go
428 lines (349 loc) · 12 KB
/
merge_plan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package mergeplan provides a segment merge planning approach that's
// inspired by Lucene's TieredMergePolicy.java and descriptions like
// http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html
package mergeplan
import (
"errors"
"fmt"
"math"
"sort"
"strings"
)
// A Segment represents the information that the planner needs to
// calculate segment merging.
type Segment interface {
// Unique id of the segment -- used for sorting.
Id() uint64
// Full segment size (the size before any logical deletions).
FullSize() int64
// Size of the live data of the segment; i.e., FullSize() minus
// any logical deletions.
LiveSize() int64
HasVector() bool
// Size of the persisted segment file.
FileSize() int64
}
// Plan() will functionally compute a merge plan. A segment will be
// assigned to at most a single MergeTask in the output MergePlan. A
// segment not assigned to any MergeTask means the segment should
// remain unmerged.
func Plan(segments []Segment, o *MergePlanOptions) (*MergePlan, error) {
return plan(segments, o)
}
// A MergePlan is the result of the Plan() API.
//
// The planner doesn’t know how or whether these tasks are executed --
// that’s up to a separate merge execution system, which might execute
// these tasks concurrently or not, and which might execute all the
// tasks or not.
type MergePlan struct {
Tasks []*MergeTask
}
// A MergeTask represents several segments that should be merged
// together into a single segment.
type MergeTask struct {
Segments []Segment
}
// The MergePlanOptions is designed to be reusable between planning calls.
type MergePlanOptions struct {
// Max # segments per logarithmic tier, or max width of any
// logarithmic “step”. Smaller values mean more merging but fewer
// segments. Should be >= SegmentsPerMergeTask, else you'll have
// too much merging.
MaxSegmentsPerTier int
// Max size of any segment produced after merging. Actual
// merging, however, may produce segment sizes different than the
// planner’s predicted sizes.
MaxSegmentSize int64
// Max size (in bytes) of the persisted segment file that contains the
// vectors. This is used to prevent merging of segments that
// contain vectors that are too large.
MaxSegmentFileSize int64
// The growth factor for each tier in a staircase of idealized
// segments computed by CalcBudget().
TierGrowth float64
// The number of segments in any resulting MergeTask. e.g.,
// len(result.Tasks[ * ].Segments) == SegmentsPerMergeTask.
SegmentsPerMergeTask int
// Small segments are rounded up to this size, i.e., treated as
// equal (floor) size for consideration. This is to prevent lots
// of tiny segments from resulting in a long tail in the index.
FloorSegmentSize int64
// Controls how aggressively merges that reclaim more deletions
// are favored. Higher values will more aggressively target
// merges that reclaim deletions, but be careful not to go so high
// that way too much merging takes place; a value of 3.0 is
// probably nearly too high. A value of 0.0 means deletions don't
// impact merge selection.
ReclaimDeletesWeight float64
// Optional, defaults to mergeplan.CalcBudget().
CalcBudget func(totalSize int64, firstTierSize int64,
o *MergePlanOptions) (budgetNumSegments int)
// Optional, defaults to mergeplan.ScoreSegments().
ScoreSegments func(segments []Segment, o *MergePlanOptions) float64
// Optional.
Logger func(string)
}
// Returns the higher of the input or FloorSegmentSize.
func (o *MergePlanOptions) RaiseToFloorSegmentSize(s int64) int64 {
if s > o.FloorSegmentSize {
return s
}
return o.FloorSegmentSize
}
// MaxSegmentSizeLimit represents the maximum size of a segment,
// this limit comes with hit-1 optimisation/max encoding limit uint31.
const MaxSegmentSizeLimit = 1<<31 - 1
// ErrMaxSegmentSizeTooLarge is returned when the size of the segment
// exceeds the MaxSegmentSizeLimit
var ErrMaxSegmentSizeTooLarge = errors.New("MaxSegmentSize exceeds the size limit")
// DefaultMergePlanOptions suggests the default options.
var DefaultMergePlanOptions = MergePlanOptions{
MaxSegmentsPerTier: 10,
MaxSegmentSize: 5000000,
MaxSegmentFileSize: 4000000000, // 4GB
TierGrowth: 10.0,
SegmentsPerMergeTask: 10,
FloorSegmentSize: 2000,
ReclaimDeletesWeight: 2.0,
}
// SingleSegmentMergePlanOptions helps in creating a
// single segment index.
var SingleSegmentMergePlanOptions = MergePlanOptions{
MaxSegmentsPerTier: 1,
MaxSegmentSize: 1 << 30,
TierGrowth: 1.0,
SegmentsPerMergeTask: 10,
FloorSegmentSize: 1 << 30,
ReclaimDeletesWeight: 2.0,
}
// -------------------------------------------
func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) {
if len(segmentsIn) <= 1 {
return nil, nil
}
if o == nil {
o = &DefaultMergePlanOptions
}
segments := append([]Segment(nil), segmentsIn...) // Copy.
sort.Sort(byLiveSizeDescending(segments))
var minLiveSize int64 = math.MaxInt64
var eligibles []Segment
var eligiblesLiveSize int64
for _, segment := range segments {
if minLiveSize > segment.LiveSize() {
minLiveSize = segment.LiveSize()
}
isEligible := segment.LiveSize() < o.MaxSegmentSize/2
// An eligible segment (based on #documents) may be too large
// and thus need a stricter check based on the file size.
// This is particularly important for segments that contain
// vectors.
if isEligible && segment.HasVector() && o.MaxSegmentFileSize > 0 {
isEligible = segment.FileSize() < o.MaxSegmentFileSize/2
}
// Only small-enough segments are eligible.
if isEligible {
eligibles = append(eligibles, segment)
eligiblesLiveSize += segment.LiveSize()
}
}
minLiveSize = o.RaiseToFloorSegmentSize(minLiveSize)
calcBudget := o.CalcBudget
if calcBudget == nil {
calcBudget = CalcBudget
}
budgetNumSegments := calcBudget(eligiblesLiveSize, minLiveSize, o)
scoreSegments := o.ScoreSegments
if scoreSegments == nil {
scoreSegments = ScoreSegments
}
rv := &MergePlan{}
var empties []Segment
for _, eligible := range eligibles {
if eligible.LiveSize() <= 0 {
empties = append(empties, eligible)
}
}
if len(empties) > 0 {
rv.Tasks = append(rv.Tasks, &MergeTask{Segments: empties})
eligibles = removeSegments(eligibles, empties)
}
// While we’re over budget, keep looping, which might produce
// another MergeTask.
for len(eligibles) > 0 && (len(eligibles)+len(rv.Tasks)) > budgetNumSegments {
// Track a current best roster as we examine and score
// potential rosters of merges.
var bestRoster []Segment
var bestRosterScore float64 // Lower score is better.
for startIdx := 0; startIdx < len(eligibles); startIdx++ {
var roster []Segment
var rosterLiveSize int64
var rosterFileSize int64 // useful for segments with vectors
for idx := startIdx; idx < len(eligibles) && len(roster) < o.SegmentsPerMergeTask; idx++ {
eligible := eligibles[idx]
if rosterLiveSize+eligible.LiveSize() >= o.MaxSegmentSize {
continue
}
if eligible.HasVector() {
efs := eligible.FileSize()
if rosterFileSize+efs >= o.MaxSegmentFileSize {
continue
}
rosterFileSize += efs
}
roster = append(roster, eligible)
rosterLiveSize += eligible.LiveSize()
}
if len(roster) > 0 {
rosterScore := scoreSegments(roster, o)
if len(bestRoster) == 0 || rosterScore < bestRosterScore {
bestRoster = roster
bestRosterScore = rosterScore
}
}
}
if len(bestRoster) == 0 {
return rv, nil
}
rv.Tasks = append(rv.Tasks, &MergeTask{Segments: bestRoster})
eligibles = removeSegments(eligibles, bestRoster)
}
return rv, nil
}
// Compute the number of segments that would be needed to cover the
// totalSize, by climbing up a logarithmically growing staircase of
// segment tiers.
func CalcBudget(totalSize int64, firstTierSize int64, o *MergePlanOptions) (
budgetNumSegments int) {
tierSize := firstTierSize
if tierSize < 1 {
tierSize = 1
}
maxSegmentsPerTier := o.MaxSegmentsPerTier
if maxSegmentsPerTier < 1 {
maxSegmentsPerTier = 1
}
tierGrowth := o.TierGrowth
if tierGrowth < 1.0 {
tierGrowth = 1.0
}
for totalSize > 0 {
segmentsInTier := float64(totalSize) / float64(tierSize)
if segmentsInTier < float64(maxSegmentsPerTier) {
budgetNumSegments += int(math.Ceil(segmentsInTier))
break
}
budgetNumSegments += maxSegmentsPerTier
totalSize -= int64(maxSegmentsPerTier) * tierSize
tierSize = int64(float64(tierSize) * tierGrowth)
}
return budgetNumSegments
}
// Of note, removeSegments() keeps the ordering of the results stable.
func removeSegments(segments []Segment, toRemove []Segment) []Segment {
rv := make([]Segment, 0, len(segments)-len(toRemove))
OUTER:
for _, segment := range segments {
for _, r := range toRemove {
if segment == r {
continue OUTER
}
}
rv = append(rv, segment)
}
return rv
}
// Smaller result score is better.
func ScoreSegments(segments []Segment, o *MergePlanOptions) float64 {
var totBeforeSize int64
var totAfterSize int64
var totAfterSizeFloored int64
for _, segment := range segments {
totBeforeSize += segment.FullSize()
totAfterSize += segment.LiveSize()
totAfterSizeFloored += o.RaiseToFloorSegmentSize(segment.LiveSize())
}
if totBeforeSize <= 0 || totAfterSize <= 0 || totAfterSizeFloored <= 0 {
return 0
}
// Roughly guess the "balance" of the segments -- whether the
// segments are about the same size.
balance :=
float64(o.RaiseToFloorSegmentSize(segments[0].LiveSize())) /
float64(totAfterSizeFloored)
// Gently favor smaller merges over bigger ones. We don't want to
// make the exponent too large else we end up with poor merges of
// small segments in order to avoid the large merges.
score := balance * math.Pow(float64(totAfterSize), 0.05)
// Strongly favor merges that reclaim deletes.
nonDelRatio := float64(totAfterSize) / float64(totBeforeSize)
score *= math.Pow(nonDelRatio, o.ReclaimDeletesWeight)
return score
}
// ------------------------------------------
// ToBarChart returns an ASCII rendering of the segments and the plan.
// The barMax is the max width of the bars in the bar chart.
func ToBarChart(prefix string, barMax int, segments []Segment, plan *MergePlan) string {
rv := make([]string, 0, len(segments))
var maxFullSize int64
for _, segment := range segments {
if maxFullSize < segment.FullSize() {
maxFullSize = segment.FullSize()
}
}
if maxFullSize < 0 {
maxFullSize = 1
}
for _, segment := range segments {
barFull := int(segment.FullSize())
barLive := int(segment.LiveSize())
if maxFullSize > int64(barMax) {
barFull = int(float64(barMax) * float64(barFull) / float64(maxFullSize))
barLive = int(float64(barMax) * float64(barLive) / float64(maxFullSize))
}
barKind := " "
barChar := "."
if plan != nil {
TASK_LOOP:
for taski, task := range plan.Tasks {
for _, taskSegment := range task.Segments {
if taskSegment == segment {
barKind = "*"
barChar = fmt.Sprintf("%d", taski)
break TASK_LOOP
}
}
}
}
bar :=
strings.Repeat(barChar, barLive)[0:barLive] +
strings.Repeat("x", barFull-barLive)[0:barFull-barLive]
rv = append(rv, fmt.Sprintf("%s %5d: %5d /%5d - %s %s", prefix,
segment.Id(),
segment.LiveSize(),
segment.FullSize(),
barKind, bar))
}
return strings.Join(rv, "\n")
}
// ValidateMergePlannerOptions validates the merge planner options
func ValidateMergePlannerOptions(options *MergePlanOptions) error {
if options.MaxSegmentSize > MaxSegmentSizeLimit {
return ErrMaxSegmentSizeTooLarge
}
return nil
}