
DPDV-6415: Remove SIGSEV #100

Merged · 37 commits · Jun 28, 2024

Conversation

@martin-majlis-s1 (Collaborator) commented Jun 24, 2024

Jira Link: https://sentinelone.atlassian.net/browse/DPDV-6415

🥅 Goal

After introducing purging of unused sessions in #75, there have been some SIGSEGVs. So let's remove them. The previous attempt was #77.

🛠️ Solution

I originally wrote this code in a generic way, without using many Go primitives. Now I understand Go better, so instead of lots of mutexes, let's use channels. During development I was repeatedly in a situation where add_events_long_running_test.go was passing but examples/stress/main.go was getting stuck, or the other way around.

It is still not a perfect solution; I can still make it get stuck. Communication through the channels takes a few dozen ms, so if purgeOlderThan is a few dozen ms as well, it breaks: between the moment an event is added to the channel and the moment it is read by the code responsible for building the buffer, the purge time can be reached and the procedure for removing the buffer and channel triggered. This causes the problem. In the real world purgeOlderThan is minutes, so it will not happen.
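
A deliberately simplified, runnable model of that race (hypothetical names, not the library's actual types): with purgeOlderThan comparable to channel latency, the purge branch can fire while an event is still in flight.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// purgeOlderThan comparable to channel latency - the dangerous case.
	purgeOlderThan := 20 * time.Millisecond
	events := make(chan string, 16)
	lastUsed := time.Now()
	purge := time.NewTicker(purgeOlderThan)
	defer purge.Stop()

	// Producer: under load, delivery may take a few dozen ms.
	go func() { events <- "event-1" }()

	for {
		select {
		case ev := <-events:
			lastUsed = time.Now()
			fmt.Println("buffered:", ev) // build the buffer from the event
		case <-purge.C:
			// If an event is still in flight here, the session (buffer and
			// channel) is torn down underneath its producer.
			if time.Since(lastUsed) > purgeOlderThan {
				fmt.Println("purged session")
				return
			}
		}
	}
}
```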

Improvements

  • The buffer used to be accessed from multiple goroutines in parallel. Now it is accessed from only a single one (see the sketch after this list).
  • It was using the pubsub library, but that was causing a lot of problems, so it is no longer used.
  • The number of parallel outgoing connections to the server is now configurable.
  • Added more tests to reach higher test coverage (77% => 85%).
  • CI/CD pipelines contain timeouts, so they don't run for 6 hours.
  • The examples/stress test is built with the -race flag to spot data races.
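
A minimal sketch of the single-owner pattern behind the first improvement (hypothetical names, not the actual session_manager code): all mutations travel through a channel, so the buffers map is only ever touched by one goroutine and needs no mutex.

```go
package main

import (
	"bytes"
	"fmt"
)

type command struct {
	op    string
	key   string
	value string
}

type manager struct {
	operations chan command
	buffers    map[string]*bytes.Buffer // owned by processCommands only
}

// processCommands is the single goroutine allowed to touch buffers.
func (m *manager) processCommands() {
	for cmd := range m.operations {
		switch cmd.op {
		case "add":
			if m.buffers[cmd.key] == nil {
				m.buffers[cmd.key] = &bytes.Buffer{}
			}
			m.buffers[cmd.key].WriteString(cmd.value)
		case "purge":
			delete(m.buffers, cmd.key)
		}
	}
}

func main() {
	m := &manager{operations: make(chan command), buffers: map[string]*bytes.Buffer{}}
	done := make(chan struct{})
	go func() { m.processCommands(); close(done) }()

	m.operations <- command{op: "add", key: "s1", value: "hello"}
	close(m.operations)
	<-done // the owner goroutine has exited; reading the map is now safe
	fmt.Println(m.buffers["s1"].String())
}
```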

🏫 Testing

When I use the same command that I used in #75

@codecov-commenter commented Jun 25, 2024

Codecov Report

Attention: Patch coverage is 82.50000% with 63 lines in your changes missing coverage. Please review.

Project coverage is 84.99%. Comparing base (60effe0) to head (83030c4).

Additional details and impacted files


@@            Coverage Diff             @@
##             main     #100      +/-   ##
==========================================
+ Coverage   77.01%   84.99%   +7.98%     
==========================================
  Files          12       13       +1     
  Lines        1714     1686      -28     
==========================================
+ Hits         1320     1433     +113     
+ Misses        310      173     -137     
+ Partials       84       80       -4     
Files Coverage Δ
pkg/buffer_config/buffer_config.go 97.39% <100.00%> (+44.67%) ⬆️
pkg/buffer/buffer.go 76.96% <96.97%> (+27.83%) ⬆️
pkg/session_manager/session_manager.go 95.35% <95.35%> (ø)
pkg/statistics/statistics.go 89.12% <88.57%> (+5.09%) ⬆️
pkg/client/client.go 87.61% <87.65%> (+0.83%) ⬆️
pkg/client/add_events.go 78.74% <67.38%> (-2.41%) ⬇️

@martin-majlis-s1 (Collaborator, Author) commented:

I have no idea how this could happen. Since there is processCommands - END, it should be waiting for a new command, and therefore AAAAA - Memo - Pub - START should be followed immediately by AAAAA - Memo - Sub - WAIT for channel:

2024-06-26T11:22:11.060+0200    DEBUG   client/memo.go:114      AAAAA - Memo - processCommands - END    {"cmd": "sub", "key": "087f9e53-bf76-44ff-af45-9385f79cbd46-31841361c8ccd9f6acdd07e1d8032e49"}
2024-06-26T11:22:11.060+0200    DEBUG   client/memo.go:58       AAAAA - Memo - Sub - END        {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-31841361c8ccd9f6acdd07e1d8032e49"}
2024-06-26T11:22:11.060+0200    DEBUG   client/memo.go:80       AAAAA - Memo - Pub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-31841361c8ccd9f6acdd07e1d8032e49"}
2024-06-26T11:22:11.060+0200    DEBUG   client/memo.go:82       AAAAA - Memo - Pub - END        {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-31841361c8ccd9f6acdd07e1d8032e49"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:54       AAAAA - Memo - Sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:56       AAAAA - Memo - Sub - WAIT for channel   {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:104      AAAAA - Memo - processCommands - START  {"cmd": "sub", "key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:63       AAAAA - Memo - sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:68       AAAAA - Memo - sub - pubsub.sub - before        {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:72       AAAAA - Memo - sub - pubsub.sub - after {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:75       AAAAA - Memo - sub - END        {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:108      AAAAA - Memo - processCommands - sub - before return channel    {"cmd": "sub", "key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:110      AAAAA - Memo - processCommands - sub - after return channel     {"cmd": "sub", "key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:11.072+0200    DEBUG   client/memo.go:114      AAAAA - Memo - processCommands - END    {"cmd": "sub", "key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f98a7b4d6fbc59b867dd567c5a80a452"}
2024-06-26T11:22:18.862+0200    DEBUG   client/memo.go:54       AAAAA - Memo - Sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-a9366c739577a3d961d69bec5060a63c"}
2024-06-26T11:22:18.872+0200    DEBUG   client/memo.go:54       AAAAA - Memo - Sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-288a5714bc9439c11dac1abe01a41675"}
2024-06-26T11:22:18.883+0200    DEBUG   client/memo.go:54       AAAAA - Memo - Sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-db876dd0eb65312d8a08a74f64cafbe5"}
2024-06-26T11:22:18.893+0200    DEBUG   client/memo.go:54       AAAAA - Memo - Sub - START      {"key": "087f9e53-bf76-44ff-af45-9385f79cbd46-f2bf54fb27b22e77295e264777739f9c"}

@martin-majlis-s1 (Collaborator, Author) commented:

 47910	2024-06-26T13:13:43.415+0200	DEBUG	client/memo.go:111	AAAAA - Memo - processCommands - WAITING
 47918	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:137	AAAAA - Memo - purge - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47920	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:140	AAAAA - Memo - purge - END	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47921	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:131	AAAAA - Memo - purge - WAITING
 47922	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:113	AAAAA - Memo - processCommands - START	{"cmd": "unsub", "key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47923	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:93	AAAAA - Memo - unsub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47924	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:96	AAAAA - Memo - Unsubscribing to key	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47925	2024-06-26T13:13:43.423+0200	DEBUG	client/memo.go:98	AAAAA - Memo - pubsub.unsub - before	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47926	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:101	AAAAA - Memo - pubsub.unsub - after	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47927	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:105	AAAAA - Memo - unsub - END	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47928	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:125	AAAAA - Memo - processCommands - END	{"cmd": "unsub", "key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47929	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:111	AAAAA - Memo - processCommands - WAITING
 47933	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:55	AAAAA - Memo - Sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47934	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:57	AAAAA - Memo - Sub - WAIT for channel	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47937	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:113	AAAAA - Memo - processCommands - START	{"cmd": "sub", "key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47938	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:64	AAAAA - Memo - sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47939	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:67	AAAAA - Memo - Subscribing to key	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47941	2024-06-26T13:13:43.424+0200	DEBUG	client/memo.go:69	AAAAA - Memo - sub - pubsub.sub - before	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-70b27ee4e8a5fe8a6aa31a0ecf46844c"}
 47951	2024-06-26T13:13:43.433+0200	DEBUG	client/memo.go:137	AAAAA - Memo - purge - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-2d3440be6e9b2f214e1af1d3d7643a1a"}
 47955	2024-06-26T13:13:43.434+0200	DEBUG	client/memo.go:55	AAAAA - Memo - Sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-2d3440be6e9b2f214e1af1d3d7643a1a"}
 47969	2024-06-26T13:13:43.444+0200	DEBUG	client/memo.go:55	AAAAA - Memo - Sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-aa4f8120f15357aaedf19d34b0aaa904"}
 48034	2024-06-26T13:13:43.604+0200	DEBUG	client/memo.go:55	AAAAA - Memo - Sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-a6e6cc15667ef2c27ac059355f5e2671"}
 48041	2024-06-26T13:13:43.614+0200	DEBUG	client/memo.go:55	AAAAA - Memo - Sub - START	{"key": "f8b4561b-9953-4f7f-be59-83483cc7abd1-f74f6b11ba0e0d2d8782bf428d19ce61"}

out-12.log
out-12-MEMO.log

Comment on lines 169 to +173
 	batch = append(batch, eventBundle)
-	err := dataSetClient.AddEvents(batch)
-	check(err)
+	go func(batch []*add_events.EventBundle) {
+		err := dataSetClient.AddEvents(batch)
+		check(err)
+	}(batch)


Is it intentional to call dataSetClient.AddEvents with a bigger and bigger batch?

@martin-majlis-s1 (Collaborator, Author) replied:


At the top of the loop, a new batch is always created:

	for i := 0; i < *eventsCount; i++ {
		batch := make([]*add_events.EventBundle, 0)

Furthermore, we can see in the logs that if you ask for 1k events, 1k events are processed - not 500k.
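
A runnable toy version of that pattern: because the slice is re-created at the top of every iteration, each goroutine receives a one-element batch rather than an ever-growing one.

```go
package main

import "fmt"

func main() {
	done := make(chan int)
	for i := 0; i < 3; i++ {
		batch := make([]int, 0) // fresh slice every iteration
		batch = append(batch, i)
		go func(batch []int) {
			done <- len(batch)
		}(batch)
	}
	for i := 0; i < 3; i++ {
		fmt.Println("batch size:", <-done) // always 1, never growing
	}
}
```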

Comment on lines +113 to +114
for {
cmd := <-manager.operations


Can this channel be closed? If yes, then it is better to use range over the channel.

@martin-majlis-s1 (Collaborator, Author) replied:


This channel is never closed. I use for { msg := <-channel } vs. for msg := range channel kind of randomly. 🙈
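
For context, a standalone illustration of how the two forms differ once a channel is closed (which, as noted, never happens here):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	// range exits once the channel is closed and drained.
	for v := range ch {
		fmt.Println("range received:", v)
	}

	// The bare-receive form does not notice the close by itself: after
	// close, <-ch2 yields zero values immediately, so the loop must check
	// the second result to avoid spinning forever.
	ch2 := make(chan int, 1)
	ch2 <- 42
	close(ch2)
	for {
		v, ok := <-ch2
		if !ok {
			break
		}
		fmt.Println("receive got:", v)
	}
}
```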

for {
	cmd := <-manager.operations
	// manager.logger.Debug("SessionManager - processCommands - START", zap.String("cmd", cmd.op), zap.String("key", cmd.key))
	switch cmd.op {


Is it possible that we can get a pub after an unsub? If yes, then we should track something like channelOpen:

type SessionManager struct {
	// ...
	channels    map[string]chan interface{}
	channelOpen map[string]bool
	// ...
}

func (manager *SessionManager) pub(key string, value interface{}) {
	ch, found := manager.channels[key]
	if found && manager.channelOpen[key] {
		ch <- value
	} else {
		manager.logger.Warn("Channel for publishing does not exist or is closed", zap.String("key", key))
	}
}

func (manager *SessionManager) unsub(key string) {
	ch, found := manager.channels[key]
	if found {
		manager.logger.Debug("Unsubscribing to key", zap.String("key", key))
		delete(manager.channels, key)
		close(ch)
		manager.channelOpen[key] = false
	}
}

@martin-majlis-s1 (Collaborator, Author) replied:


The channelOpen map will only keep growing => it will consume more and more memory => this is not something that we want.

Now there are two ways to figure out that you are publishing messages to a closed channel:

  1. There will be these warnings.
  2. There will be a lot of unprocessed events (marked as waiting), so you will get an error at the end.


Writing to a closed channel causes a panic, and we should avoid this.
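
A minimal standalone demonstration of that panic (the program aborts by design); the usual Go discipline is that only the goroutine that owns the channel - here, the one draining operations - should ever close it:

```go
package main

func main() {
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panic: send on closed channel
}
```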

// It allows to subscribe, publish and unsubscribe to/from channels
type SessionManager struct {
	eventCallback EventCallback
	mu            sync.Mutex


unused var?

@martin-majlis-s1 (Collaborator, Author) replied Jun 27, 2024:


Good catch! I do not know why it wasn't spotted by the linter. :) It complains about buf all the time. :)

Fixed!

@@ -27,6 +27,7 @@ const (
 	ShouldSentBufferSize = 5 * 1024 * 1024
 	// LimitBufferSize defines maximum payload size (before compression) for REST API
 	LimitBufferSize = 5*1024*1024 + 960*1024
+	MinimalMaxSize = 100


The naming MinimalMax or MaximalMax is a bit confusing. Can we use something like MinAllowedSize, MinAllowedInterval, MaxAllowedParallelOutgoing?

@martin-majlis-s1 (Collaborator, Author) replied:


I know that it looks strange. :/

For a configuration option Foo, there are constants setting its valid range, and they are called MinimalFoo and MaximalFoo.

However, when the configuration option is itself a MaxFoo, this leads to MinimalMaxFoo and MaximalMaxFoo.

So you are suggesting removing the Min/Max from the name - and having MinimalFoo and MaximalFoo?
I would keep it as it is, for consistency.
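
For illustration, the convention as described (the option name and the values here are made up):

```go
// Option MaxParallelOutgoing gets MinimalMaxParallelOutgoing and
// MaximalMaxParallelOutgoing as the bounds of its valid range -
// hence the awkward-looking but consistent MinimalMax/MaximalMax pair.
const (
	DefaultMaxParallelOutgoing = 100
	MinimalMaxParallelOutgoing = 1    // lower bound of the valid range
	MaximalMaxParallelOutgoing = 1024 // upper bound of the valid range
)
```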

Comment on lines 192 to 197
func() {
	client.Logger.Debug("Listening to events with key - BEGIN",
		zap.String("key", key),
	)
	client.statistics.SessionsOpenedAdd(1)
}()


Why does this need to be wrapped in a func?

@martin-majlis-s1 (Collaborator, Author) replied Jun 27, 2024:


To make it look symmetric with the defer code. :D 🙈

Fixed!
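
Presumably the fixed shape is roughly the following (only the BEGIN half is quoted above; the END counterpart here is an assumption):

```go
client.Logger.Debug("Listening to events with key - BEGIN", zap.String("key", key))
client.statistics.SessionsOpenedAdd(1)
// Only the closing half needs a function literal, via defer.
defer func() {
	// The END line is an assumed counterpart; only BEGIN is quoted above.
	client.Logger.Debug("Listening to events with key - END", zap.String("key", key))
}()
```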

tickerPurge := time.NewTicker(purgeTime)

var buf *buffer.Buffer = nil
loopEvents:


Let's avoid using labels.

@martin-majlis-s1 (Collaborator, Author) replied:


I can remove it, since return will work here as well.

Based on my understanding, this is the only valid construction for getting out of a for loop from inside a select - https://go.dev/ref/spec#Break_statements. How would you solve it?

Fixed!
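
For reference, a minimal runnable version of the construction in question: inside a select, a bare break exits only the select, so leaving the enclosing for needs a label (or a return):

```go
package main

import "fmt"

func main() {
	events := make(chan string)
	done := make(chan struct{})
	go func() {
		events <- "hello"
		close(done)
	}()

loopEvents:
	for {
		select {
		case ev := <-events:
			fmt.Println("event:", ev)
		case <-done:
			break loopEvents // a bare break would only exit the select
		}
	}
	fmt.Println("loop exited")
}
```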

@@ -337,29 +347,75 @@ func (client *DataSetClient) Shutdown() error {
// log statistics when finish was called
client.logStatistics()

createBackOff := func(maxElapsedTime time.Duration) backoff.BackOff {


Can we introduce separate functions instead of such nested constructs?

@martin-majlis-s1 (Collaborator, Author) replied:


Fixed!
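
A sketch of the suggested extraction, assuming the github.com/cenkalti/backoff/v4 package implied by the backoff.BackOff type in the snippet above:

```go
package client

import (
	"time"

	"github.com/cenkalti/backoff/v4"
)

// newBackOff replaces the nested closure with a named, independently
// testable helper: an exponential back-off capped at maxElapsedTime.
func newBackOff(maxElapsedTime time.Duration) backoff.BackOff {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = maxElapsedTime // give up retrying after this long
	return b
}
```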


var buf *buffer.Buffer = nil
loopEvents:
for processedMsgCnt := 0; ; processedMsgCnt++ {


Do we need this?

@martin-majlis-s1 (Collaborator, Author) replied:


Fixed!

@@ -120,7 +121,8 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
attrs := make(map[string]interface{})
attrs["batch"] = batchKey
attrs["body.str"] = key
attrs["attributes.p1"] = strings.Repeat("A", rand.Intn(2000))
// attrs["attributes.p1"] = strings.Repeat("A", rand.Intn(buffer_config.LimitBufferSize/(0.1*LogsPerBatch)))


Do we need this commented code?

@martin-majlis-s1 (Collaborator, Author) replied:


Removed

Comment on lines 419 to 421

// TODO: Fix me!!!
// sc.publishAllBuffers()


?

@martin-majlis-s1 (Collaborator, Author) replied:


Removed

@@ -55,6 +47,7 @@ func TestNewClient(t *testing.T) {
assert.Equal(t, sc4.Config.Tokens.WriteConfig, "writeconfig")
}

/*


Can we remove this?

@martin-majlis-s1 (Collaborator, Author) replied:


Fixed!

Comment on lines 79 to 83
func (manager *SessionManager) Pub(key string, value interface{}) {
	manager.operations <- command{op: "pub", key: key, value: value}
}

func (manager *SessionManager) pub(key string, value interface{}) {


Please don't use this function naming - it is very confusing. Technically, you can have a public function named Pub and a private function named pub in the same package, but this is still confusing for anyone reading or maintaining the code.
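
One conventional alternative (hypothetical naming, not taken from this PR): keep the exported enqueue method, and name the internal handler after what it does on the owner goroutine:

```go
// Pub only enqueues the command; it is safe to call from any goroutine.
func (manager *SessionManager) Pub(key string, value interface{}) {
	manager.operations <- command{op: "pub", key: key, value: value}
}

// handlePub runs on the processCommands goroutine and performs the send
// (body borrowed from the pub implementation shown earlier in this thread).
func (manager *SessionManager) handlePub(key string, value interface{}) {
	if ch, found := manager.channels[key]; found {
		ch <- value
	}
}
```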

Comment on lines 113 to 118
case "sub":
manager.sub(cmd.key)
case "unsub":
manager.unsub(cmd.key)
case "pub":
manager.pub(cmd.key, cmd.value)


Can we introduce an "enum"?

type Operation string

const (
	Sub   Operation = "sub"
	Pub   Operation = "pub"
	Unsub Operation = "unsub"
)
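
With the enum in place, and assuming command.op is retyped to Operation (the command struct is not shown with it here), the dispatch would read:

```go
switch cmd.op {
case Sub:
	manager.sub(cmd.key)
case Unsub:
	manager.unsub(cmd.key)
case Pub:
	manager.pub(cmd.key, cmd.value)
}
```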

@martin-majlis-s1 (Collaborator, Author) replied:


Fixed

@martin-majlis-s1 merged commit 9b79a20 into main on Jun 28, 2024
17 checks passed
@martin-majlis-s1 deleted the DPDV-6415-fix-sig-sev-attempt-2 branch on June 28, 2024 at 13:37
mx-psi pushed a commit to open-telemetry/opentelemetry-collector-contrib that referenced this pull request Jul 1, 2024
**Description:** Upgrade to dataset-go v0.20.0

In PR #31293 we introduced releasing unused resources. However,
there was a bug that caused occasional SIGSEGVs. This was fixed in
scalyr/dataset-go#100.

Therefore this PR:
* upgrades to `dataset-go` v0.20.0 -
https://github.com/scalyr/dataset-go/releases/tag/v0.20.0 - which
contains the fix
* introduces the configuration option `buffer.max_parallel_outgoing` to
control the maximum number of outgoing connections.

**Link to tracking Issue:** #33812

**Testing:** Unit tests and stress tests

**Documentation:** 

(screenshot: "Screenshot 2024-06-27 at 12 04 24" - https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/2ebb30d7-e362-49ec-995a-5d9f43d6bb5a)