DPDV-6415: Remove SIGSEV #100
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #100      +/-   ##
==========================================
+ Coverage   77.01%   84.99%   +7.98%
==========================================
  Files          12       13       +1
  Lines        1714     1686      -28
==========================================
+ Hits         1320     1433     +113
+ Misses        310      173     -137
+ Partials       84       80       -4
I have no idea, how this could happen. Since there is
batch = append(batch, eventBundle)
err := dataSetClient.AddEvents(batch)
check(err)
go func(batch []*add_events.EventBundle) {
	err := dataSetClient.AddEvents(batch)
	check(err)
}(batch)
Is it intentional to call dataSetClient.AddEvents with a bigger and bigger batch?
A new batch is always created at the top of the loop:
for i := 0; i < *eventsCount; i++ {
	batch := make([]*add_events.EventBundle, 0)
Furthermore, the logs show that if you ask for 1k events, 1k events are processed, not 500k.
for {
	cmd := <-manager.operations
Can this channel be closed? If yes, then it is better to use range over channels
This channel is never closed. I use for { msg := <-channel } vs. for msg := range channel kind of randomly. 🙈
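The two loop forms behave the same while the channel stays open; they differ only once it is closed. A minimal sketch of that difference follows. The helper names (drainWithRange, drainWithReceive, feed) are made up for illustration and are not part of dataset-go.

```go
package main

import "fmt"

// drainWithRange consumes values until the channel is closed and drained,
// then the range loop ends on its own.
func drainWithRange(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

// drainWithReceive uses an explicit receive. Without the ok check, a closed
// channel would keep yielding zero values and the loop would never end.
func drainWithReceive(ch <-chan int) int {
	sum := 0
	for {
		v, ok := <-ch
		if !ok {
			return sum
		}
		sum += v
	}
}

// feed returns a buffered channel that already holds the values and is closed.
func feed(values ...int) <-chan int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	fmt.Println(drainWithRange(feed(1, 2, 3)))
	fmt.Println(drainWithReceive(feed(1, 2, 3)))
}
```

If the channel is truly never closed, as stated above, both forms block forever on an empty channel and the choice is stylistic.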
for {
	cmd := <-manager.operations
	// manager.logger.Debug("SessionManager - processCommands - START", zap.String("cmd", cmd.op), zap.String("key", cmd.key))
	switch cmd.op {
Is it possible that we can get pub after unsub? If yes, then we should track something like channelOpen:
type SessionManager struct {
	// ...
	channels    map[string]chan interface{}
	channelOpen map[string]bool
	// ...
}
func (manager *SessionManager) pub(key string, value interface{}) {
	ch, found := manager.channels[key]
	if found && manager.channelOpen[key] {
		ch <- value
	} else {
		manager.logger.Warn("Channel for publishing does not exist or is closed", zap.String("key", key))
	}
}
func (manager *SessionManager) unsub(key string) {
	ch, found := manager.channels[key]
	if found {
		manager.logger.Debug("Unsubscribing to key", zap.String("key", key))
		delete(manager.channels, key)
		close(ch)
		manager.channelOpen[key] = false
	}
}
With channelOpen, the map would only ever grow, so it would consume more and more memory, which is not something that we want.
Right now there are two ways to find out that you are publishing messages to a closed channel:
- There will be these warnings.
- There will be a lot of unprocessed events (marked as waiting), so you will get an error at the end.
Writing to a closed channel causes a panic, and we should avoid this.
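A small sketch of both points in this thread: a send on a closed channel panics, and removing the map entry before closing means a later publish finds nothing and is skipped, without a second map. The names registry and sendPanics are hypothetical, and the sketch assumes pub and unsub are only ever run by a single goroutine (such as a loop that processes commands one at a time); it is not safe for concurrent use.

```go
package main

import "fmt"

// sendPanics reports whether sending on ch panics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

// registry is a hypothetical stand-in for a map of per-key channels.
// It must only be used from one goroutine.
type registry struct {
	channels map[string]chan int
}

// pub sends v to the channel for key and reports whether a channel existed.
func (r *registry) pub(key string, v int) bool {
	ch, found := r.channels[key]
	if !found {
		return false
	}
	ch <- v
	return true
}

// unsub removes the entry first and closes the channel afterwards, so a
// later pub for the same key cannot reach the closed channel.
func (r *registry) unsub(key string) {
	if ch, found := r.channels[key]; found {
		delete(r.channels, key)
		close(ch)
	}
}

func main() {
	closed := make(chan int, 1)
	close(closed)
	fmt.Println("send on closed channel panics:", sendPanics(closed))

	r := &registry{channels: map[string]chan int{"a": make(chan int, 1)}}
	fmt.Println("pub before unsub delivered:", r.pub("a", 1))
	r.unsub("a")
	fmt.Println("pub after unsub delivered:", r.pub("a", 2))
}
```

The guarantee holds only as long as nothing else keeps a reference to the channel and sends on it after unsub.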
// It allows to subscribe, publish and unsubscribe to/from channels
type SessionManager struct {
	eventCallback EventCallback
	mu sync.Mutex
unused var?
Good catch! I do not know why the linter didn't spot it. :) It complains about buf all the time. :)
Fixed!
@@ -27,6 +27,7 @@ const (
	ShouldSentBufferSize = 5 * 1024 * 1024
	// LimitBufferSize defines maximum payload size (before compression) for REST API
	LimitBufferSize = 5*1024*1024 + 960*1024
	MinimalMaxSize = 100
The names MinimalMax and MaximalMax are a bit confusing. Can we use something like MinAllowedSize, MinAllowedInterval, MaxAllowedParallelOutgoing?
I know that it looks strange. :/
For a configuration option Foo, there are constants that set its valid range, and they are called MinimalFoo and MaximalFoo.
However, when the configuration option is itself called MaxFoo, this leads to MinimalMaxFoo and MaximalMaxFoo.
So, you are suggesting removing the Min/Max from the name and having MinimalFoo and MaximalFoo?
I would keep it as it is for consistency.
pkg/client/add_events.go
Outdated
func() {
	client.Logger.Debug("Listening to events with key - BEGIN",
		zap.String("key", key),
	)
	client.statistics.SessionsOpenedAdd(1)
}()
Why does this need to be wrapped in a func?
To make it look symmetric with the defer code. :D 🙈
Fixed!
pkg/client/add_events.go
Outdated
tickerPurge := time.NewTicker(purgeTime)

var buf *buffer.Buffer = nil
loopEvents:
Let's avoid using labels.
I can remove it, since return will work here as well.
Based on my understanding, a labeled break is the only valid construct for getting out of a for loop from inside a select: https://go.dev/ref/spec#Break_statements. How would you solve it?
Fixed!
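For context, a sketch of the label-free variant discussed here: a bare break inside a select only leaves the select, so when the loop lives in its own function, return ends both at once. The function names consume and runConsume, and the done channel, are assumptions for illustration and do not mirror the actual code in pkg/client/add_events.go.

```go
package main

import "fmt"

// consume counts received events until done is closed.
func consume(events <-chan int, done <-chan struct{}) int {
	processed := 0
	for {
		select {
		case <-events:
			processed++
		case <-done:
			// A bare break here would only leave the select and the for
			// loop would continue; return leaves the loop without a label.
			return processed
		}
	}
}

// runConsume sends n events over an unbuffered channel and then signals
// done, so every event is received before the loop is asked to stop.
func runConsume(n int) int {
	events := make(chan int)
	done := make(chan struct{})
	result := make(chan int)
	go func() { result <- consume(events, done) }()
	for i := 0; i < n; i++ {
		events <- i
	}
	close(done)
	return <-result
}

func main() {
	fmt.Println(runConsume(3))
}
```

When cleanup has to run after the loop, defer inside the function covers it; a labeled break remains the way to go if the code after the loop must stay in the same function.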
pkg/client/add_events.go
Outdated
@@ -337,29 +347,75 @@ func (client *DataSetClient) Shutdown() error {
	// log statistics when finish was called
	client.logStatistics()

	createBackOff := func(maxElapsedTime time.Duration) backoff.BackOff {
Can we introduce separate functions instead of such nested constructs?
Fixed!
pkg/client/add_events.go
Outdated
var buf *buffer.Buffer = nil
loopEvents:
for processedMsgCnt := 0; ; processedMsgCnt++ {
Do we need this?
Fixed!
@@ -120,7 +121,8 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
	attrs := make(map[string]interface{})
	attrs["batch"] = batchKey
	attrs["body.str"] = key
	attrs["attributes.p1"] = strings.Repeat("A", rand.Intn(2000))
	// attrs["attributes.p1"] = strings.Repeat("A", rand.Intn(buffer_config.LimitBufferSize/(0.1*LogsPerBatch)))
Do we need this commented code?
Removed
pkg/client/add_events_test.go
Outdated
// TODO: Fix me!!!
// sc.publishAllBuffers()
?
Removed
pkg/client/client_test.go
Outdated
@@ -55,6 +47,7 @@ func TestNewClient(t *testing.T) {
	assert.Equal(t, sc4.Config.Tokens.WriteConfig, "writeconfig")
}

/*
Can we remove this?
Fixed!
func (manager *SessionManager) Pub(key string, value interface{}) {
	manager.operations <- command{op: "pub", key: key, value: value}
}

func (manager *SessionManager) pub(key string, value interface{}) {
Please, don't use this function naming - it is very confusing. Technically, you could have a public function named Pub and a private function named pub in the same package. However, this could still be confusing for anyone reading or maintaining the code.
case "sub":
	manager.sub(cmd.key)
case "unsub":
	manager.unsub(cmd.key)
case "pub":
	manager.pub(cmd.key, cmd.value)
Can we introduce "enum"?
type Operation string
const (
	Sub   Operation = "sub"
	Pub   Operation = "pub"
	Unsub Operation = "unsub"
)
Fixed
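A sketch of how the suggested typed constants could be used in the command switch. The command struct and the dispatch function are assumptions modelled on the snippets in this review; the real SessionManager calls its own sub, unsub and pub methods rather than returning strings.

```go
package main

import "fmt"

// Operation is a string-backed type acting as an "enum" of command kinds.
type Operation string

const (
	Sub   Operation = "sub"
	Pub   Operation = "pub"
	Unsub Operation = "unsub"
)

// command is a hypothetical message sent to the processing loop.
type command struct {
	op    Operation
	key   string
	value interface{}
}

// dispatch describes what would happen for a command. The default branch
// catches values that are not among the declared constants.
func dispatch(cmd command) string {
	switch cmd.op {
	case Sub:
		return "subscribed " + cmd.key
	case Pub:
		return fmt.Sprintf("published %v to %s", cmd.value, cmd.key)
	case Unsub:
		return "unsubscribed " + cmd.key
	default:
		return "unknown operation " + string(cmd.op)
	}
}

func main() {
	fmt.Println(dispatch(command{op: Sub, key: "k"}))
	fmt.Println(dispatch(command{op: Pub, key: "k", value: 1}))
	fmt.Println(dispatch(command{op: Unsub, key: "k"}))
}
```

With the constants in place, a misspelled operation name becomes a compile error instead of a silently unmatched case; an untyped string literal is still assignable to Operation, so the default branch remains useful.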
**Description:** Upgrade to dataset-go v0.20.0
In PR #31293 we introduced releasing of unused resources. However, there was a bug that caused occasional SIGSEGVs. This was fixed in scalyr/dataset-go#100. Therefore this PR is:
* upgrading to `dataset-go` v0.20.0 - https://github.com/scalyr/dataset-go/releases/tag/v0.20.0 - which contains the fix
* introducing configuration option `buffer.max_parallel_outgoing` to control the maximum number of outgoing connections.
**Link to tracking Issue:** #33812
**Testing:** Unit tests and stress tests
**Documentation:** <img width="1490" alt="Screenshot 2024-06-27 at 12 04 24" src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/122797378/2ebb30d7-e362-49ec-995a-5d9f43d6bb5a">
Jira Link: https://sentinelone.atlassian.net/browse/DPDV-6415
🥅 Goal
After introducing purging of unused sessions in #75, there have been some SIGSEGVs. So let's remove them. The previous attempt was #77.
🛠️ Solution
I originally wrote this code in a general way, without using many Go primitives. Now that I understand Go better, let's use channels instead of a lot of mutexes. During development I was many times in the situation where add_events_long_running_test.go was passing but examples/stress/main.go was getting stuck, or the other way around.
It is still not a perfect solution; I can still make it get stuck. Communication through the channels takes a few dozen ms, so if purgeOlderThan is a few dozen ms as well, it breaks. Between the moment an event is added to the channel and the moment it is read by the code responsible for building the buffer, the purge time can be reached and the procedure for removing the buffer and channel can be triggered. This causes the problem. In the real world purgeOlderThan is minutes, so it will not happen.
Improvements
- pubsub library, but it was causing a lot of problems, so now it's not using it.
- -race flag to spot data races.
🏫 Testing
When I use the same command, that I have used in #75
/usr/bin/time -al ./stress --events=100000 2>&1 | tee out-10-100000-0.20.log