-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathstored.go
223 lines (184 loc) · 5.82 KB
/
stored.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// A stored query encapsulates a VQL query which is yet to
// execute. Readers can request the query's channel and can read from
// it to drain its results.
// Stored queries implement the LET VQL directive. The LET keyword
// defines a stored query which is evaluated on demand. It looks just
// like a subselect but it is an efficient mechanism of passing the
// result of one query into another. Consider the following query:
// LET files = select * from glob(globs="/**") where Size < 100
// SELECT FullPath from files
// The LET keyword creates a stored query. This query does not
// immediately run until it is used as the subject of the second
// query. Most importantly, the second query does not need to wait for
// the stored query to completely produce its output. The first query
// can immediately feed rows to the second query for additional
// filtering. This leads to zero memory overhead as the rows do not
// need to be queued in memory.
package vfilter
import (
"context"
"github.com/Velocidex/ordereddict"
"www.velocidex.com/golang/vfilter/types"
"www.velocidex.com/golang/vfilter/utils"
)
// A stored expression is stored in a LET clause either with or
// without parameters. e.g.:
// LET Y = SELECT * FROM plugin()
// LET Y(X) = SELECT * FROM plugin(foo=X)
type _StoredQuery struct {
query *_Select
name string
parameters []string
}
func NewStoredQuery(query *_Select, name string) *_StoredQuery {
return &_StoredQuery{
query: query,
name: name,
}
}
func (self *_StoredQuery) Eval(ctx context.Context, scope types.Scope) <-chan Row {
output_chan := make(chan Row)
go func() {
defer close(output_chan)
// Evaluate the query in the caller's scope.
new_scope := scope.Copy()
defer new_scope.Close()
for row := range self.query.Eval(ctx, new_scope) {
select {
case <-ctx.Done():
return
case output_chan <- row:
}
}
}()
return output_chan
}
// Stored queries can also behave like plugins. This just means we
// evaluate it with a subscope built on top of the args.
func (self *_StoredQuery) Info(scope types.Scope, type_map *TypeMap) *PluginInfo {
return &PluginInfo{}
}
func (self *_StoredQuery) Call(ctx context.Context,
scope types.Scope, args *ordereddict.Dict) <-chan Row {
// When running a stored query, we need to use a brand new scope
// with its own aggregator context to make sure that aggregate
// functions inside the stored query start fresh.
sub_scope := scope.Copy()
sub_scope.SetAggregatorCtx(nil)
defer sub_scope.Close()
self.checkCallingArgs(sub_scope, args)
vars := ordereddict.NewDict()
for _, k := range args.Keys() {
v, _ := args.Get(k)
switch t := v.(type) {
case types.LazyExpr:
v = t.Reduce(ctx)
case types.Materializer:
v = t.Materialize(ctx, sub_scope)
case types.StoredQuery:
v = types.Materialize(ctx, sub_scope, t)
}
vars.Set(k, v)
}
sub_scope.AppendVars(vars)
return self.Eval(ctx, sub_scope)
}
func (self *_StoredQuery) checkCallingArgs(scope types.Scope, args *ordereddict.Dict) {
// No parameters - do not warn
if self.parameters == nil {
return
}
// Check that all parameters are properly called.
seen_map := make(map[string]bool)
for _, k := range args.Keys() {
if !utils.InString(&self.parameters, k) {
scope.Log("ERROR:Extra unrecognized arg %v when calling %v",
k, self.name)
}
seen_map[k] = true
}
// Some args are missing.
if len(seen_map) < len(self.parameters) {
for _, k := range self.parameters {
_, pres := seen_map[k]
if !pres {
scope.Log("ERROR:Missing arg %v when calling %v",
k, self.name)
}
}
}
}
// A stored expression is stored in a LET clause either with or
// without parameters. e.g.:
// LET Y = count()
// LET Y(X) = format(format="Hello %v", args=[X])
// Unlike the LazyExpr the value of StoredExpression is not cached -
// this means each time it is evaluated, the expression is fully
// expanded. NOTE: The StoredExpression is evaluated at the point of
// reference not at the point of definition - therefore when
// evaluated, we must provide the scope at that point.
type StoredExpression struct {
Expr *_AndExpression
name string
parameters []string
}
func (self *StoredExpression) Reduce(
ctx context.Context, scope types.Scope) types.Any {
return self.Expr.Reduce(ctx, scope)
}
// Act as a function
func (self *StoredExpression) Call(ctx context.Context,
scope types.Scope, args *ordereddict.Dict) types.Any {
self.checkCallingArgs(scope, args)
sub_scope := scope.Copy()
defer sub_scope.Close()
vars := ordereddict.NewDict()
for _, k := range args.Keys() {
v, _ := args.Get(k)
switch t := v.(type) {
case types.LazyExpr:
v = t.Reduce(ctx)
case types.StoredQuery:
v = types.Materialize(ctx, scope, t)
}
vars.Set(k, v)
}
sub_scope.AppendVars(vars)
return self.Reduce(ctx, sub_scope)
}
func (self *StoredExpression) checkCallingArgs(scope types.Scope, args *ordereddict.Dict) {
// No parameters - do not warn
if self.parameters == nil {
return
}
// Check that all parameters are properly called.
seen_map := make(map[string]bool)
for _, k := range args.Keys() {
if !utils.InString(&self.parameters, k) {
scope.Log("ERROR:Extra unrecognized arg %v when calling %v",
k, self.name)
}
seen_map[k] = true
}
// Some args are missing.
if len(seen_map) < len(self.parameters) {
for _, k := range self.parameters {
_, pres := seen_map[k]
if !pres {
scope.Log("ERROR:Missing arg %v when calling %v",
k, self.name)
}
}
}
}
// A wrapper around a stored query which captures its call site's
// parameters in a new scope. When the wrapper is evaluated, the call
// site's scope will be used.
type StoredQueryCallSite struct {
query StoredQuery
scope Scope
}
func (self *StoredQueryCallSite) Eval(ctx context.Context, scope Scope) <-chan Row {
// Use our embedded scope instead.
return self.query.Eval(ctx, self.scope)
}