Skip to content

Commit 483d191

Browse files
authored
Merge pull request #24 from wide-vsix/dev
LogOutputHook Feature
2 parents 8ea074c + 8a624fa commit 483d191

12 files changed

+400
-21
lines changed

config.yaml

+16-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,22 @@ outputs:
2525
# nfdump -r /tmp/netflow/nfcapd.202207101030 -o extended
2626
- log:
2727
file: /tmp/flow.log
28-
28+
# hooks:
29+
# - name: hostname addition
30+
# command: /usr/bin/hook_command_example_hostname.sh
31+
# - name: shell to resolve hostname
32+
# shell: |
33+
# #!/bin/sh
34+
# echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}'
35+
# - name: shell to resolve ifname from ifindex
36+
# shell: |
37+
# #!/bin/sh
38+
# IN=$(cat)
39+
# I_IDX=$(echo $IN | jq .ingressIfindex -r)
40+
# E_IDX=$(echo $IN | jq .egressIfindex -r )
41+
# I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
42+
# E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
43+
# echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}'
2944
templates:
3045
- id: 1001
3146
template:
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
maxIpfixMessageLen: 100
2+
timerFinishedDrainSeconds: 1
3+
timerForceDrainSeconds: 30
4+
timerTemplateFlushSeconds: 60
5+
outputs:
6+
- log:
7+
file: /tmp/flowlog.json
8+
hooks:
9+
- name: hostname addition
10+
command: ./misc/hook_command_example_dummy.sh
11+
- name: shell to resolve hostname
12+
shell: |
13+
#!/bin/sh
14+
echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}'
15+
- name: shell to resolve ifname from ifindex
16+
shell: |
17+
#!/bin/sh
18+
IN=$(cat)
19+
I_IDX=$(echo $IN | jq .ingressIfindex -r)
20+
E_IDX=$(echo $IN | jq .egressIfindex -r )
21+
I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
22+
E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
23+
echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}'
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{
2+
"level": "info",
3+
"ts": 1667266821.7434478,
4+
"caller": "flowctl/ipfix.go:430",
5+
"msg": "flowlog",
6+
"bytes": 594,
7+
"finished": 1,
8+
"foo": "bar",
9+
"pkts": 6,
10+
"proto": 6,
11+
"start": 12168400776634952,
12+
"action": 0,
13+
"ingressIfindex": 3,
14+
"dport": 32816,
15+
"egressIfindex": 2,
16+
"hostname": "slankdev",
17+
"src": "10.2.0.2",
18+
"ingressIfname": "eth2",
19+
"egressIfname": "eth1",
20+
"dst": "10.1.0.2",
21+
"sport": 8080,
22+
"end": 12168400778056844
23+
}
24+
{
25+
"level": "info",
26+
"ts": 1667266821.9629853,
27+
"caller": "flowctl/ipfix.go:430",
28+
"msg": "flowlog",
29+
"dport": 8080,
30+
"dst": "10.2.0.2",
31+
"proto": 6,
32+
"src": "10.1.0.2",
33+
"finished": 1,
34+
"sport": 32816,
35+
"start": 12168400776608754,
36+
"ingressIfname": "eth1",
37+
"egressIfindex": 3,
38+
"foo": "bar",
39+
"egressIfname": "eth2",
40+
"pkts": 6,
41+
"action": 0,
42+
"bytes": 481,
43+
"end": 12168400778037424,
44+
"hostname": "slankdev",
45+
"ingressIfindex": 2
46+
}

misc/hook_command_example_dummy.sh

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/bin/sh
2+
# IN:
3+
# {
4+
# "src": "10.1.0.1",
5+
# "dst": "10.2.0.1",
6+
# "pkts": 10,
7+
# "bytes": 1000
8+
# }
9+
#
10+
# OUT:
11+
# {
12+
# "src": "10.1.0.1",
13+
# "dst": "10.2.0.1",
14+
# "pkts": 10,
15+
# "bytes": 1000,
16+
# "foo": "bar"
17+
# }
18+
echo `cat` | jq '. + {foo: "bar"}'

misc/hook_command_example_hostname.sh

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/bin/sh
2+
# IN:
3+
# {
4+
# "src": "10.1.0.1",
5+
# "dst": "10.2.0.1",
6+
# "pkts": 10,
7+
# "bytes": 1000
8+
# }
9+
#
10+
# OUT:
11+
# {
12+
# "src": "10.1.0.1",
13+
# "dst": "10.2.0.1",
14+
# "pkts": 10,
15+
# "bytes": 1000,
16+
# "hostname": "machine1"
17+
# }
18+
echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}'

misc/hook_command_example_ifname.sh

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/sh
2+
# IN:
3+
# {
4+
# "ingressIfindex": 1,
5+
# "egressIfindex": 2,
6+
# "pkts": 10,
7+
# "bytes": 1000
8+
# }
9+
#
10+
# OUT:
11+
# {
12+
# "ingressIfindex": 1,
13+
# "egressIfindex": 2,
14+
# "ingressIfname": 1,
15+
# "egressIfname": 2,
16+
# "pkts": 10,
17+
# "bytes": 1000
18+
# }
19+
IN=$(cat)
20+
I_IDX=$(echo $IN | jq .ingressIfindex -r)
21+
E_IDX=$(echo $IN | jq .egressIfindex -r )
22+
I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
23+
E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
24+
echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}'

pkg/ebpfmap/types.go

