Add query describe endpoint #5126

Merged (1 commit) on May 31, 2024
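
This PR adds a POST /query/describe endpoint that compiles a query without running it and describes its inputs and outputs: the sources the query reads (lake pools, files, HTTP URLs, stdin, or lake metadata) and, for each output channel, the aggregation keys and the sort order when they are known.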
26 changes: 26 additions & 0 deletions compiler/describe.go
@@ -0,0 +1,26 @@
package compiler

import (
	"context"

	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/describe"
	"github.com/brimdata/zed/compiler/parser"
	"github.com/brimdata/zed/compiler/semantic"
	"github.com/brimdata/zed/lakeparse"
)

func Describe(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
	seq, sset, err := Parse(query)
	if err != nil {
		return nil, err
	}
	entry, err := semantic.AnalyzeAddSource(ctx, seq, src, head)
	if err != nil {
		if list, ok := err.(parser.ErrorList); ok {
			list.SetSourceSet(sset)
		}
		return nil, err
	}
	return describe.Analyze(ctx, src, entry)
}
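
For orientation, here is a hypothetical caller of this entry point (not part of the PR; the function name and the printing are made up, the types are the ones in this diff). It follows the flow above: parse the query, run semantic analysis with the source attached, then summarize the resulting DAG with describe.Analyze.

	// Hypothetical helper, not in the PR: describe a query and print
	// each output channel's aggregation keys and sort order.
	func printDescription(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) error {
		info, err := compiler.Describe(ctx, query, src, head)
		if err != nil {
			return err
		}
		for _, ch := range info.Channels {
			fmt.Printf("keys=%v sort=%v\n", ch.AggregationKeys, ch.Sort)
		}
		return nil
	}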
156 changes: 156 additions & 0 deletions compiler/describe/analyze.go
@@ -0,0 +1,156 @@
package describe

import (
	"context"
	"fmt"

	"github.com/brimdata/zed/compiler/ast/dag"
	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/optimizer"
	"github.com/brimdata/zed/lake"
	"github.com/brimdata/zed/order"
	"github.com/brimdata/zed/pkg/field"
	"github.com/segmentio/ksuid"
)

type Info struct {
	Sources  []Source  `json:"sources"`
	Channels []Channel `json:"channels"`
}

type Source interface {
	Source()
}

type (
	LakeMeta struct {
		Kind string `json:"kind"`
		Meta string `json:"meta"`
	}
	Pool struct {
		Kind string      `json:"kind"`
		Name string      `json:"name"`
		ID   ksuid.KSUID `json:"id"`
	}
	Path struct {
		Kind string `json:"kind"`
		URI  string `json:"uri"`
	}
)

func (*LakeMeta) Source() {}
func (*Pool) Source()     {}
func (*Path) Source()     {}

type Channel struct {
	AggregationKeys field.List     `json:"aggregation_keys"`
	Sort            *order.SortKey `json:"sort"`
}

func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
	var info Info
	var err error
	if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
		return nil, err
	}
	sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
	if err != nil {
		return nil, err
	}
	aggKeys := describeAggs(seq, []field.List{nil})
	for i := range sortKeys {
		// Convert SortKey to a pointer so a nil sort is encoded as null for
		// JSON/ZSON.
		var s *order.SortKey
		if !sortKeys[i].IsNil() {
			s = &sortKeys[i]
		}
		info.Channels = append(info.Channels, Channel{
			Sort:            s,
			AggregationKeys: aggKeys[i],
		})
	}
	return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
	switch o := o.(type) {
	case *dag.Fork:
		var s []Source
		for _, p := range o.Paths {
			out, err := describeSources(ctx, lk, p[0])
			if err != nil {
				return nil, err
			}
			s = append(s, out...)
		}
		return s, nil
	case *dag.DefaultScan:
		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
	case *dag.FileScan:
		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
	case *dag.HTTPScan:
		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
	case *dag.PoolScan:
		return sourceOfPool(ctx, lk, o.ID)
	case *dag.Lister:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.SeqScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.CommitMetaScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.LakeMetaScan:
		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
	default:
		return nil, fmt.Errorf("unsupported source type %T", o)
	}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
	p, err := lk.OpenPool(ctx, id)
	if err != nil {
		return nil, err
	}
	return []Source{&Pool{
		Kind: "Pool",
		ID:   id,
		Name: p.Name,
	}}, nil
}

func describeAggs(seq dag.Seq, parents []field.List) []field.List {
	for _, op := range seq {
		parents = describeOpAggs(op, parents)
	}
	return parents
}

func describeOpAggs(op dag.Op, parents []field.List) []field.List {
	switch op := op.(type) {
	case *dag.Fork:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Scatter:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Summarize:
		// The field list for an aggregation with no keys is an empty slice,
		// not nil.
		keys := field.List{}
		for _, k := range op.Keys {
			keys = append(keys, k.LHS.(*dag.This).Path)
		}
		return []field.List{keys}
	}
	// If there is more than one parent, reset to nil aggregation.
	if len(parents) > 1 {
		return []field.List{nil}
	}
	return parents
}
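
To make the JSON shape of Info concrete, here is a small standalone sketch (not from the PR). It assumes field.Path is a slice of strings, so each aggregation key marshals as a string array, and it shows that a nil Sort encodes as null:

	package main

	import (
		"encoding/json"
		"os"

		"github.com/brimdata/zed/compiler/describe"
		"github.com/brimdata/zed/pkg/field"
	)

	func main() {
		info := describe.Info{
			Sources: []describe.Source{
				&describe.Path{Kind: "Path", URI: "stdio://stdin"},
			},
			Channels: []describe.Channel{
				// One channel keyed on "status" with unknown sort order.
				{AggregationKeys: field.List{{"status"}}, Sort: nil},
			},
		}
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", "  ")
		enc.Encode(info)
		// Prints (condensed):
		// {"sources":[{"kind":"Path","uri":"stdio://stdin"}],
		//  "channels":[{"aggregation_keys":[["status"]],"sort":null}]}
	}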
6 changes: 5 additions & 1 deletion compiler/optimizer/optimizer.go
@@ -257,6 +257,10 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
 	})
 }
 
+func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
+	return o.propagateSortKey(copyOps(seq), []order.SortKey{order.Nil})
+}
+
 // propagateSortKey analyzes a Seq and attempts to push the scan order of the data source
 // into the first downstream aggregation. (We could continue the analysis past that
 // point but don't bother yet because we do not yet support any optimization
@@ -330,7 +334,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]order.SortKey, error) {
 		// We'll leave this as unknown for now and not try to optimize
 		// downstream of the first groupby unless an explicit sort is
 		// encountered.
-		return nil, nil
+		return []order.SortKey{order.Nil}, nil
 	case *dag.Fork:
 		var keys []order.SortKey
 		for _, seq := range op.Paths {
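Note the one-line change above: a Summarize whose output order is unknown now reports order.Nil instead of returning an empty slice. That keeps the slice returned by SortKeys aligned one entry per channel with the lists from describeAggs, which Analyze depends on when it indexes aggKeys[i].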
1 change: 1 addition & 0 deletions service/core.go
@@ -188,6 +188,7 @@ func (c *Core) addAPIServerRoutes() {
 	c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
 	c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
 	c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
+	c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
 	c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
 }

16 changes: 16 additions & 0 deletions service/handlers.go
@@ -11,13 +11,15 @@ import (
"github.com/brimdata/zed/api"
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/lake"
lakeapi "github.com/brimdata/zed/lake/api"
"github.com/brimdata/zed/lake/commits"
"github.com/brimdata/zed/lake/journal"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/exec"
"github.com/brimdata/zed/runtime/sam/op"
@@ -170,6 +172,20 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
 	w.Respond(http.StatusOK, ast)
 }
 
+func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
+	var req api.QueryRequest
+	if !r.Unmarshal(w, &req) {
+		return
+	}
+	src := data.NewSource(storage.NewRemoteEngine(), c.root)
+	info, err := compiler.Describe(r.Context(), req.Query, src, &req.Head)
+	if err != nil {
+		w.Error(srverr.ErrInvalid(err))
+		return
+	}
+	w.Respond(http.StatusOK, info)
+}
+
 func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
 	branchName, ok := r.StringFromPath(w, "branch")
 	if !ok {
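
Finally, a hypothetical client call (not part of the PR). The endpoint unmarshals the same api.QueryRequest body as /query; the port, the pool name, and the exact head fields here are assumptions:

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"io"
		"net/http"
	)

	func main() {
		// Same request shape as /query: a query string plus a commitish head.
		body, err := json.Marshal(map[string]any{
			"query": "from logs | count() by status", // pool name is made up
			"head":  map[string]string{"pool": "logs", "branch": "main"},
		})
		if err != nil {
			panic(err)
		}
		resp, err := http.Post("http://localhost:9867/query/describe",
			"application/json", bytes.NewReader(body))
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		b, _ := io.ReadAll(resp.Body)
		fmt.Println(resp.Status, string(b))
	}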