Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vam: Add network_of() func #5306

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions runtime/vam/expr/function/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ func (f *Fields) Call(args ...vector.Any) vector.Any {
inOffs, outOffs = appendPaths(buildPath(rtyp, nil), s, inOffs, outOffs)
}
inner := vector.NewArray(f.innerTyp, inOffs, s, nil)
out := vector.NewArray(f.outerTyp, outOffs, inner, nil)
if len(errs) > 0 {
return vector.Combine(out, errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
}
return out
b := vector.NewCombiner(vector.NewArray(f.outerTyp, outOffs, inner, nil))
b.Add(errs, vector.NewStringError(f.zctx, "missing", uint32(len(errs))))
return b.Result()
Comment on lines -50 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vector.Combine feels like a better API here than vector.NewCombiner since the latter allocates a vector.Error even when len(errs)==0. Why not stick with it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't work for multiple error values

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then how about using an API that works for multiple error values when you need that and an API that doesn't do unnecessary allocations when you don't?

default:
return vector.NewStringError(f.zctx, "missing", val.Len())
}
Expand Down
3 changes: 3 additions & 0 deletions runtime/vam/expr/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func New(zctx *zed.Context, name string, narg int) (expr.Function, field.Path, e
f = &Levenshtein{zctx}
case "lower":
f = &ToLower{zctx}
case "network_of":
argmax = 2
f = &NetworkOf{zctx}
case "quiet":
f = &Quiet{zctx}
case "replace":
Expand Down
103 changes: 103 additions & 0 deletions runtime/vam/expr/function/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package function

import (
"net/netip"

"github.com/brimdata/zed"
"github.com/brimdata/zed/vector"
)

// https://github.com/brimdata/zed/blob/main/docs/language/functions.md#network_of
type NetworkOf struct {
zctx *zed.Context
}

func (n *NetworkOf) Call(args ...vector.Any) vector.Any {
args = underAll(args)
ipvec := args[0]
if ipvec.Type().ID() != zed.IDIP {
return vector.NewWrappedError(n.zctx, "network_of: not an IP", ipvec)
}
var nets []netip.Prefix
size := ipvec.Len()
if len(args) == 1 {
var errs []uint32
for i := uint32(0); i < size; i++ {
var bits int
ip, _ := vector.IPValue(ipvec, i)
switch {
case !ip.Is4():
errs = append(errs, i)
continue
case ip.As4()[0] < 0x80:
bits = 8
case ip.As4()[0] < 0xc0:
bits = 16
default:
bits = 24
}
nets = append(nets, netip.PrefixFrom(ip, bits).Masked())
}
b := vector.NewCombiner(vector.NewNet(nets, nil))
b.WrappedError(n.zctx, errs, "network_of: not an IPv4 address", args[0])
return b.Result()
} else {
// two args
maskvec := args[1]
switch id := maskvec.Type().ID(); {
case id == zed.IDIP:
var errsLen, errsCont []uint32
for i := uint32(0); i < size; i++ {
ip, _ := vector.IPValue(ipvec, i)
mask, _ := vector.IPValue(maskvec, i)
if mask.BitLen() != ip.BitLen() {
errsLen = append(errsLen, i)
continue
}
bits := zed.LeadingOnes(mask.AsSlice())
if netip.PrefixFrom(mask, bits).Masked().Addr() != mask {
errsCont = append(errsCont, i)
continue
}
nets = append(nets, netip.PrefixFrom(ip, bits).Masked())
}
b := vector.NewCombiner(vector.NewNet(nets, nil))
m := addressAndMask(n.zctx, args[0], args[1])
b.WrappedError(n.zctx, errsLen, "network_of: address and mask have different lengths", m)
b.WrappedError(n.zctx, errsCont, "network_of: mask is non-contiguous", args[1])
return b.Result()
case zed.IsInteger(id):
var errs []uint32
for i := uint32(0); i < size; i++ {
var bits int
if zed.IsSigned(id) {
b, _ := vector.IntValue(maskvec, i)
bits = int(b)
} else {
b, _ := vector.UintValue(maskvec, i)
bits = int(b)
}
ip, _ := vector.IPValue(ipvec, i)
if bits > 128 || bits > 32 && ip.Is4() {
errs = append(errs, i)
continue
}
nets = append(nets, netip.PrefixFrom(ip, bits).Masked())
}
b := vector.NewCombiner(vector.NewNet(nets, nil))
m := addressAndMask(n.zctx, args[0], args[1])
b.WrappedError(n.zctx, errs, "network_of: CIDR bit count out of range", m)
return b.Result()
default:
return vector.NewWrappedError(n.zctx, "network_of: bad arg for CIDR mask", args[1])
}
}
}

func addressAndMask(zctx *zed.Context, address, mask vector.Any) vector.Any {
typ := zctx.MustLookupTypeRecord([]zed.Field{
{Name: "address", Type: address.Type()},
{Name: "mask", Type: mask.Type()},
})
return vector.NewRecord(typ, []vector.Any{address, mask}, address.Len(), nil)
}
8 changes: 3 additions & 5 deletions runtime/vam/expr/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,9 @@ func indexArrayOrSet(zctx *zed.Context, offsets []uint32, vals, index vector.Any
}
errs = append(errs, uint32(i))
}
out := vector.Deunion(vector.NewView(viewIndexes, vals))
if len(errs) > 0 {
return vector.Combine(out, errs, vector.NewMissing(zctx, uint32(len(errs))))
}
return out
b := vector.NewCombiner(vector.Deunion(vector.NewView(viewIndexes, vals)))
b.Add(errs, vector.NewMissing(zctx, uint32(len(errs))))
return b.Result()
}

func indexRecord(zctx *zed.Context, record *vector.Record, index vector.Any) vector.Any {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: yield network_of(this, 24), network_of(this, 255.255.255.128), network_of(this, 255.128.255.255), network_of(this, ff::)

vector: true

input: |
10.1.2.129

Expand Down
70 changes: 52 additions & 18 deletions vector/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,61 @@ type Puller interface {

type Builder func(*zcode.Builder) bool

func Combine(vec Any, index []uint32, add Any) Any {
var vecs []Any
tags := make([]uint32, int(vec.Len())+len(index))
if d, ok := vec.(*Dynamic); ok {
vecs = d.Values
varTags := d.Tags
n := uint32(len(vecs))
for i := uint32(0); i < uint32(len(tags)); i++ {
type Combiner struct {
base Any
vecs []Any
indexes [][]uint32
}

func NewCombiner(base Any) *Combiner {
return &Combiner{base: base}
}

func (c *Combiner) WrappedError(zctx *zed.Context, index []uint32, msg string, inner Any) {
c.Add(index, NewWrappedError(zctx, msg, NewView(index, inner)))
}

func (c *Combiner) Add(index []uint32, vec Any) {
if len(index) == 0 {
return
}
c.vecs = append(c.vecs, vec)
c.indexes = append(c.indexes, index)
}

func (c *Combiner) Result() Any {
if len(c.vecs) == 0 {
return c.base
}
var baseVecs []Any
var baseTags []uint32
if dynamic, ok := c.base.(*Dynamic); ok {
baseVecs = dynamic.Values
baseTags = dynamic.Tags
} else {
baseVecs = []Any{c.base}
baseTags = make([]uint32, c.base.Len())
}
size := c.base.Len()
for _, vec := range c.vecs {
size += vec.Len()
}
tags := make([]uint32, int(size))
n := uint32(len(baseVecs))
for i := uint32(0); i < size; i++ {
var matched bool
for j, index := range c.indexes {
if len(index) > 0 && i == index[0] {
tags[i] = n
index = index[1:]
} else {
tags[i] = varTags[0]
varTags = varTags[1:]
tags[i] = n + uint32(j)
c.indexes[j] = index[1:]
matched = true
break
}
}
} else {
vecs = []Any{vec}
for _, k := range index {
tags[k] = 1
if !matched {
tags[i] = baseTags[0]
baseTags = baseTags[1:]
}
}
return NewDynamic(tags, append(vecs, add))
return NewDynamic(tags, append(baseVecs, c.vecs...))
}
17 changes: 17 additions & 0 deletions vector/uint.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,20 @@ func (u *Uint) Serialize(b *zcode.Builder, slot uint32) {
func (u *Uint) Promote(typ zed.Type) Promotable {
return &Uint{typ, u.Values, u.Nulls}
}

func UintValue(vec Any, slot uint32) (uint64, bool) {
switch vec := Under(vec).(type) {
case *Uint:
return vec.Value(slot), vec.Nulls.Value(slot)
case *Const:
return vec.Value().Ptr().Uint(), vec.Nulls.Value(slot)
case *Dict:
return UintValue(vec.Any, uint32(vec.Index[slot]))
case *Dynamic:
tag := vec.Tags[slot]
return UintValue(vec.Values[tag], vec.TagMap.Forward[slot])
case *View:
return UintValue(vec.Any, vec.Index[slot])
}
panic(vec)
}