Skip to content

Commit

Permalink
Extend optimization to overlapping inserts
Browse files Browse the repository at this point in the history
Prior to this commit, we would copy MCAP bytes directly to storage if we
were merging one leaf into an empty destination leaf. Now we will do the
same even if there's a nonempty destination, as long as there are no
overlaps in the timestamp/sequence pairs of the destination and the new
data.

Further, when there is duplication/overlap, we previously read MCAP
messages out of the destination leaf in order to compare the new data
against them. This commit instead uses the message keys in the header,
without reading those messages, so the additional overhead of
deduplicating is roughly invariant with respect to message size.
  • Loading branch information
wkalt committed Jun 19, 2024
1 parent 7d39c92 commit 4e80100
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 83 deletions.
34 changes: 23 additions & 11 deletions mcap/filter_merge_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"

"github.com/foxglove/mcap/go/mcap"
"github.com/wkalt/dp3/nodestore"
"github.com/wkalt/dp3/util"
)

Expand Down Expand Up @@ -52,11 +53,14 @@ type filterMergeIterator struct {
nextChannelID uint16

lastMark marker

filter []nodestore.MessageKey
filterIdx int
}

// NewFilterMergeIterator returns a new filter merge iterator.
func NewFilterMergeIterator(
filter MessageIterator,
filter []nodestore.MessageKey, // ordered by timestamp
iterators ...MessageIterator,
) (MessageIterator, error) {
pq := util.NewPriorityQueue(func(a, b record) bool {
Expand All @@ -70,12 +74,8 @@ func NewFilterMergeIterator(
})
heap.Init(pq)

// NB: mask may be nil, in which case zeroth index is unused.
targets := []MessageIterator{filter}
targets = append(targets, iterators...)

// push one element from each iterator onto queue
for i, it := range targets {
for i, it := range iterators {
if it == nil {
continue
}
Expand All @@ -92,13 +92,14 @@ func NewFilterMergeIterator(

return &filterMergeIterator{
pq: pq,
iterators: targets,
iterators: iterators,
schemaHashes: make(map[uint64]*mcap.Schema),
channelHashes: make(map[uint64]*mcap.Channel),
channels: make(map[*mcap.Channel]*mcap.Channel),
schemas: make(map[*mcap.Schema]*mcap.Schema),
nextSchemaID: 1,
nextChannelID: 0,
filter: filter,
}, nil
}

Expand All @@ -107,7 +108,7 @@ func NewFilterMergeIterator(
func FilterMerge(
w io.Writer,
msgCallback func(*mcap.Schema, *mcap.Channel, *mcap.Message) error,
mask MessageIterator,
mask []nodestore.MessageKey,
iterators ...MessageIterator,
) error {
iterator, err := NewFilterMergeIterator(mask, iterators...)
Expand Down Expand Up @@ -137,11 +138,22 @@ func (mi *filterMergeIterator) Next([]byte) (*mcap.Schema, *mcap.Channel, *mcap.
heap.Push(mi.pq, rec)
}

// Skip any messages from the mask, but record the time/sequence
if rec.idx == 0 {
// Bring the filter forward to the current message if it is behind.
for mi.filterIdx < len(mi.filter) &&
(mi.filter[mi.filterIdx].Timestamp < rec.message.LogTime ||
(mi.filter[mi.filterIdx].Timestamp == rec.message.LogTime &&
mi.filter[mi.filterIdx].Sequence < rec.message.Sequence)) {
mi.filterIdx++
}

// If the timestamp and sequence match the current index in the filter,
// skip completely. Multiple hits are possible so do not bump the filter
// index.
if mi.filterIdx < len(mi.filter) && rec.message.LogTime == mi.filter[mi.filterIdx].Timestamp &&
rec.message.Sequence == mi.filter[mi.filterIdx].Sequence {
mi.lastMark.timestamp = rec.message.LogTime
mi.lastMark.sequence = rec.message.Sequence
mi.lastMark.valid = true
mi.lastMark.sequence = rec.message.Sequence
continue
}

Expand Down
10 changes: 5 additions & 5 deletions mcap/filter_merge_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/wkalt/dp3/mcap"
"github.com/wkalt/dp3/nodestore"
)

func TestMergeFilterIterator(t *testing.T) {
Expand Down Expand Up @@ -81,15 +82,14 @@ func TestMergeFilterIterator(t *testing.T) {

for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
var filter mcap.MessageIterator
if len(c.filter) > 0 {
filter = mcap.NewMockIterator("topic", c.filter)
}

var inputs []mcap.MessageIterator
for _, input := range c.inputs {
inputs = append(inputs, mcap.NewMockIterator("topic", input))
}
filter := []nodestore.MessageKey{}
for _, f := range c.filter {
filter = append(filter, nodestore.MessageKey{Timestamp: uint64(f[0]), Sequence: uint32(f[1])})
}
iter, err := mcap.NewFilterMergeIterator(filter, inputs...)
require.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion nodestore/leaf_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (n *LeafNode) Size() uint64 {
return uint64(len(n.data) + 1)
}

// AncestorDeleted returns true if the ancestor node has been partially or fully
// deleted. It reports this whenever the node's ancestorDeleteEnd field is
// greater than zero.
func (n *LeafNode) AncestorDeleted() bool {
return n.ancestorDeleteEnd > 0
}
Expand Down
Loading

0 comments on commit 4e80100

Please sign in to comment.