-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add status watcher for bayeux client #10
Add status watcher for bayeux client #10
Conversation
kush-elastic
commented
Apr 19, 2022
•
edited
Loading
edited
- Add client to watch over status of subscriptions. To check how many connections bayeux instance has.
- Make sure to close channel as soon as context is canceled to ensure it won't hold channel open for forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rephrase the issue description to present why we need this change/what kind of issue it fixes, and link it with the other issue.
bayeux.go
Outdated
} | ||
|
||
func (st *Status) disconnect(out chan MaybeMsg) { | ||
close(out) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an anti-pattern in general, you shouldn't close channels you don't own/create.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will update it.
bayeux.go
Outdated
id clientIDAndCookies | ||
creds Credentials | ||
id clientIDAndCookies | ||
status Status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the Status
need to be exposed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't really need it exposed.
Status was already exposed, I am just reusing it to get connection information from bayeux object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's hide it then if it isn't necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay
bayeux.go
Outdated
defer wg.Done() | ||
defer close(out) | ||
defer status.disconnect(out) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have 3 deferred functions now, maybe you can replace it with a block defer func() { ... }
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will update it to:
defer func() { close(out) wg.Done() status.disconnect() }()
@@ -266,7 +295,12 @@ func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg { | |||
return | |||
} | |||
for _, e := range x { | |||
out <- MaybeMsg{Msg: e} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add any comments regarding this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, Thanks
bayeux.go
Outdated
@@ -298,6 +332,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials, | |||
|
|||
func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg { | |||
b.creds = creds | |||
b.status = status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please explain what are you trying to achieve? In L132 the status is global (which is not good in general), but here you're setting it as part of the Bayeux
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added one parameter connectCount
in Status struct. I need to ensure if connections are closed as soon as context is canceled from filebeat side(for test case to ensure if channel is closing or not). To do that I need some mechanism in bayeux. but i don't have any way to access status object so for reference I used bayeux object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that this code doesn't need any concurrency safety mechanism? For example in MQTT input, I used a WaitGroup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of this for concurrency safety mechanism for in flight messages. as we can only close channel from bayeux side. we will wait till all the inbound messages are sent via channel.
var waitMsgs sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
waitMsgs.Wait()
close(out)
status.disconnect()
wg.Done()
}()
for {
for _, e := range x {
waitMsgs.Add(1)
go func() {
defer waitMsgs.Done()
// close channel as soon as context is canceled to ensure it won't hold channel open for forever
out <- MaybeMsg{Msg: e}
}()
}
}
}()
return out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard for me to tell, as the entire library code is tangled a bit. I can't confirm that this change doesn't introduce any other issues (deadlocks). What I don't like about the suggested approach is for-in-for loop.
WDYT about refactoring the library code or writing it from scratch? We may try to get rid of all globals and focus on concurrency safety.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, sorry about that, I removed other things there to make it simple to understand but actual snip should look like this:
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
var waitMsgs sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
waitMsgs.Wait()
close(out)
status.disconnect()
wg.Done()
}()
for {
select {
case <-ctx.Done():
return
default:
postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
resp, err := b.call(ctx, postBody, b.creds.bayeuxUrl())
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
out <- MaybeMsg{Err: fmt.Errorf("cannot connect to bayeux: %s, trying again", err)}
} else {
if os.Getenv("DEBUG") != "" {
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
}
// Restore the io.ReadCloser to its original state
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
// Use the content
s := string(b)
logger.Printf("Response Body: %s", s)
}
var x []TriggerEvent
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&x); err != nil && err == io.EOF {
out <- MaybeMsg{Err: err}
return
}
for _, e := range x {
waitMsgs.Add(1)
go func() {
defer waitMsgs.Done()
// close channel as soon as context is canceled to ensure it won't hold channel open for forever
out <- MaybeMsg{Msg: e}
}()
}
}
}
}
}()
return out
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, it looks good to me, let's get it into the PR code.
bayeux.go
Outdated
} | ||
|
||
type Client struct { | ||
BayOb Bayeux |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does Ob
stand for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me update it to something meaningful.
bayeux.go
Outdated
@@ -298,6 +332,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials, | |||
|
|||
func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg { | |||
b.creds = creds | |||
b.status = status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, it looks good to me, let's get it into the PR code.
bayeux.go
Outdated
@@ -298,6 +335,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials, | |||
|
|||
func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg { | |||
b.creds = creds | |||
b.status = status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to the original question:
Why you can't access the global "status"? If you need to expose the global state, just expose a static function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right.