Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status watcher for bayeux client #10

Merged

Conversation

kush-elastic
Copy link
Collaborator

@kush-elastic kush-elastic commented Apr 19, 2022

  • Add client to watch over status of subscriptions. To check how many connections bayeux instance has.
  • Make sure to close channel as soon as context is canceled to ensure it won't hold channel open for forever.

@kush-elastic kush-elastic added the enhancement New feature or request label Apr 19, 2022
@kush-elastic kush-elastic requested review from cmacknz and mtojek April 19, 2022 13:57
@kush-elastic kush-elastic self-assigned this Apr 19, 2022
@elasticmachine
Copy link
Collaborator

elasticmachine commented Apr 19, 2022

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2022-04-21T10:38:20.680+0000

  • Duration: 2 min 23 sec

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

Copy link
Contributor

@mtojek mtojek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rephrase the issue description to present why we need this change/what kind of issue it fixes, and link it with the other issue.

bayeux.go Outdated
}

func (st *Status) disconnect(out chan MaybeMsg) {
close(out)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an anti-pattern in general, you shouldn't close channels you don't own/create.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will update it.

bayeux.go Outdated
id clientIDAndCookies
creds Credentials
id clientIDAndCookies
status Status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the Status need to be exposed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really need it exposed.
Status was already exposed, I am just reusing it to get connection information from bayeux object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's hide it then if it isn't necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

bayeux.go Outdated
Comment on lines 265 to 266
defer wg.Done()
defer close(out)
defer status.disconnect(out)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have 3 deferred functions now, maybe you can replace it with a block defer func() { ... }?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update it to:
defer func() { close(out) wg.Done() status.disconnect() }()

@@ -266,7 +295,12 @@ func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
return
}
for _, e := range x {
out <- MaybeMsg{Msg: e}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add any comments regarding this change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, Thanks

bayeux.go Outdated
@@ -298,6 +332,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials,

func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg {
b.creds = creds
b.status = status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain what are you trying to achieve? In L132 the status is global (which is not good in general), but here you're setting it as part of the Bayeux?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added one parameter connectCount in Status struct. I need to ensure if connections are closed as soon as context is canceled from filebeat side(for test case to ensure if channel is closing or not). To do that I need some mechanism in bayeux. but i don't have any way to access status object so for reference I used bayeux object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this code doesn't need any concurrency safety mechanism? For example in MQTT input, I used a WaitGroup.

Copy link
Collaborator Author

@kush-elastic kush-elastic Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this for concurrency safety mechanism for in flight messages. as we can only close channel from bayeux side. we will wait till all the inbound messages are sent via channel.

        var waitMsgs sync.WaitGroup
	wg.Add(1)
	go func() {
		defer func() {
			waitMsgs.Wait()
			close(out)
			status.disconnect()
			wg.Done()
		}()
		for {
			for _, e := range x {
				waitMsgs.Add(1)
				go func() {
					defer waitMsgs.Done()
					// close channel as soon as context is canceled to ensure it won't hold channel open for forever
					out <- MaybeMsg{Msg: e}
				}()
			}
		}
	}()
	return out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard for me to tell, as the entire library code is tangled a bit. I can't confirm that this change doesn't introduce any other issues (deadlocks). What I don't like about the suggested approach is for-in-for loop.

WDYT about refactoring the library code or writing it from scratch? We may try to get rid of all globals and focus on concurrency safety.

Copy link
Collaborator Author

@kush-elastic kush-elastic Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry about that, I removed other things there to make it simple to understand but actual snip should look like this:

func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
	var waitMsgs sync.WaitGroup
	wg.Add(1)
	go func() {
		defer func() {
			waitMsgs.Wait()
			close(out)
			status.disconnect()
			wg.Done()
		}()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
				resp, err := b.call(ctx, postBody, b.creds.bayeuxUrl())
				if err != nil {
					if errors.Is(err, context.Canceled) {
						return
					}
					out <- MaybeMsg{Err: fmt.Errorf("cannot connect to bayeux: %s, trying again", err)}
				} else {
					if os.Getenv("DEBUG") != "" {
						var b []byte
						if resp.Body != nil {
							b, _ = ioutil.ReadAll(resp.Body)
						}
						// Restore the io.ReadCloser to its original state
						resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
						// Use the content
						s := string(b)
						logger.Printf("Response Body: %s", s)
					}
					var x []TriggerEvent
					decoder := json.NewDecoder(resp.Body)
					if err := decoder.Decode(&x); err != nil && err == io.EOF {
						out <- MaybeMsg{Err: err}
						return
					}
					for _, e := range x {
						waitMsgs.Add(1)
						go func() {
							defer waitMsgs.Done()
							// close channel as soon as context is canceled to ensure it won't hold channel open for forever
							out <- MaybeMsg{Msg: e}
						}()
					}
				}
			}
		}
	}()
	return out
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it looks good to me, let's get it into the PR code.

@kush-elastic kush-elastic changed the title add status watcher for bayeux client Add status watcher for bayeux client Apr 20, 2022
@kush-elastic kush-elastic requested a review from mtojek April 20, 2022 09:08
bayeux.go Outdated
}

type Client struct {
BayOb Bayeux
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does Ob stand for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Object.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me update it to something meaningful.

bayeux.go Outdated
@@ -298,6 +332,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials,

func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg {
b.creds = creds
b.status = status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it looks good to me, let's get it into the PR code.

bayeux.go Outdated
@@ -298,6 +335,7 @@ func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials,

func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg {
b.creds = creds
b.status = status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the original question:

Why you can't access the global "status"? If you need to expose the global state, just expose a static function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

@kush-elastic kush-elastic requested a review from mtojek April 21, 2022 10:38
@kush-elastic kush-elastic merged commit acdef69 into elastic:master Apr 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants