Skip to content

Commit

Permalink
feat(protobuf): add and populate the sampler_hostname field
Browse files Browse the repository at this point in the history
  • Loading branch information
tgragnato committed Nov 20, 2024
1 parent 9a863d9 commit e3ef7a1
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 23 deletions.
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// decoders
"github.com/tgragnato/goflow/decoders/netflow"
"github.com/tgragnato/goflow/geoip"
"github.com/tgragnato/goflow/sampler"

// various formatters
"github.com/tgragnato/goflow/format"
Expand Down Expand Up @@ -78,6 +79,7 @@ func LoadMapping(f io.Reader) (*protoproducer.ProducerConfig, error) {
func main() {
flag.Parse()
geoip.Init(*GeoipASN, *GeoipCC)
sampler.Init()

formatter, err := format.FindFormat(*Format)
if err != nil {
Expand Down
57 changes: 34 additions & 23 deletions pb/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pb/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@ message FlowMessage {
string lms_target_index = 1002;
string src_asn = 1003;
string dst_asn = 1004;
string sampler_hostname = 1005;
}
5 changes: 5 additions & 0 deletions producer/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/tgragnato/goflow/decoders/sflow"
"github.com/tgragnato/goflow/geoip"
"github.com/tgragnato/goflow/producer"
"github.com/tgragnato/goflow/sampler"
)

const LMS_TARGET_INDEX = "goflow.local"
Expand Down Expand Up @@ -60,6 +61,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl
fmsg.SrcAs, fmsg.SrcAsn = geoip.GetASNByByteSlice(fmsg.SrcAddr)
fmsg.DstCountry = geoip.GetCountryByByteSlice(fmsg.DstAddr)
fmsg.DstAs, fmsg.DstAsn = geoip.GetASNByByteSlice(fmsg.DstAddr)
fmsg.SamplerHostname = sampler.GetHostnameByByteSlice(fmsg.SamplerAddress)
if len(fmsg.AsPath) == 0 {
fmsg.AsPath = []uint32{fmsg.SrcAs, 0, fmsg.DstAs}
}
Expand All @@ -76,6 +78,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl
fmsg.SrcAs, fmsg.SrcAsn = geoip.GetASNByByteSlice(fmsg.SrcAddr)
fmsg.DstCountry = geoip.GetCountryByByteSlice(fmsg.DstAddr)
fmsg.DstAs, fmsg.DstAsn = geoip.GetASNByByteSlice(fmsg.DstAddr)
fmsg.SamplerHostname = sampler.GetHostnameByByteSlice(fmsg.SamplerAddress)
if len(fmsg.AsPath) == 0 {
fmsg.AsPath = []uint32{fmsg.SrcAs, 0, fmsg.DstAs}
}
Expand All @@ -92,6 +95,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl
fmsg.SrcAs, fmsg.SrcAsn = geoip.GetASNByByteSlice(fmsg.SrcAddr)
fmsg.DstCountry = geoip.GetCountryByByteSlice(fmsg.DstAddr)
fmsg.DstAs, fmsg.DstAsn = geoip.GetASNByByteSlice(fmsg.DstAddr)
fmsg.SamplerHostname = sampler.GetHostnameByByteSlice(fmsg.SamplerAddress)
if len(fmsg.AsPath) == 0 {
fmsg.AsPath = []uint32{fmsg.SrcAs, 0, fmsg.DstAs}
}
Expand All @@ -108,6 +112,7 @@ func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (fl
fmsg.SrcAs, fmsg.SrcAsn = geoip.GetASNByByteSlice(fmsg.SrcAddr)
fmsg.DstCountry = geoip.GetCountryByByteSlice(fmsg.DstAddr)
fmsg.DstAs, fmsg.DstAsn = geoip.GetASNByByteSlice(fmsg.DstAddr)
fmsg.SamplerHostname = sampler.GetHostnameByByteSlice(fmsg.SamplerAddress)
if len(fmsg.AsPath) == 0 {
fmsg.AsPath = []uint32{fmsg.SrcAs, 0, fmsg.DstAs}
}
Expand Down
62 changes: 62 additions & 0 deletions sampler/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package sampler

import (
"fmt"
"net"
"sync"
"time"
)

type cache struct {
items map[string]string
sync.RWMutex
}

func newcache() *cache {
cache := &cache{
map[string]string{},
sync.RWMutex{},
}
go func() {
for range time.Tick(time.Hour) {
go cache.update()
}
}()
return cache
}

func (c *cache) get(ip net.IP) (string, bool) {
c.RLock()
defer c.RUnlock()
v, ok := c.items[ip.String()]
return v, ok
}

func (c *cache) set(ip net.IP) {
c.Lock()
defer c.Unlock()
c.items[ip.String()] = getHostnameFromIp(ip)
fmt.Printf("New sampler discovered: (%s)=> %s\n", ip.String(), c.items[ip.String()])
}

func getHostnameFromIp(ip net.IP) string {
hostname, err := net.LookupAddr(ip.String())
if err != nil || len(hostname) <= 0 {
return ""
}
if hostname[0][len(hostname[0])-1] == '.' {
return hostname[0][:len(hostname[0])-1]
}
return hostname[0]
}

func (c *cache) update() {
c.Lock()
defer c.Unlock()
for k := range c.items {
newHostname := getHostnameFromIp(net.ParseIP(k))
if newHostname != "" && newHostname != c.items[k] {
c.items[k] = newHostname
}
}
}
20 changes: 20 additions & 0 deletions sampler/shim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sampler

import "fmt"

var reverse *cache

func Init() {
reverse = newcache()
fmt.Println("Reverse DNS cache initialized")
}

func GetHostnameByByteSlice(ip []byte) string {
if reverse, ok := reverse.get(ip); ok {
return reverse
}

// Latency will be low at the cost of missing items during the discovery phase
go reverse.set(ip)
return ""
}

0 comments on commit e3ef7a1

Please sign in to comment.