Commit 9ab5fba: Query profiles parquet by rowGroup

simonswine committed Jun 26, 2023
1 parent ebc3e04
Showing 2 changed files with 113 additions and 37 deletions.
56 changes: 56 additions & 0 deletions pkg/iter/iter.go
@@ -259,3 +259,59 @@ func (it *BufferedIterator[T]) Next() bool {
 func (it *BufferedIterator[T]) At() T {
 	return it.at
 }
+
+type LazyConcatIterator[T any] struct {
+	curr         Iterator[T]
+	err          error
+	nextIterator func() (Iterator[T], error)
+}
+
+// NewLazyConcatIterator returns an iterator that lazily concatenates further iterators, fetching the next one from nextIterator once the current one is exhausted.
+func NewLazyConcatIterator[T any](nextIterator func() (Iterator[T], error)) Iterator[T] {
+	// fetch the first iterator eagerly
+	it, err := nextIterator()
+	return &LazyConcatIterator[T]{
+		curr:         it,
+		err:          err,
+		nextIterator: nextIterator,
+	}
+}
+
+func (it *LazyConcatIterator[T]) Next() bool {
+	if it.curr == nil {
+		return false
+	}
+
+	if it.curr.Next() {
+		return true
+	}
+
+	it.curr, it.err = it.nextIterator()
+
+	return it.Next()
+}
+
+func (it *LazyConcatIterator[T]) At() T {
+	if it.curr == nil {
+		var zero T
+		return zero
+	}
+	return it.curr.At()
+}
+
+func (it *LazyConcatIterator[T]) Err() error {
+	if it.err != nil {
+		return it.err
+	}
+	if it.curr == nil {
+		return nil
+	}
+	return it.curr.Err()
+}
+
+func (it *LazyConcatIterator[T]) Close() error {
+	if it.curr == nil {
+		return nil
+	}
+	return it.curr.Close()
+}
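For readers new to the iterator added above, here is a minimal, hypothetical usage sketch. It assumes the repository's module path (github.com/grafana/phlare) and reuses NewSliceIterator from the same package, as block_querier.go below does; returning a nil iterator from the callback signals that there is nothing left to concatenate:

	package main

	import (
		"fmt"
		"log"

		"github.com/grafana/phlare/pkg/iter" // assumed module path
	)

	func main() {
		// Three pre-built batches stand in for iterators that are expensive
		// to construct, such as a per-row-group parquet join.
		batches := [][]int{{1, 2}, {3}, {4, 5, 6}}

		it := iter.NewLazyConcatIterator(func() (iter.Iterator[int], error) {
			if len(batches) == 0 {
				return nil, nil // nil iterator: nothing left to concatenate
			}
			next := iter.NewSliceIterator(batches[0])
			batches = batches[1:]
			return next, nil
		})
		defer it.Close()

		for it.Next() {
			fmt.Println(it.At()) // prints 1 through 6, in order
		}
		if err := it.Err(); err != nil {
			log.Fatal(err)
		}
	}

Because the callback only runs when the current iterator is exhausted, the work for each batch is deferred until a consumer actually asks for more rows, which is what the block_querier.go change below relies on.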
94 changes: 57 additions & 37 deletions pkg/phlaredb/block_querier.go
@@ -983,46 +983,65 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 			lbls = make(phlaremodel.Labels, 0, 6)
 		}
 	}
-	pIt := query.NewJoinIterator(
-		0,
-		[]query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
-			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
-			b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
-		},
-		nil,
-	)
+	rowGroups := b.profiles.file.RowGroups()
 	iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
-	buf := make([][]parquet.Value, 2)
-	defer pIt.Close()
+	buf := make([][]parquet.Value, 3)
 
-	currSeriesIndex := int64(-1)
-	var currentSeriesSlice []Profile
-	for pIt.Next() {
-		res := pIt.At()
-		buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
-		seriesIndex := buf[0][0].Int64()
-		if seriesIndex != currSeriesIndex {
-			currSeriesIndex++
-			if len(currentSeriesSlice) > 0 {
-				iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
-			}
-			currentSeriesSlice = make([]Profile, 0, 100)
-		}
-
-		currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
-			labels:              lblsPerRef[seriesIndex].lbs,
-			fp:                  lblsPerRef[seriesIndex].fp,
-			ts:                  model.TimeFromUnixNano(buf[1][0].Int64()),
-			stacktracePartition: retrieveStacktracePartition(buf, 2),
-			RowNum:              res.RowNumber[0],
-		})
-	}
-	if len(currentSeriesSlice) > 0 {
-		iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
-	}
-
-	return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
+	return iter.NewLazyConcatIterator(func() (iter.Iterator[Profile], error) {
+		// all rowgroups have been read
+		if len(rowGroups) == 0 {
+			return nil, nil
+		}
+
+		sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - RowGroup")
+		defer sp.Finish()
+
+		pIt := query.NewJoinIterator(
+			0,
+			[]query.Iterator{
+				b.profiles.columnIter(ctx, rowGroups[0:1], "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
+				b.profiles.columnIter(ctx, rowGroups[0:1], "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
+				b.profiles.columnIter(ctx, rowGroups[0:1], "StacktracePartition", nil, "StacktracePartition"),
+			},
+			nil,
+		)
+		defer pIt.Close()
+
+		// remove first row group
+		rowGroups = rowGroups[1:]
+
+		currSeriesIndex := int64(-1)
+		var currentSeriesSlice []Profile
+		for pIt.Next() {
+			res := pIt.At()
+			buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
+			seriesIndex := buf[0][0].Int64()
+			if seriesIndex != currSeriesIndex {
+				currSeriesIndex++
+				if len(currentSeriesSlice) > 0 {
+					iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
+				}
+				// can probably keep the ones from before
+				currentSeriesSlice = make([]Profile, 0, 100)
+			}
+
+			currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
+				labels:              lblsPerRef[seriesIndex].lbs,
+				fp:                  lblsPerRef[seriesIndex].fp,
+				ts:                  model.TimeFromUnixNano(buf[1][0].Int64()),
+				stacktracePartition: retrieveStacktracePartition(buf, 2),
+				RowNum:              res.RowNumber[0],
+			})
+		}
+		if len(currentSeriesSlice) > 0 {
+			iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
+		}
+
+		return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
+	}), nil
 }
 
 func (b *singleBlockQuerier) Sort(in []Profile) []Profile {
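The restructuring above changes how much of the file a single join touches: each invocation of the NewLazyConcatIterator callback opens its own "SelectMatchingProfiles - RowGroup" tracing span, joins over rowGroups[0:1] only, and then drops that group from the slice, so the parquet file is scanned one row group at a time. Below is a minimal, standalone sketch of the same walk, assuming segmentio/parquet-go (the library behind the parquet.RowGroup and parquet.Value types used here) and a placeholder file name:

	package main

	import (
		"fmt"
		"log"
		"os"

		"github.com/segmentio/parquet-go"
	)

	func main() {
		// "profiles.parquet" is a placeholder; any parquet file works.
		f, err := os.Open("profiles.parquet")
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()

		info, err := f.Stat()
		if err != nil {
			log.Fatal(err)
		}

		pf, err := parquet.OpenFile(f, info.Size())
		if err != nil {
			log.Fatal(err)
		}

		// Walk the row groups one at a time, dropping each once it has
		// been processed, mirroring the rowGroups[0:1] / rowGroups[1:]
		// pattern in SelectMatchingProfiles.
		rowGroups := pf.RowGroups()
		for len(rowGroups) > 0 {
			current := rowGroups[0:1]
			fmt.Printf("row group with %d rows\n", current[0].NumRows())
			rowGroups = rowGroups[1:]
		}
	}

Scanning per row group also means each group gets its own span, so slow groups become visible in traces.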
@@ -1185,13 +1204,14 @@ func (r *parquetReader[M, P]) relPath() string {
 	return r.persister.Name() + block.ParquetSuffix
 }
 
-func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string, predicate query.Predicate, alias string) query.Iterator {
+func (r *parquetReader[M, P]) columnIter(ctx context.Context, rowGroups []parquet.RowGroup, columnName string, predicate query.Predicate, alias string) query.Iterator {
+
 	index, _ := query.GetColumnIndexByPath(r.file, columnName)
 	if index == -1 {
 		return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
 	}
 	ctx = query.AddMetricsToContext(ctx, r.metrics.query)
-	return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
+	return query.NewColumnIterator(ctx, rowGroups, index, columnName, 1000, predicate, alias)
 }
 
 func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
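With the extra rowGroups parameter, the caller now decides which row groups a column iterator scans: passing r.file.RowGroups() reproduces the previous whole-file behaviour, while SelectMatchingProfiles above passes the single-element slice rowGroups[0:1] so that each callback of the lazy concat iterator reads exactly one row group.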