Open-source probe analyzer handles multi-threading #558

Merged: 7 commits, Aug 17, 2023
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -5,10 +5,11 @@

## Unreleased
### Enhancements
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. This change aims to ensure that unexpected topologies are not generated by default when these settings are not explicitly enabled. ([#562](https://github.com/KindlingProject/kindling/pull/562))
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
- Optimized the `networkanalyzer` component of the probe analyzer by utilizing Go's goroutines, enabling concurrent execution. ([#558](https://github.com/KindlingProject/kindling/pull/558))

### Bug fixes
- Fix the bug of sending repetitive k8s_info_workload; now each node only sends its own info.[#554](https://github.com/KindlingProject/kindling/pull/554)
- Fix the bug of sending repetitive k8s_info_workload; now each node only sends its own info.([#554](https://github.com/KindlingProject/kindling/pull/554))
- Provide a new self metric for probe events. (skipped events/dropped events)([#553](https://github.com/KindlingProject/kindling/pull/553))

## v0.8.0 - 2023-06-30
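The goroutine entry above refers to the change shown in network_analyzer.go further down this page: ConsumeEvent no longer does the protocol analysis on the caller's goroutine but only enqueues the event into a buffered channel that a dedicated goroutine drains. A minimal, self-contained sketch of that hand-off pattern, using illustrative names (event, process, queueSize) that are not taken from the repository:

```go
package main

import "fmt"

// event stands in for *model.KindlingEvent; it is an illustrative type only.
type event struct{ id int }

// process stands in for the analyzer's per-event work (protocol parsing, etc.).
func process(evt *event) {
	fmt.Println("processed event", evt.id)
}

func main() {
	const queueSize = 4 // plays the role of event_channel_size
	queue := make(chan *event, queueSize)
	done := make(chan struct{})

	// Dedicated consumer goroutine: drains the queue until it is closed.
	go func() {
		defer close(done)
		for evt := range queue {
			process(evt)
		}
	}()

	// Producer side: hand events off instead of processing them in place.
	for i := 0; i < 10; i++ {
		queue <- &event{id: i}
	}
	close(queue)
	<-done // wait for the consumer to finish the remaining events
}
```

The practical effect is that the upstream producer pays only the cost of a channel send per event and blocks only when the buffer is full, rather than waiting for every event to be fully parsed.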
2 changes: 2 additions & 0 deletions collector/docker/kindling-collector-config.yml
@@ -58,6 +58,8 @@ analyzers:
need_process_info: false
tcpmetricanalyzer:
networkanalyzer:
# How many events can be held in the channel simultaneously before it's considered full.
event_channel_size: 10000
connect_timeout: 100
# How many seconds to wait until we consider a request as complete.
fd_reuse_timeout: 2
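The event_channel_size comment above sets the capacity of the analyzer's buffered event channel. Because the Go change in this PR uses a plain blocking send (na.eventChan <- evt), a producer that runs event_channel_size events ahead of the consumer waits for a free slot instead of dropping data; the full channel acts as backpressure. A small sketch of that behavior with a deliberately tiny capacity; all names here are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Tiny buffer so the "full" case is easy to see; the shipped default is 10000.
	events := make(chan int, 2)

	// Slow consumer: roughly one event every 100 ms.
	go func() {
		for evt := range events {
			time.Sleep(100 * time.Millisecond)
			fmt.Println("consumed", evt)
		}
	}()

	for i := 0; i < 5; i++ {
		start := time.Now()
		events <- i // blocks whenever two events are already waiting
		fmt.Printf("enqueued %d after waiting %v\n", i, time.Since(start).Round(time.Millisecond))
	}
	close(events)
	time.Sleep(400 * time.Millisecond) // let the consumer drain before exiting
}
```

Raising event_channel_size therefore trades a little memory for more slack before the producer starts blocking.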
2 changes: 2 additions & 0 deletions collector/pkg/component/analyzer/network/config.go
@@ -10,6 +10,7 @@ const (
type Config struct {
// This option is set only for testing. We enable it by default; otherwise, the function will not work.
EnableTimeoutCheck bool
EventChannelSize int `mapstructure:"event_channel_size"`
ConnectTimeout int `mapstructure:"connect_timeout"`
FdReuseTimeout int `mapstructure:"fd_reuse_timeout"`
NoResponseThreshold int `mapstructure:"no_response_threshold"`
@@ -28,6 +29,7 @@ type Config struct {

func NewDefaultConfig() *Config {
return &Config{
EventChannelSize: 10000,
EnableTimeoutCheck: true,
ConnectTimeout: 100,
FdReuseTimeout: 15,
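The mapstructure tags above are what bind the YAML key event_channel_size to Config.EventChannelSize. The sketch below assumes the map produced by the YAML parser is decoded with the mitchellh/mapstructure library, which the tags imply but whose wiring inside kindling is not shown in this diff; the Config struct here is trimmed to the fields relevant to this PR:

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Config mirrors only the fields relevant to this PR; the real struct has more.
type Config struct {
	EventChannelSize int `mapstructure:"event_channel_size"`
	ConnectTimeout   int `mapstructure:"connect_timeout"`
}

func main() {
	// What the YAML block looks like once it has been parsed into a generic map.
	raw := map[string]interface{}{
		"event_channel_size": 10000,
		"connect_timeout":    100,
	}

	var cfg Config
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // prints {EventChannelSize:10000 ConnectTimeout:100}
}
```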
35 changes: 35 additions & 0 deletions collector/pkg/component/analyzer/network/network_analyzer.go
100644 → 100755
@@ -10,6 +10,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
@@ -50,6 +51,9 @@ type NetworkAnalyzer struct {
udpMessagePairSize int64
telemetry *component.TelemetryTools

eventChan chan *model.KindlingEvent
stopChan chan bool

// snaplen is the maximum data size, in bytes, that the event could accommodate.
// It is set by setting the environment variable SNAPLEN. See https://github.com/KindlingProject/kindling/pull/387.
snaplen int
@@ -62,6 +66,9 @@ func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, co
dataGroupPool: NewDataGroupPool(),
nextConsumers: consumers,
telemetry: telemetry,

eventChan: make(chan *model.KindlingEvent, config.EventChannelSize),
stopChan: make(chan bool),
}
if config.EnableConntrack {
connConfig := &conntracker.Config{
@@ -77,6 +84,8 @@

na.parserFactory = factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod))
na.snaplen = getSnaplenEnv()


return na
}

@@ -144,10 +153,13 @@ func (na *NetworkAnalyzer) Start() error {
na.parsers = parsers

rand.Seed(time.Now().UnixNano())
go na.ConsumeEventFromChannel()
return nil
}

func (na *NetworkAnalyzer) Shutdown() error {
close(na.stopChan)

// TODO: implement
return nil
}
@@ -157,6 +169,25 @@ func (na *NetworkAnalyzer) Type() analyzer.Type {
}

func (na *NetworkAnalyzer) ConsumeEvent(evt *model.KindlingEvent) error {
na.eventChan <- evt
return nil
}

func (na *NetworkAnalyzer) ConsumeEventFromChannel() {
for {
select {
case evt := <-na.eventChan:
err := na.processEvent(evt)
if err != nil {
na.telemetry.Logger.Error("error happened when processing event: ", zap.Error(err))
}
case <-na.stopChan:
return
}
}
}

func (na *NetworkAnalyzer) processEvent(evt *model.KindlingEvent) error {
if evt.Category != model.Category_CAT_NET {
return nil
}
@@ -209,6 +240,7 @@ func (na *NetworkAnalyzer) ConsumeEvent(evt *model.KindlingEvent) error {
} else {
return na.analyseResponse(evt)
}
// ... original logic of ConsumeEvent ...
}

func (na *NetworkAnalyzer) consumerFdNoReusingTrace() {
@@ -231,6 +263,9 @@ func (na *NetworkAnalyzer) consumerFdNoReusingTrace() {
}
return true
})
case <-na.stopChan:
timer.Stop()
return
}
}
}
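Shutdown closes stopChan, and that single close is what stops both long-running goroutines touched above: ConsumeEventFromChannel and the timer loop in consumerFdNoReusingTrace each select on stopChan, and a receive from a closed channel is immediately ready for every waiter. A stripped-down sketch of this broadcast-style shutdown, with worker bodies and names that are illustrative rather than the project's code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	stop := make(chan bool) // closed to signal shutdown, like stopChan
	var wg sync.WaitGroup

	// Worker in the style of ConsumeEventFromChannel.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-time.After(50 * time.Millisecond): // stands in for <-eventChan
				fmt.Println("processed an event")
			case <-stop:
				fmt.Println("event consumer stopped")
				return
			}
		}
	}()

	// Worker in the style of consumerFdNoReusingTrace's timer loop.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(30 * time.Millisecond)
		for {
			select {
			case <-ticker.C:
				fmt.Println("periodic cleanup")
			case <-stop:
				ticker.Stop()
				fmt.Println("cleanup loop stopped")
				return
			}
		}
	}()

	time.Sleep(120 * time.Millisecond)
	close(stop) // one close releases every goroutine selecting on stop
	wg.Wait()
}
```

One property the sketch shares with the code as shown here: events still buffered in eventChan when stopChan closes may never be processed, because the consumer can return before the queue is drained.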
@@ -1,5 +1,6 @@
analyzers:
networkanalyzer:
event_channel_size: 10000
connect_timeout: 100
fd_reuse_timeout: 60
no_response_threshold: 120
Expand Down
2 changes: 2 additions & 0 deletions deploy/agent/kindling-collector-config.yml
@@ -58,6 +58,8 @@ analyzers:
need_process_info: false
tcpmetricanalyzer:
networkanalyzer:
# How many events can be held in the channel simultaneously before it's considered full.
event_channel_size: 10000
connect_timeout: 100
# How many seconds to wait until we consider a request as complete.
fd_reuse_timeout: 2