diff --git a/deployments/flp-daemonset-cap.yml b/deployments/flp-daemonset-cap.yml index 8f58306d7..23a4c4864 100644 --- a/deployments/flp-daemonset-cap.yml +++ b/deployments/flp-daemonset-cap.yml @@ -59,6 +59,7 @@ spec: containers: - name: flowlogs-pipeline image: quay.io/netobserv/flowlogs-pipeline:main + imagePullPolicy: Always ports: - containerPort: 9999 hostPort: 9999 @@ -115,7 +116,6 @@ data: - "SrcK8S_OwnerName" - "DstK8S_Namespace" - "DstK8S_OwnerName" - - "FlowDirection" url: http://loki.netobserv.svc:3100 timestampLabel: TimeFlowEndMs timestampScale: 1ms diff --git a/deployments/flp-daemonset.yml b/deployments/flp-daemonset.yml index 10fec6cbe..5c9d331c8 100644 --- a/deployments/flp-daemonset.yml +++ b/deployments/flp-daemonset.yml @@ -112,7 +112,6 @@ data: - "SrcK8S_OwnerName" - "DstK8S_Namespace" - "DstK8S_OwnerName" - - "FlowDirection" url: http://loki.netobserv.svc:3100 timestampLabel: TimeFlowEndMs timestampScale: 1ms diff --git a/deployments/flp-service.yml b/deployments/flp-service.yml index 3a76a992a..d4e2d6185 100644 --- a/deployments/flp-service.yml +++ b/deployments/flp-service.yml @@ -22,7 +22,7 @@ spec: containers: - name: netobserv-ebpf-agent image: quay.io/netobserv/netobserv-ebpf-agent:main - # imagePullPolicy: Always + imagePullPolicy: Always securityContext: privileged: true runAsUser: 0 @@ -69,6 +69,7 @@ spec: containers: - name: packet-counter image: quay.io/netobserv/flowlogs-pipeline:main + imagePullPolicy: Always ports: - containerPort: 9999 hostPort: 9999 @@ -126,7 +127,6 @@ data: - "SrcK8S_OwnerName" - "DstK8S_Namespace" - "DstK8S_OwnerName" - - "FlowDirection" url: http://loki.netobserv.svc:3100 timestampLabel: TimeFlowEndMs timestampScale: 1ms diff --git a/e2e/basic/common.go b/e2e/basic/common.go index bc498d755..1bea80094 100644 --- a/e2e/basic/common.go +++ b/e2e/basic/common.go @@ -4,6 +4,7 @@ package basic import ( "context" + "fmt" "testing" "time" @@ -27,7 +28,7 @@ type FlowCaptureTester struct { Timeout time.Duration } -func (bt *FlowCaptureTester) DoTest(t *testing.T) { +func (bt *FlowCaptureTester) DoTest(t *testing.T, isIPFIX bool) { var pci podsConnectInfo f1 := features.New("basic flow capture").Setup( func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { @@ -55,7 +56,6 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { // Same for DstMac when the flow is towards the service assert.Regexp(t, "^[\\da-fA-F]{2}(:[\\da-fA-F]{2}){5}$", flow["DstMac"]) - assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) assert.EqualValues(t, 2048, flow["Etype"]) assert.EqualValues(t, 6, flow["Proto"]) @@ -67,7 +67,18 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Less(t, time.Since(asTime(flow["TimeFlowEndMs"])), 15*time.Second) assert.Less(t, time.Since(asTime(flow["TimeFlowStartMs"])), 15*time.Second) - assert.NotEmpty(t, flow["Interface"]) + // IPFIX format doesn't manage plural fields + if isIPFIX { + assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) + assert.NotEmpty(t, flow["Interface"]) + } else { + assert.NotNil(t, flow["Interfaces"]) + interfaces := flow["Interfaces"].([]interface{}) + assert.NotNil(t, flow["IfDirections"]) + directions := flow["IfDirections"].([]interface{}) + assert.Equal(t, len(interfaces), len(directions)) + assert.Regexp(t, "^[01]$", fmt.Sprintf("%.0f", directions[0])) + } return ctx }, ).Assess("correctness of client -> server (as Pod) request flows", @@ -89,7 +100,6 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Regexp(t, "^[\\da-fA-F]{2}(:[\\da-fA-F]{2}){5}$", flow["SrcMac"]) assert.Regexp(t, "(?i)"+pci.serverMAC, flow["DstMac"]) - assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) assert.EqualValues(t, 2048, flow["Etype"]) assert.NotZero(t, flow["Bytes"]) @@ -99,7 +109,18 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Less(t, time.Since(asTime(flow["TimeFlowEndMs"])), 15*time.Second) assert.Less(t, time.Since(asTime(flow["TimeFlowStartMs"])), 15*time.Second) - assert.NotEmpty(t, flow["Interface"]) + // IPFIX format doesn't manage plural fields + if isIPFIX { + assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) + assert.NotEmpty(t, flow["Interface"]) + } else { + assert.NotNil(t, flow["Interfaces"]) + interfaces := flow["Interfaces"].([]interface{}) + assert.NotNil(t, flow["IfDirections"]) + directions := flow["IfDirections"].([]interface{}) + assert.Equal(t, len(interfaces), len(directions)) + assert.Regexp(t, "^[01]$", fmt.Sprintf("%.0f", directions[0])) + } return ctx }, ).Assess("correctness of server (from Service) -> client response flows", @@ -120,7 +141,6 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Regexp(t, "^[\\da-fA-F]{2}(:[\\da-fA-F]{2}){5}$", flow["SrcMac"]) assert.Regexp(t, "(?i)"+pci.clientMAC, flow["DstMac"]) - assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) assert.EqualValues(t, 2048, flow["Etype"]) assert.EqualValues(t, 6, flow["Proto"]) @@ -132,7 +152,18 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Less(t, time.Since(asTime(flow["TimeFlowEndMs"])), 15*time.Second) assert.Less(t, time.Since(asTime(flow["TimeFlowStartMs"])), 15*time.Second) - assert.NotEmpty(t, flow["Interface"]) + // IPFIX format doesn't manage plural fields + if isIPFIX { + assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) + assert.NotEmpty(t, flow["Interface"]) + } else { + assert.NotNil(t, flow["Interfaces"]) + interfaces := flow["Interfaces"].([]interface{}) + assert.NotNil(t, flow["IfDirections"]) + directions := flow["IfDirections"].([]interface{}) + assert.Equal(t, len(interfaces), len(directions)) + assert.Regexp(t, "^[01]$", fmt.Sprintf("%.0f", directions[0])) + } return ctx }, ).Assess("correctness of server (from Pod) -> client response flows", @@ -154,7 +185,6 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { // only check that it is well-formed. assert.Regexp(t, "^[\\da-fA-F]{2}(:[\\da-fA-F]{2}){5}$", flow["DstMac"]) - assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) assert.EqualValues(t, 2048, flow["Etype"]) assert.EqualValues(t, 6, flow["Proto"]) @@ -166,7 +196,18 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T) { assert.Less(t, time.Since(asTime(flow["TimeFlowEndMs"])), 15*time.Second) assert.Less(t, time.Since(asTime(flow["TimeFlowStartMs"])), 15*time.Second) - assert.NotEmpty(t, flow["Interface"]) + // IPFIX format doesn't manage plural fields + if isIPFIX { + assert.Regexp(t, "^[01]$", lq.Stream["FlowDirection"]) + assert.NotEmpty(t, flow["Interface"]) + } else { + assert.NotNil(t, flow["Interfaces"]) + interfaces := flow["Interfaces"].([]interface{}) + assert.NotNil(t, flow["IfDirections"]) + directions := flow["IfDirections"].([]interface{}) + assert.Equal(t, len(interfaces), len(directions)) + assert.Regexp(t, "^[01]$", fmt.Sprintf("%.0f", directions[0])) + } return ctx }, ).Feature() diff --git a/e2e/basic/flow_test.go b/e2e/basic/flow_test.go index d91e6e658..97035e686 100644 --- a/e2e/basic/flow_test.go +++ b/e2e/basic/flow_test.go @@ -55,7 +55,7 @@ func TestBasicFlowCapture(t *testing.T) { Namespace: namespace, Timeout: testTimeout, } - bt.DoTest(t) + bt.DoTest(t, false) } // TestSinglePacketFlows uses a known packet size and number to check that, diff --git a/e2e/cluster/base/03-flp.yml b/e2e/cluster/base/03-flp.yml index ef634ccb5..e5ce20ffe 100644 --- a/e2e/cluster/base/03-flp.yml +++ b/e2e/cluster/base/03-flp.yml @@ -17,6 +17,7 @@ spec: containers: - name: flp image: quay.io/netobserv/flowlogs-pipeline:main + imagePullPolicy: Always ports: - containerPort: 9999 hostPort: 9999 @@ -73,7 +74,6 @@ data: - "SrcK8S_OwnerName" - "DstK8S_Namespace" - "DstK8S_OwnerName" - - "FlowDirection" url: http://loki:3100 timestampLabel: TimeFlowEndMs timestampScale: 1ms diff --git a/e2e/ipfix/ipfix_test.go b/e2e/ipfix/ipfix_test.go index f8a421e2f..6a30bebf7 100644 --- a/e2e/ipfix/ipfix_test.go +++ b/e2e/ipfix/ipfix_test.go @@ -51,5 +51,5 @@ func TestBasicFlowCapture(t *testing.T) { Namespace: namespace, Timeout: testTimeout, } - bt.DoTest(t) + bt.DoTest(t, true) } diff --git a/e2e/kafka/kafka_test.go b/e2e/kafka/kafka_test.go index a7207e874..3f144b6ea 100644 --- a/e2e/kafka/kafka_test.go +++ b/e2e/kafka/kafka_test.go @@ -73,7 +73,7 @@ func TestBasicFlowCapture(t *testing.T) { Namespace: namespace, Timeout: testTimeout, } - bt.DoTest(t) + bt.DoTest(t, false) } func checkResources(client klient.Client, list ...string) bool { diff --git a/e2e/kafka/manifests/20-flp-transformer.yml b/e2e/kafka/manifests/20-flp-transformer.yml index 5fb90a548..69ee77bd2 100644 --- a/e2e/kafka/manifests/20-flp-transformer.yml +++ b/e2e/kafka/manifests/20-flp-transformer.yml @@ -17,6 +17,7 @@ spec: containers: - name: flp image: quay.io/netobserv/flowlogs-pipeline:main + imagePullPolicy: Always args: - --config=/etc/flp/config.yaml volumeMounts: @@ -74,7 +75,6 @@ data: - "SrcK8S_OwnerName" - "DstK8S_Namespace" - "DstK8S_OwnerName" - - "FlowDirection" url: http://loki:3100 timestampLabel: TimeFlowEndMs timestampScale: 1ms diff --git a/go.mod b/go.mod index c06fe2854..e24bf4a6c 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/gopacket v1.1.19 github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118 - github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240305083238-24bf8cec8807 + github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240312115357-ddc1f67022a5 github.com/netobserv/gopipes v0.3.0 github.com/paulbellamy/ratecounter v0.2.0 github.com/prometheus/client_golang v1.19.0 @@ -70,7 +70,7 @@ require ( github.com/libp2p/go-reuseport v0.3.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/minio/md5-simd v1.1.2 // indirect - github.com/minio/minio-go/v7 v7.0.68 // indirect + github.com/minio/minio-go/v7 v7.0.69 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/spdystream v0.2.0 // indirect @@ -100,9 +100,9 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect @@ -110,11 +110,11 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect - golang.org/x/net v0.21.0 // indirect + golang.org/x/net v0.22.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/term v0.17.0 // indirect + golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 6aa2253e7..e834b16d6 100644 --- a/go.sum +++ b/go.sum @@ -584,8 +584,8 @@ github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKju github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go/v7 v7.0.68 h1:hTqSIfLlpXaKuNy4baAp4Jjy2sqZEN9hRxD0M4aOfrQ= -github.com/minio/minio-go/v7 v7.0.68/go.mod h1:XAvOPJQ5Xlzk5o3o/ArO2NMbhSGkimC+bpW/ngRKDmQ= +github.com/minio/minio-go/v7 v7.0.69 h1:l8AnsQFyY1xiwa/DaQskY4NXSLA2yrGsW5iD9nRPVS0= +github.com/minio/minio-go/v7 v7.0.69/go.mod h1:XAvOPJQ5Xlzk5o3o/ArO2NMbhSGkimC+bpW/ngRKDmQ= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -628,8 +628,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240305083238-24bf8cec8807 h1:Ugqf2/JylU52lDvPufB+W6CRBZ+CGvmg+mnghAm6LJk= -github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240305083238-24bf8cec8807/go.mod h1:1rE8nv5+z0VIMmikjRsk3Sbw325Z1pGJrRXwlFsdyWQ= +github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240312115357-ddc1f67022a5 h1:eKS116Eks3NT6k3fGTBAxXY1aQmAVOfh/cuT1Qj1KiI= +github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240312115357-ddc1f67022a5/go.mod h1:OyKXDufQOQjfEpw5StxNfGCNJ2JIUvb8DO3x9jkAfpg= github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60= github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU= github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw= @@ -874,12 +874,12 @@ go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 h1:f2jriWfOdldanBwS9jNBdeOKAQN7b4ugAMaNu1/1k9g= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0/go.mod h1:B+bcQI1yTY+N0vqMpoZbEN7+XU4tNM0DmUiOwebFJWI= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 h1:q/Nj5/2TZRIt6PderQ9oU0M00fzoe8UZuINGw6ETGTw= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1/go.mod h1:DTE9yAu6r08jU3xa68GiSeI7oRcSEQ2RpKbbQGO+dWM= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0 h1:mM8nKi6/iFQ0iqst80wDHU2ge198Ye/TfN0WBS5U24Y= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0/go.mod h1:0PrIIzDteLSmNyxqcGYRL4mDIo8OTuBAOI/Bn1URxac= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 h1:p3A5+f5l9e/kuEBwLOrnpkIDHQFlHmbiVxMURWRK6gQ= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1/go.mod h1:OClrnXUjBqQbInvjJFjYSnMxBSCXBF8r3b34WqjiIrQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 h1:Xw8U6u2f8DK2XAkGRFV7BBLENgnTGX9i4rQRxJf+/vs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0/go.mod h1:6KW1Fm6R/s6Z3PGXwSJN2K4eT6wQB3vXX6CVnYX9NmM= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= @@ -927,8 +927,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1016,8 +1016,8 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1127,8 +1127,8 @@ golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/vendor/github.com/minio/minio-go/v7/CREDITS b/vendor/github.com/minio/minio-go/v7/CREDITS index d20923181..dce3d4c9a 100644 --- a/vendor/github.com/minio/minio-go/v7/CREDITS +++ b/vendor/github.com/minio/minio-go/v7/CREDITS @@ -1365,60 +1365,6 @@ THE SOFTWARE. ================================================================ -github.com/sirupsen/logrus -https://github.com/sirupsen/logrus ----------------------------------------------------------------- -The MIT License (MIT) - -Copyright (c) 2014 Simon Eskildsen - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. - -================================================================ - -github.com/stretchr/testify -https://github.com/stretchr/testify ----------------------------------------------------------------- -MIT License - -Copyright (c) 2012-2020 Mat Ryer, Tyler Bunnell and contributors. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - -================================================================ - golang.org/x/crypto https://golang.org/x/crypto ---------------------------------------------------------------- @@ -1748,59 +1694,3 @@ third-party archives. ================================================================ -gopkg.in/yaml.v3 -https://gopkg.in/yaml.v3 ----------------------------------------------------------------- - -This project is covered by two different licenses: MIT and Apache. - -#### MIT License #### - -The following files were ported to Go from C files of libyaml, and thus -are still covered by their original MIT license, with the additional -copyright staring in 2011 when the project was ported over: - - apic.go emitterc.go parserc.go readerc.go scannerc.go - writerc.go yamlh.go yamlprivateh.go - -Copyright (c) 2006-2010 Kirill Simonov -Copyright (c) 2006-2011 Kirill Simonov - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - -### Apache License ### - -All the remaining project files are covered by the Apache license: - -Copyright (c) 2011-2019 Canonical Ltd - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -================================================================ - diff --git a/vendor/github.com/minio/minio-go/v7/api.go b/vendor/github.com/minio/minio-go/v7/api.go index 787087050..930e082ab 100644 --- a/vendor/github.com/minio/minio-go/v7/api.go +++ b/vendor/github.com/minio/minio-go/v7/api.go @@ -1,6 +1,6 @@ /* * MinIO Go Library for Amazon S3 Compatible Cloud Storage - * Copyright 2015-2023 MinIO, Inc. + * Copyright 2015-2024 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,6 +80,8 @@ type Client struct { // S3 specific accelerated endpoint. s3AccelerateEndpoint string + // S3 dual-stack endpoints are enabled by default. + s3DualstackEnabled bool // Region endpoint region string @@ -127,7 +129,7 @@ type Options struct { // Global constants. const ( libraryName = "minio-go" - libraryVersion = "v7.0.68" + libraryVersion = "v7.0.69" ) // User Agent should always following the below style. @@ -158,9 +160,12 @@ func New(endpoint string, opts *Options) (*Client, error) { if err != nil { return nil, err } - // If Amazon S3 set to signature v4. if s3utils.IsAmazonEndpoint(*clnt.endpointURL) { + // If Amazon S3 set to signature v4. clnt.overrideSignerType = credentials.SignatureV4 + // Amazon S3 endpoints are resolved into dual-stack endpoints by default + // for backwards compatibility. + clnt.s3DualstackEnabled = true } return clnt, nil @@ -330,6 +335,16 @@ func (c *Client) SetS3TransferAccelerate(accelerateEndpoint string) { } } +// SetS3EnableDualstack turns s3 dual-stack endpoints on or off for all requests. +// The feature is only specific to S3 and is on by default. To read more about +// Amazon S3 dual-stack endpoints visit - +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/dual-stack-endpoints.html +func (c *Client) SetS3EnableDualstack(enabled bool) { + if s3utils.IsAmazonEndpoint(*c.endpointURL) { + c.s3DualstackEnabled = enabled + } +} + // Hash materials provides relevant initialized hash algo writers // based on the expected signature type. // @@ -926,7 +941,7 @@ func (c *Client) makeTargetURL(bucketName, objectName, bucketLocation string, is // Do not change the host if the endpoint URL is a FIPS S3 endpoint or a S3 PrivateLink interface endpoint if !s3utils.IsAmazonFIPSEndpoint(*c.endpointURL) && !s3utils.IsAmazonPrivateLinkEndpoint(*c.endpointURL) { // Fetch new host based on the bucket location. - host = getS3Endpoint(bucketLocation) + host = getS3Endpoint(bucketLocation, c.s3DualstackEnabled) } } } diff --git a/vendor/github.com/minio/minio-go/v7/pkg/credentials/iam_aws.go b/vendor/github.com/minio/minio-go/v7/pkg/credentials/iam_aws.go index 90d075ab1..7322948ec 100644 --- a/vendor/github.com/minio/minio-go/v7/pkg/credentials/iam_aws.go +++ b/vendor/github.com/minio/minio-go/v7/pkg/credentials/iam_aws.go @@ -61,6 +61,7 @@ type IAM struct { // Support for container authorization token https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html Container struct { AuthorizationToken string + AuthorizationTokenFile string CredentialsFullURI string CredentialsRelativeURI string } @@ -105,6 +106,11 @@ func (m *IAM) Retrieve() (Value, error) { token = m.Container.AuthorizationToken } + tokenFile := os.Getenv("AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE") + if tokenFile == "" { + tokenFile = m.Container.AuthorizationToken + } + relativeURI := os.Getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") if relativeURI == "" { relativeURI = m.Container.CredentialsRelativeURI @@ -181,6 +187,10 @@ func (m *IAM) Retrieve() (Value, error) { roleCreds, err = getEcsTaskCredentials(m.Client, endpoint, token) + case tokenFile != "" && fullURI != "": + endpoint = fullURI + roleCreds, err = getEKSPodIdentityCredentials(m.Client, endpoint, tokenFile) + case fullURI != "": if len(endpoint) == 0 { endpoint = fullURI @@ -305,6 +315,18 @@ func getEcsTaskCredentials(client *http.Client, endpoint, token string) (ec2Role return respCreds, nil } +func getEKSPodIdentityCredentials(client *http.Client, endpoint string, tokenFile string) (ec2RoleCredRespBody, error) { + if tokenFile != "" { + bytes, err := os.ReadFile(tokenFile) + if err != nil { + return ec2RoleCredRespBody{}, fmt.Errorf("getEKSPodIdentityCredentials: failed to read token file:%s", err) + } + token := string(bytes) + return getEcsTaskCredentials(client, endpoint, token) + } + return ec2RoleCredRespBody{}, fmt.Errorf("getEKSPodIdentityCredentials: no tokenFile found") +} + func fetchIMDSToken(client *http.Client, endpoint string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/vendor/github.com/minio/minio-go/v7/s3-endpoints.go b/vendor/github.com/minio/minio-go/v7/s3-endpoints.go index b1de7b62a..068a6bfa1 100644 --- a/vendor/github.com/minio/minio-go/v7/s3-endpoints.go +++ b/vendor/github.com/minio/minio-go/v7/s3-endpoints.go @@ -1,6 +1,6 @@ /* * MinIO Go Library for Amazon S3 Compatible Cloud Storage - * Copyright 2015-2017 MinIO, Inc. + * Copyright 2015-2024 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,48 +17,155 @@ package minio +type awsS3Endpoint struct { + endpoint string + dualstackEndpoint string +} + // awsS3EndpointMap Amazon S3 endpoint map. -var awsS3EndpointMap = map[string]string{ - "us-east-1": "s3.dualstack.us-east-1.amazonaws.com", - "us-east-2": "s3.dualstack.us-east-2.amazonaws.com", - "us-west-2": "s3.dualstack.us-west-2.amazonaws.com", - "us-west-1": "s3.dualstack.us-west-1.amazonaws.com", - "ca-central-1": "s3.dualstack.ca-central-1.amazonaws.com", - "eu-west-1": "s3.dualstack.eu-west-1.amazonaws.com", - "eu-west-2": "s3.dualstack.eu-west-2.amazonaws.com", - "eu-west-3": "s3.dualstack.eu-west-3.amazonaws.com", - "eu-central-1": "s3.dualstack.eu-central-1.amazonaws.com", - "eu-central-2": "s3.dualstack.eu-central-2.amazonaws.com", - "eu-north-1": "s3.dualstack.eu-north-1.amazonaws.com", - "eu-south-1": "s3.dualstack.eu-south-1.amazonaws.com", - "eu-south-2": "s3.dualstack.eu-south-2.amazonaws.com", - "ap-east-1": "s3.dualstack.ap-east-1.amazonaws.com", - "ap-south-1": "s3.dualstack.ap-south-1.amazonaws.com", - "ap-south-2": "s3.dualstack.ap-south-2.amazonaws.com", - "ap-southeast-1": "s3.dualstack.ap-southeast-1.amazonaws.com", - "ap-southeast-2": "s3.dualstack.ap-southeast-2.amazonaws.com", - "ap-northeast-1": "s3.dualstack.ap-northeast-1.amazonaws.com", - "ap-northeast-2": "s3.dualstack.ap-northeast-2.amazonaws.com", - "ap-northeast-3": "s3.dualstack.ap-northeast-3.amazonaws.com", - "af-south-1": "s3.dualstack.af-south-1.amazonaws.com", - "me-central-1": "s3.dualstack.me-central-1.amazonaws.com", - "me-south-1": "s3.dualstack.me-south-1.amazonaws.com", - "sa-east-1": "s3.dualstack.sa-east-1.amazonaws.com", - "us-gov-west-1": "s3.dualstack.us-gov-west-1.amazonaws.com", - "us-gov-east-1": "s3.dualstack.us-gov-east-1.amazonaws.com", - "cn-north-1": "s3.dualstack.cn-north-1.amazonaws.com.cn", - "cn-northwest-1": "s3.dualstack.cn-northwest-1.amazonaws.com.cn", - "ap-southeast-3": "s3.dualstack.ap-southeast-3.amazonaws.com", - "ap-southeast-4": "s3.dualstack.ap-southeast-4.amazonaws.com", - "il-central-1": "s3.dualstack.il-central-1.amazonaws.com", +var awsS3EndpointMap = map[string]awsS3Endpoint{ + "us-east-1": { + "s3.us-east-1.amazonaws.com", + "s3.dualstack.us-east-1.amazonaws.com", + }, + "us-east-2": { + "s3.us-east-2.amazonaws.com", + "s3.dualstack.us-east-2.amazonaws.com", + }, + "us-west-2": { + "s3.us-west-2.amazonaws.com", + "s3.dualstack.us-west-2.amazonaws.com", + }, + "us-west-1": { + "s3.us-west-1.amazonaws.com", + "s3.dualstack.us-west-1.amazonaws.com", + }, + "ca-central-1": { + "s3.ca-central-1.amazonaws.com", + "s3.dualstack.ca-central-1.amazonaws.com", + }, + "eu-west-1": { + "s3.eu-west-1.amazonaws.com", + "s3.dualstack.eu-west-1.amazonaws.com", + }, + "eu-west-2": { + "s3.eu-west-2.amazonaws.com", + "s3.dualstack.eu-west-2.amazonaws.com", + }, + "eu-west-3": { + "s3.eu-west-3.amazonaws.com", + "s3.dualstack.eu-west-3.amazonaws.com", + }, + "eu-central-1": { + "s3.eu-central-1.amazonaws.com", + "s3.dualstack.eu-central-1.amazonaws.com", + }, + "eu-central-2": { + "s3.eu-central-2.amazonaws.com", + "s3.dualstack.eu-central-2.amazonaws.com", + }, + "eu-north-1": { + "s3.eu-north-1.amazonaws.com", + "s3.dualstack.eu-north-1.amazonaws.com", + }, + "eu-south-1": { + "s3.eu-south-1.amazonaws.com", + "s3.dualstack.eu-south-1.amazonaws.com", + }, + "eu-south-2": { + "s3.eu-south-2.amazonaws.com", + "s3.dualstack.eu-south-2.amazonaws.com", + }, + "ap-east-1": { + "s3.ap-east-1.amazonaws.com", + "s3.dualstack.ap-east-1.amazonaws.com", + }, + "ap-south-1": { + "s3.ap-south-1.amazonaws.com", + "s3.dualstack.ap-south-1.amazonaws.com", + }, + "ap-south-2": { + "s3.ap-south-2.amazonaws.com", + "s3.dualstack.ap-south-2.amazonaws.com", + }, + "ap-southeast-1": { + "s3.ap-southeast-1.amazonaws.com", + "s3.dualstack.ap-southeast-1.amazonaws.com", + }, + "ap-southeast-2": { + "s3.ap-southeast-2.amazonaws.com", + "s3.dualstack.ap-southeast-2.amazonaws.com", + }, + "ap-southeast-3": { + "s3.ap-southeast-3.amazonaws.com", + "s3.dualstack.ap-southeast-3.amazonaws.com", + }, + "ap-southeast-4": { + "s3.ap-southeast-4.amazonaws.com", + "s3.dualstack.ap-southeast-4.amazonaws.com", + }, + "ap-northeast-1": { + "s3.ap-northeast-1.amazonaws.com", + "s3.dualstack.ap-northeast-1.amazonaws.com", + }, + "ap-northeast-2": { + "s3.ap-northeast-2.amazonaws.com", + "s3.dualstack.ap-northeast-2.amazonaws.com", + }, + "ap-northeast-3": { + "s3.ap-northeast-3.amazonaws.com", + "s3.dualstack.ap-northeast-3.amazonaws.com", + }, + "af-south-1": { + "s3.af-south-1.amazonaws.com", + "s3.dualstack.af-south-1.amazonaws.com", + }, + "me-central-1": { + "s3.me-central-1.amazonaws.com", + "s3.dualstack.me-central-1.amazonaws.com", + }, + "me-south-1": { + "s3.me-south-1.amazonaws.com", + "s3.dualstack.me-south-1.amazonaws.com", + }, + "sa-east-1": { + "s3.sa-east-1.amazonaws.com", + "s3.dualstack.sa-east-1.amazonaws.com", + }, + "us-gov-west-1": { + "s3.us-gov-west-1.amazonaws.com", + "s3.dualstack.us-gov-west-1.amazonaws.com", + }, + "us-gov-east-1": { + "s3.us-gov-east-1.amazonaws.com", + "s3.dualstack.us-gov-east-1.amazonaws.com", + }, + "cn-north-1": { + "s3.cn-north-1.amazonaws.com.cn", + "s3.dualstack.cn-north-1.amazonaws.com.cn", + }, + "cn-northwest-1": { + "s3.cn-northwest-1.amazonaws.com.cn", + "s3.dualstack.cn-northwest-1.amazonaws.com.cn", + }, + "il-central-1": { + "s3.il-central-1.amazonaws.com", + "s3.dualstack.il-central-1.amazonaws.com", + }, } // getS3Endpoint get Amazon S3 endpoint based on the bucket location. -func getS3Endpoint(bucketLocation string) (s3Endpoint string) { +func getS3Endpoint(bucketLocation string, useDualstack bool) (endpoint string) { s3Endpoint, ok := awsS3EndpointMap[bucketLocation] if !ok { - // Default to 's3.dualstack.us-east-1.amazonaws.com' endpoint. - s3Endpoint = "s3.dualstack.us-east-1.amazonaws.com" + // Default to 's3.us-east-1.amazonaws.com' endpoint. + if useDualstack { + return "s3.dualstack.us-east-1.amazonaws.com" + } + return "s3.us-east-1.amazonaws.com" + } + if useDualstack { + return s3Endpoint.dualstackEndpoint } - return s3Endpoint + return s3Endpoint.endpoint } diff --git a/vendor/github.com/minio/minio-go/v7/utils.go b/vendor/github.com/minio/minio-go/v7/utils.go index 46123ea77..d68f14844 100644 --- a/vendor/github.com/minio/minio-go/v7/utils.go +++ b/vendor/github.com/minio/minio-go/v7/utils.go @@ -517,6 +517,7 @@ var supportedReplicationEncryptionHeaders = map[string]bool{ "x-minio-replication-server-side-encryption-seal-algorithm": true, "x-minio-replication-server-side-encryption-iv": true, "x-minio-replication-encrypted-multipart": true, + "x-minio-replication-actual-object-size": true, // Add more supported headers here. // Must be lower case. } diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/write_grpc.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/write_grpc.go new file mode 100644 index 000000000..a628a8993 --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/write_grpc.go @@ -0,0 +1,21 @@ +package api + +import "errors" + +type WriteGRPC struct { + TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"the host name or IP of the target Flow collector"` + TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"the port of the target Flow collector"` +} + +func (w *WriteGRPC) Validate() error { + if w == nil { + return errors.New("you must provide a configuration") + } + if w.TargetHost == "" { + return errors.New("targetHost can't be empty") + } + if w.TargetPort == 0 { + return errors.New("targetPort can't be empty") + } + return nil +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go index ea10e2045..25c08c269 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go @@ -129,6 +129,7 @@ type Write struct { Loki *api.WriteLoki `yaml:"loki,omitempty" json:"loki,omitempty"` Stdout *api.WriteStdout `yaml:"stdout,omitempty" json:"stdout,omitempty"` Ipfix *api.WriteIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"` + GRPC *api.WriteGRPC `yaml:"grpc,omitempty" json:"grpc,omitempty"` } // ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/pipeline_builder.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/pipeline_builder.go index 769a2a396..6577b90d8 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/pipeline_builder.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/pipeline_builder.go @@ -368,6 +368,8 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write. var writer write.Writer var err error switch params.Write.Type { + case api.GRPCType: + writer, err = write.NewWriteGRPC(params) case api.StdoutType: writer, err = write.NewWriteStdout(params) case api.NoneType: diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/client.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/client.go new file mode 100644 index 000000000..082b46ce9 --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/client.go @@ -0,0 +1,40 @@ +package grpc + +import ( + "flag" + "log" + + pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" + "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// ClientConnection wraps a gRPC+protobuf connection +type ClientConnection struct { + client pb.CollectorClient + conn *grpc.ClientConn +} + +func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) { + flag.Parse() + // Set up a connection to the server. + socket := utils.GetSocket(hostIP, hostPort) + conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + + return &ClientConnection{ + client: pb.NewCollectorClient(conn), + conn: conn, + }, nil +} + +func (cp *ClientConnection) Client() pb.CollectorClient { + return cp.client +} + +func (cp *ClientConnection) Close() error { + return cp.conn.Close() +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go new file mode 100644 index 000000000..a3ba1db8a --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go @@ -0,0 +1,209 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.6 +// source: proto/genericmap.proto + +package genericmap + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// The request message containing the GenericMap +type Flow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GenericMap *anypb.Any `protobuf:"bytes,1,opt,name=genericMap,proto3" json:"genericMap,omitempty"` +} + +func (x *Flow) Reset() { + *x = Flow{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_genericmap_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Flow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Flow) ProtoMessage() {} + +func (x *Flow) ProtoReflect() protoreflect.Message { + mi := &file_proto_genericmap_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Flow.ProtoReflect.Descriptor instead. +func (*Flow) Descriptor() ([]byte, []int) { + return file_proto_genericmap_proto_rawDescGZIP(), []int{0} +} + +func (x *Flow) GetGenericMap() *anypb.Any { + if x != nil { + return x.GenericMap + } + return nil +} + +// intentionally empty +type CollectorReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CollectorReply) Reset() { + *x = CollectorReply{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_genericmap_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CollectorReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectorReply) ProtoMessage() {} + +func (x *CollectorReply) ProtoReflect() protoreflect.Message { + mi := &file_proto_genericmap_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CollectorReply.ProtoReflect.Descriptor instead. +func (*CollectorReply) Descriptor() ([]byte, []int) { + return file_proto_genericmap_proto_rawDescGZIP(), []int{1} +} + +var File_proto_genericmap_proto protoreflect.FileDescriptor + +var file_proto_genericmap_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x6d, + 0x61, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, + 0x63, 0x6d, 0x61, 0x70, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x3c, 0x0a, 0x04, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x34, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, + 0x69, 0x63, 0x4d, 0x61, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, + 0x79, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x4d, 0x61, 0x70, 0x22, 0x10, 0x0a, + 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x32, + 0x43, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x36, 0x0a, 0x04, + 0x53, 0x65, 0x6e, 0x64, 0x12, 0x10, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x6d, 0x61, + 0x70, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x1a, 0x1a, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, + 0x6d, 0x61, 0x70, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0e, 0x5a, 0x0c, 0x2e, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, + 0x63, 0x6d, 0x61, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_genericmap_proto_rawDescOnce sync.Once + file_proto_genericmap_proto_rawDescData = file_proto_genericmap_proto_rawDesc +) + +func file_proto_genericmap_proto_rawDescGZIP() []byte { + file_proto_genericmap_proto_rawDescOnce.Do(func() { + file_proto_genericmap_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_genericmap_proto_rawDescData) + }) + return file_proto_genericmap_proto_rawDescData +} + +var file_proto_genericmap_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_genericmap_proto_goTypes = []interface{}{ + (*Flow)(nil), // 0: genericmap.Flow + (*CollectorReply)(nil), // 1: genericmap.CollectorReply + (*anypb.Any)(nil), // 2: google.protobuf.Any +} +var file_proto_genericmap_proto_depIdxs = []int32{ + 2, // 0: genericmap.Flow.genericMap:type_name -> google.protobuf.Any + 0, // 1: genericmap.Collector.Send:input_type -> genericmap.Flow + 1, // 2: genericmap.Collector.Send:output_type -> genericmap.CollectorReply + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_genericmap_proto_init() } +func file_proto_genericmap_proto_init() { + if File_proto_genericmap_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_genericmap_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Flow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_genericmap_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CollectorReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_genericmap_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_genericmap_proto_goTypes, + DependencyIndexes: file_proto_genericmap_proto_depIdxs, + MessageInfos: file_proto_genericmap_proto_msgTypes, + }.Build() + File_proto_genericmap_proto = out.File + file_proto_genericmap_proto_rawDesc = nil + file_proto_genericmap_proto_goTypes = nil + file_proto_genericmap_proto_depIdxs = nil +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go new file mode 100644 index 000000000..12a8d8aaf --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.6 +// source: proto/genericmap.proto + +package genericmap + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// CollectorClient is the client API for Collector service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CollectorClient interface { + Send(ctx context.Context, in *Flow, opts ...grpc.CallOption) (*CollectorReply, error) +} + +type collectorClient struct { + cc grpc.ClientConnInterface +} + +func NewCollectorClient(cc grpc.ClientConnInterface) CollectorClient { + return &collectorClient{cc} +} + +func (c *collectorClient) Send(ctx context.Context, in *Flow, opts ...grpc.CallOption) (*CollectorReply, error) { + out := new(CollectorReply) + err := c.cc.Invoke(ctx, "/genericmap.Collector/Send", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CollectorServer is the server API for Collector service. +// All implementations must embed UnimplementedCollectorServer +// for forward compatibility +type CollectorServer interface { + Send(context.Context, *Flow) (*CollectorReply, error) + mustEmbedUnimplementedCollectorServer() +} + +// UnimplementedCollectorServer must be embedded to have forward compatible implementations. +type UnimplementedCollectorServer struct { +} + +func (UnimplementedCollectorServer) Send(context.Context, *Flow) (*CollectorReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedCollectorServer) mustEmbedUnimplementedCollectorServer() {} + +// UnsafeCollectorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CollectorServer will +// result in compilation errors. +type UnsafeCollectorServer interface { + mustEmbedUnimplementedCollectorServer() +} + +func RegisterCollectorServer(s grpc.ServiceRegistrar, srv CollectorServer) { + s.RegisterService(&Collector_ServiceDesc, srv) +} + +func _Collector_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Flow) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CollectorServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/genericmap.Collector/Send", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CollectorServer).Send(ctx, req.(*Flow)) + } + return interceptor(ctx, in, info, handler) +} + +// Collector_ServiceDesc is the grpc.ServiceDesc for Collector service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Collector_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "genericmap.Collector", + HandlerType: (*CollectorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Send", + Handler: _Collector_Send_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/genericmap.proto", +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/server.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/server.go new file mode 100644 index 000000000..0a085e924 --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/server.go @@ -0,0 +1,77 @@ +package grpc + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" +) + +// CollectorServer wraps a Flow Collector connection & session +type CollectorServer struct { + grpcServer *grpc.Server +} + +type collectorOptions struct { + grpcServerOptions []grpc.ServerOption +} + +// CollectorOption allows overriding the default configuration of the CollectorServer instance. +// Use them in the StartCollector function. +type CollectorOption func(options *collectorOptions) + +func WithGRPCServerOptions(options ...grpc.ServerOption) CollectorOption { + return func(copt *collectorOptions) { + copt.grpcServerOptions = options + } +} + +// StartCollector listens in background for gRPC+Protobuf flows in the given port, and forwards each +// set of *genericmap.Flow by the provided channel. +func StartCollector( + port int, recordForwarder chan<- *genericmap.Flow, options ...CollectorOption, +) (*CollectorServer, error) { + copts := collectorOptions{} + for _, opt := range options { + opt(&copts) + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(copts.grpcServerOptions...) + genericmap.RegisterCollectorServer(grpcServer, &collectorAPI{ + recordForwarder: recordForwarder, + }) + reflection.Register(grpcServer) + go func() { + if err := grpcServer.Serve(lis); err != nil { + panic("error connecting to server: " + err.Error()) + } + }() + return &CollectorServer{ + grpcServer: grpcServer, + }, nil +} + +func (c *CollectorServer) Close() error { + c.grpcServer.Stop() + return nil +} + +type collectorAPI struct { + genericmap.UnimplementedCollectorServer + recordForwarder chan<- *genericmap.Flow +} + +var okReply = &genericmap.CollectorReply{} + +func (c *collectorAPI) Send(_ context.Context, records *genericmap.Flow) (*genericmap.CollectorReply, error) { + c.recordForwarder <- records + return okReply, nil +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_grpc.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_grpc.go new file mode 100644 index 000000000..c89d5f361 --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_grpc.go @@ -0,0 +1,55 @@ +package write + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/anypb" +) + +type writeGRPC struct { + hostIP string + hostPort int + clientConn *grpc.ClientConnection +} + +// Write writes a flow before being stored +func (t *writeGRPC) Write(v config.GenericMap) { + logrus.Tracef("entering writeGRPC Write %s", v) + value, _ := json.Marshal(v) + if _, err := t.clientConn.Client().Send(context.TODO(), &genericmap.Flow{ + GenericMap: &anypb.Any{ + Value: value, + }, + }); err != nil { + logrus.Errorf("writeGRPC send error: %v", err) + } +} + +// NewWriteGRPC create a new write +func NewWriteGRPC(params config.StageParam) (Writer, error) { + logrus.Debugf("entering NewWriteGRPC") + + writeGRPC := &writeGRPC{} + if params.Write != nil && params.Write.GRPC != nil { + if err := params.Write.GRPC.Validate(); err != nil { + return nil, fmt.Errorf("the provided config is not valid: %w", err) + } + writeGRPC.hostIP = params.Write.GRPC.TargetHost + writeGRPC.hostPort = params.Write.GRPC.TargetPort + } else { + return nil, fmt.Errorf("write.grpc param is mandatory: %v", params.Write) + } + logrus.Debugf("NewWriteGRPC ConnectClient %s:%d...", writeGRPC.hostIP, writeGRPC.hostPort) + clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort) + if err != nil { + return nil, err + } + writeGRPC.clientConn = clientConn + return writeGRPC, nil +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go index e76bb9e11..cf984a26d 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/write_ipfix.go @@ -246,8 +246,9 @@ func setStandardIEValue(record config.GenericMap, ieValPtr *entities.InfoElement return fmt.Errorf("unable to find ethernet type (Etype) in record") } case "flowDirection": - if record["FlowDirection"] != nil { - ieVal.SetUnsigned8Value(uint8(record["FlowDirection"].(int))) + dirs := record["IfDirections"].([]int) + if len(dirs) > 0 { + ieVal.SetUnsigned8Value(uint8(dirs[0])) } else { return fmt.Errorf("unable to find flow direction (flowDirection) in record") } @@ -336,8 +337,9 @@ func setStandardIEValue(record config.GenericMap, ieValPtr *entities.InfoElement return fmt.Errorf("unable to find packets in record") } case "interfaceName": - if record["Interface"] != nil { - ieVal.SetStringValue(record["Interface"].(string)) + interfaces := record["Interfaces"].([]string) + if len(interfaces) > 0 { + ieVal.SetStringValue(interfaces[0]) } else { return fmt.Errorf("unable to find interface in record") } diff --git a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/transform/metricdata.go b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/transform/metricdata.go index 985fcfc64..c8ab8dbf6 100644 --- a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/transform/metricdata.go +++ b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/transform/metricdata.go @@ -148,6 +148,7 @@ func DataPoints[N int64 | float64](dPts []metricdata.DataPoint[N]) []*mpb.Number Attributes: AttrIter(dPt.Attributes.Iter()), StartTimeUnixNano: timeUnixNano(dPt.StartTime), TimeUnixNano: timeUnixNano(dPt.Time), + Exemplars: Exemplars(dPt.Exemplars), } switch v := any(dPt.Value).(type) { case int64: @@ -193,6 +194,7 @@ func HistogramDataPoints[N int64 | float64](dPts []metricdata.HistogramDataPoint Sum: &sum, BucketCounts: dPt.BucketCounts, ExplicitBounds: dPt.Bounds, + Exemplars: Exemplars(dPt.Exemplars), } if v, ok := dPt.Min.Value(); ok { vF64 := float64(v) @@ -236,6 +238,7 @@ func ExponentialHistogramDataPoints[N int64 | float64](dPts []metricdata.Exponen Sum: &sum, Scale: dPt.Scale, ZeroCount: dPt.ZeroCount, + Exemplars: Exemplars(dPt.Exemplars), Positive: ExponentialHistogramDataPointBuckets(dPt.PositiveBucket), Negative: ExponentialHistogramDataPointBuckets(dPt.NegativeBucket), @@ -290,3 +293,28 @@ func timeUnixNano(t time.Time) uint64 { } return uint64(t.UnixNano()) } + +// Exemplars returns a slice of OTLP Exemplars generated from exemplars. +func Exemplars[N int64 | float64](exemplars []metricdata.Exemplar[N]) []*mpb.Exemplar { + out := make([]*mpb.Exemplar, 0, len(exemplars)) + for _, exemplar := range exemplars { + e := &mpb.Exemplar{ + FilteredAttributes: KeyValues(exemplar.FilteredAttributes), + TimeUnixNano: timeUnixNano(exemplar.Time), + SpanId: exemplar.SpanID, + TraceId: exemplar.TraceID, + } + switch v := any(exemplar.Value).(type) { + case int64: + e.Value = &mpb.Exemplar_AsInt{ + AsInt: v, + } + case float64: + e.Value = &mpb.Exemplar_AsDouble{ + AsDouble: v, + } + } + out = append(out, e) + } + return out +} diff --git a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/version.go b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/version.go index 776c6cdc4..3538f8a7d 100644 --- a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/version.go +++ b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/version.go @@ -16,5 +16,5 @@ package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpme // Version is the current release version of the OpenTelemetry OTLP over HTTP/protobuf metrics exporter in use. func Version() string { - return "1.23.1" + return "1.24.0" } diff --git a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/options.go b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/options.go index 0fddac758..461610c6b 100644 --- a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/options.go +++ b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/options.go @@ -64,7 +64,7 @@ func WithInsecure() Option { return wrappedOption{otlpconfig.WithInsecure()} } -// WithEndpointURL sets the target endpoint URL the Exporter will connect to. +// WithEndpoint sets the target endpoint the Exporter will connect to. // // If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_METRICS_ENDPOINT // environment variable is set, and this option is not passed, that variable @@ -82,7 +82,7 @@ func WithEndpoint(endpoint string) Option { return wrappedOption{otlpconfig.WithEndpoint(endpoint)} } -// WithEndpoint sets the target endpoint URL the Exporter will connect to. +// WithEndpointURL sets the target endpoint URL the Exporter will connect to. // // If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_METRICS_ENDPOINT // environment variable is set, and this option is not passed, that variable diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go index df578b86c..c2a5b44b3 100644 --- a/vendor/golang.org/x/net/http2/transport.go +++ b/vendor/golang.org/x/net/http2/transport.go @@ -2911,6 +2911,15 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { fl = &cs.flow } if !fl.add(int32(f.Increment)) { + // For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR + if cs != nil { + rl.endStreamError(cs, StreamError{ + StreamID: f.StreamID, + Code: ErrCodeFlowControl, + }) + return nil + } + return ConnectionError(ErrCodeFlowControl) } cc.cond.Broadcast() diff --git a/vendor/modules.txt b/vendor/modules.txt index 0034ae5a2..98986d1ed 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -199,7 +199,7 @@ github.com/mdlayher/ethernet # github.com/minio/md5-simd v1.1.2 ## explicit; go 1.14 github.com/minio/md5-simd -# github.com/minio/minio-go/v7 v7.0.68 +# github.com/minio/minio-go/v7 v7.0.69 ## explicit; go 1.21 github.com/minio/minio-go/v7 github.com/minio/minio-go/v7/pkg/credentials @@ -237,7 +237,7 @@ github.com/mwitkow/go-conntrack # github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f ## explicit github.com/mxk/go-flowrate/flowrate -# github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240305083238-24bf8cec8807 +# github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240312115357-ddc1f67022a5 ## explicit; go 1.20 github.com/netobserv/flowlogs-pipeline/pkg/api github.com/netobserv/flowlogs-pipeline/pkg/config @@ -259,6 +259,8 @@ github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write +github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc +github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap github.com/netobserv/flowlogs-pipeline/pkg/prometheus github.com/netobserv/flowlogs-pipeline/pkg/server github.com/netobserv/flowlogs-pipeline/pkg/utils @@ -494,7 +496,7 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/envco go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/retry go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/transform -# go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 +# go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0 ## explicit; go 1.20 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal @@ -506,7 +508,7 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/trans ## explicit; go 1.20 go.opentelemetry.io/otel/exporters/otlp/otlptrace go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform -# go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 +# go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 ## explicit; go 1.20 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal @@ -559,7 +561,7 @@ go.opentelemetry.io/proto/otlp/trace/v1 # go.uber.org/atomic v1.9.0 ## explicit; go 1.13 go.uber.org/atomic -# golang.org/x/crypto v0.19.0 +# golang.org/x/crypto v0.21.0 ## explicit; go 1.18 golang.org/x/crypto/argon2 golang.org/x/crypto/blake2b @@ -572,7 +574,7 @@ golang.org/x/crypto/curve25519/internal/field golang.org/x/exp/constraints golang.org/x/exp/maps golang.org/x/exp/slices -# golang.org/x/net v0.21.0 +# golang.org/x/net v0.22.0 ## explicit; go 1.18 golang.org/x/net/context golang.org/x/net/html @@ -599,7 +601,7 @@ golang.org/x/sys/plan9 golang.org/x/sys/unix golang.org/x/sys/windows golang.org/x/sys/windows/registry -# golang.org/x/term v0.17.0 +# golang.org/x/term v0.18.0 ## explicit; go 1.18 golang.org/x/term # golang.org/x/text v0.14.0