+29-15
Original file line numberDiff line numberDiff line change
@@ -308,20 +308,34 @@ func ToIpfixFlowFile(ebflows []Flow) (*ipfix.FlowFile, error) {
308308
return flowFile, nil
309309
}
310310

311-
func (f Flow) ToZap() []interface{} {
312-
return []interface{}{
313-
"src", util.ConvertUint32ToIP(f.Key.Saddr).String(),
314-
"dst", util.ConvertUint32ToIP(f.Key.Daddr).String(),
315-
"proto", f.Key.Proto,
316-
"sport", f.Key.Sport,
317-
"dport", f.Key.Dport,
318-
"ingressIfindex", f.Key.IngressIfindex,
319-
"egressIfindex", f.Key.EgressIfindex,
320-
"pkts", f.Val.FlowPkts,
321-
"bytes", f.Val.FlowBytes,
322-
"action", f.Key.Mark,
323-
"start", f.Val.FlowStartMilliSecond,
324-
"end", f.Val.FlowEndMilliSecond,
325-
"finished", f.Val.Finished,
311+
func (f Flow) ToZap(o ipfix.OutputLog) ([]interface{}, error) {
312+
m := map[string]interface{}{
313+
"src": util.ConvertUint32ToIP(f.Key.Saddr).String(),
314+
"dst": util.ConvertUint32ToIP(f.Key.Daddr).String(),
315+
"proto": f.Key.Proto,
316+
"sport": f.Key.Sport,
317+
"dport": f.Key.Dport,
318+
"ingressIfindex": f.Key.IngressIfindex,
319+
"egressIfindex": f.Key.EgressIfindex,
320+
"pkts": f.Val.FlowPkts,
321+
"bytes": f.Val.FlowBytes,
322+
"action": f.Key.Mark,
323+
"start": f.Val.FlowStartMilliSecond,
324+
"end": f.Val.FlowEndMilliSecond,
325+
"finished": f.Val.Finished,
326326
}
327+
328+
for _, h := range o.Hooks {
329+
var err error
330+
m, err = h.Execute(m)
331+
if err != nil {
332+
return nil, err
333+
}
334+
}
335+
336+
ret := []interface{}{}
337+
for key, val := range m {
338+
ret = append(ret, key, val)
339+
}
340+
return ret, nil
327341
}

pkg/flowctl/ipfix.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func fnIpfixDump(cmd *cobra.Command, args []string) error {
194194
return fmt.Errorf("invalid config")
195195
}
196196
if o.Log != nil {
197-
if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil {
197+
if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil {
198198
return err
199199
}
200200
}
@@ -345,7 +345,7 @@ func flushCachesFinished(config ipfix.Config) error {
345345
return fmt.Errorf("invalid config")
346346
}
347347
if o.Log != nil {
348-
if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil {
348+
if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil {
349349
return err
350350
}
351351
}
@@ -385,7 +385,7 @@ func flushCaches(config ipfix.Config) error {
385385
return fmt.Errorf("invalid config")
386386
}
387387
if o.Log != nil {
388-
if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil {
388+
if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil {
389389
return err
390390
}
391391
}
@@ -415,7 +415,7 @@ func flushCaches(config ipfix.Config) error {
415415
return nil
416416
}
417417

418-
func FlowOutputLog(flows []ebpfmap.Flow, out string) error {
418+
func FlowOutputLog(flows []ebpfmap.Flow, out string, o ipfix.OutputLog) error {
419419
cfg := zap.NewProductionConfig()
420420
cfg.OutputPaths = []string{
421421
out,
@@ -427,7 +427,11 @@ func FlowOutputLog(flows []ebpfmap.Flow, out string) error {
427427
log := zapr.NewLogger(zapLog)
428428

429429
for _, flow := range flows {
430-
log.Info("flowlog", flow.ToZap()...)
430+
args, err := flow.ToZap(o)
431+
if err != nil {
432+
return err
433+
}
434+
log.Info("flowlog", args...)
431435
}
432436
return nil
433437
}

pkg/hook/command.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2022 Hiroki Shirokura.
3+
Copyright 2022 LINE Corporation.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package hook
19+
20+
import (
21+
"bytes"
22+
"encoding/json"
23+
"fmt"
24+
"os/exec"
25+
)
26+
27+
type Command string
28+
29+
var _ Hook = (*Command)(nil)
30+
31+
func (c Command) Execute(in map[string]interface{}) (map[string]interface{}, error) {
32+
// Prepare input/output
33+
stdoutbuf := bytes.Buffer{}
34+
stderrbuf := bytes.Buffer{}
35+
stdinbytes, err := json.Marshal(in)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
// Execute child process
41+
cmd := exec.Command(string(c))
42+
cmd.Stdout = &stdoutbuf
43+
cmd.Stderr = &stderrbuf
44+
cmd.Stdin = bytes.NewBuffer(stdinbytes)
45+
if err := cmd.Run(); err != nil {
46+
return nil, fmt.Errorf("child process is failed: err=%v stderr=%s",
47+
err, stderrbuf.String())
48+
}
49+
50+
// Convert back to map data from json-bytes
51+
out := map[string]interface{}{}
52+
if err := json.Unmarshal(stdoutbuf.Bytes(), &out); err != nil {
53+
return nil, err
54+
}
55+
return out, nil
56+
}

pkg/hook/interface.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2022 Hiroki Shirokura.
3+
Copyright 2022 LINE Corporation.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package hook
19+
20+
type Hook interface {
21+
Execute(in map[string]interface{}) (map[string]interface{}, error)
22+
}

0 commit comments

Comments
 (0)