
Restart dispatcher on connection loss #796

Merged
merged 19 commits into from
Feb 19, 2019

Conversation

sbezverk
Contributor

@sbezverk sbezverk commented Feb 9, 2019

Signed-off-by: Serguei Bezverkhi sbezverk@cisco.com

xref #781

NATSS channels now recover from connection loss.

@googlebot googlebot added the cla: yes Indicates the PR's author has signed the CLA. label Feb 9, 2019
@knative-prow-robot knative-prow-robot added the size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. label Feb 9, 2019
@sbezverk
Contributor Author

sbezverk commented Feb 9, 2019

/assign @Harwayne

@@ -69,6 +69,9 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
ch := getSubject(channel)
if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
logger.Errorf("Error during publish: %v", err)
if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Fatalf("Connection to NATS was lost, dispatcher is exiting.")
Contributor

Add a TODO that states we should reconnect rather than restarting, but for now restarting is better than staying in a broken state forever.

Contributor Author

Sounds good. Will do.

@Harwayne
Contributor

Harwayne commented Feb 9, 2019

Looks good to me. It's not in any way ideal, because requests from upstream will get rejected while the Pod restarts, but it is distinctly better than now, where once the connection closes, the dispatcher will never work again.

I would say that this shouldn't close the bug, but should reference it. Then we still have the bug if anyone wants to tackle it.

/assign @radufa
As he did the initial NATSS work.

@knative-prow-robot knative-prow-robot added size/M Denotes a PR that changes 30-99 lines, ignoring generated files. and removed size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. labels Feb 10, 2019
@sbezverk
Contributor Author

@Harwayne I refactored it for a dynamic reconnect. Appreciate your review and feedback.

@knative-prow-robot knative-prow-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Feb 10, 2019
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
s.subscriptionsMux.Lock()
s.natssConn = nConn

Perhaps it is better to use:

defer s.subscriptionsMux.Unlock()

right after Lock()

Contributor Author

done
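The defer-after-Lock pattern agreed on above, as a minimal standalone sketch; the `supervisor` type and `setConn` method here are illustrative placeholders, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sync"
)

type supervisor struct {
	mu   sync.Mutex
	conn string
}

// setConn releases the mutex on every return path (including early
// returns and panics) because the Unlock is deferred immediately
// after the Lock, which is what the reviewer is suggesting.
func (s *supervisor) setConn(c string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.conn = c
}

func main() {
	s := &supervisor{}
	s.setConn("nats://example:4222")
	fmt.Println(s.conn)
}
```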

for {
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
s.subscriptionsMux.Lock()
Contributor

Is this now intended to lock s.natssConn as well?

Contributor Author

Right, adding a separate mutex just for natssConn does not seem to be efficient.

Contributor

My preference would be to have a second mux, natssConnMux for natssConn and natssConnInProgress.
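A sketch of what that two-mutex layout might look like; the field types are simplified placeholders (a `*string` standing in for the real `*stan.Conn`), so this shows the locking discipline only, not the PR's actual struct:

```go
package main

import (
	"fmt"
	"sync"
)

// SubscriptionsSupervisor sketch: a dedicated natssConnMux guards
// the connection state, separate from the subscriptions mutex, so
// holding one never blocks users of the other.
type SubscriptionsSupervisor struct {
	subscriptionsMux sync.Mutex // guards the subscriptions map (not shown)

	natssConnMux        sync.Mutex // guards the two fields below
	natssConn           *string
	natssConnInProgress bool
}

func (s *SubscriptionsSupervisor) setConn(c *string) {
	s.natssConnMux.Lock()
	defer s.natssConnMux.Unlock()
	s.natssConn = c
	s.natssConnInProgress = false
}

func (s *SubscriptionsSupervisor) connActive() bool {
	s.natssConnMux.Lock()
	defer s.natssConnMux.Unlock()
	return s.natssConn != nil && !s.natssConnInProgress
}

func main() {
	s := &SubscriptionsSupervisor{natssConnInProgress: true}
	fmt.Println(s.connActive()) // false: a connection attempt is in progress
	c := "conn"
	s.setConn(&c)
	fmt.Println(s.connActive()) // true
}
```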

@@ -67,8 +64,17 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Infof("Received message from %q channel", channel.String())
// publish to Natss
ch := getSubject(channel)
if s.natssConn == nil {
return fmt.Errorf("No Connection to NATS")
Contributor

NATS -> NATSS

Contributor Author

done

if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
logger.Errorf("Error during publish: %v", err)
if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Error("Connection to NATS has been lost, attempting to reconnect.")
Contributor

NATS -> NATSS

Here and all the other comments and log lines.

Contributor Author

done

if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
logger.Errorf("Error during publish: %v", err)
if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Error("Connection to NATS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATS
Contributor

NATS -> NATSS

And a trailing period.

Contributor Author

done

select {
case <-s.connect:
// Establishing connection only if there is no already connection to NATS
if s.natssConn == nil {
Contributor

What sets s.natssConn to nil? It looks like it will be nil when the struct is initialized, but after that, what sets it to nil?

@@ -77,10 +83,45 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
}

func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error {
// Trigger Connect to establish connection with NATS
s.connect <- struct{}{}
Contributor

We should probably kick off s.Connect() here and pass in the stopCh.

Contributor Author

ok, I will give it a try.

Contributor Author

done

@@ -68,6 +68,8 @@ func main() {
if err != nil {
logger.Fatal("Unable to create NATSS dispatcher.", zap.Error(err))
}
// Starting Connect to establish connection with NATS
go dispatcher.Connect()
Contributor

My preference is to put this inside the dispatcher's Start(stopCh).

Contributor Author

ok, I will give it a try.

Contributor

I don't see this change in, did it not work?

Contributor Author

done

@sbezverk
Contributor Author

@radufa I have extensively tested the latest revision; the connection gets reestablished every time. I would appreciate it if you could try it in your test bed.

@@ -67,8 +65,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Infof("Received message from %q channel", channel.String())
// publish to Natss
ch := getSubject(channel)
if s.natssConn == nil {
Contributor

When is the lock required to be held to access or mutate s.natssConn?

Contributor Author

The reason I did not put a lock here is that the code which sets this value runs in the same goroutine. I tested this code with go test -race and it is all clean.

Contributor

I don't think I follow your explanation. The only place I see s.natssConn set is connectWithRetry, is that running in this goroutine?

Based on the comment above, we only need to hold the lock while writing s.natssConn, is that why we don't grab the lock here?

// re-attempting evey 60 seconds until the connection is established.
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
s.subscriptionsMux.Lock()
Contributor

This will be held for potentially a long time, is that safe? Does it need to be held when we are not actually manipulating s.natssConn?

Contributor Author

I will move it close to the place where the value gets set.

dispatcher *provisioners.MessageDispatcher
connect chan struct{}
natssURL string
subscriptionsMux sync.Mutex
Contributor

Add a comment saying when this has to be held.

Contributor Author

done

@sbezverk
Contributor Author

/test pull-knative-eventing-integration-tests

2 similar comments
@chaodaiG
Contributor

/test pull-knative-eventing-integration-tests

@chaodaiG
Contributor

/test pull-knative-eventing-integration-tests

@@ -68,6 +68,8 @@ func main() {
if err != nil {
logger.Fatal("Unable to create NATSS dispatcher.", zap.Error(err))
}
// Starting Connect to establish connection with NATS
go dispatcher.Connect()
Contributor

I don't see this change in, did it not work?

logger.Fatalf("Failed to connect to NATSS!")
}
}
logger.Infof("><SB> (*s.natssConn).NatsConn().IsConnected(): %t", (*s.natssConn).NatsConn().IsConnected())
Contributor

These log lines don't look like they should make it in as-is (I'm guessing is so you can find them quickly with grep).

}

// NewDispatcher returns a new SubscriptionsSupervisor.
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) {
d := &SubscriptionsSupervisor{
logger: logger,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
connect: make(chan struct{}),
Contributor

I think this is a blocking Channel, should it be non-blocking?

If so, then the various writes to it below can be made into non-blocking writes, because as long as there is at least one item in the channel, a reconnection attempt will occur, so multiple items in the channel aren't needed.

@@ -67,8 +65,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Infof("Received message from %q channel", channel.String())
// publish to Natss
ch := getSubject(channel)
if s.natssConn == nil {
Contributor

I don't think I follow your explanation. The only place I see s.natssConn set is connectWithRetry, is that running in this goroutine?

Based on the comment above, we only need to hold the lock while writing s.natssConn, is that why we don't grab the lock here?

@@ -67,8 +68,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Infof("Received message from %q channel", channel.String())
// publish to Natss
ch := getSubject(channel)
if s.natssConn == nil {
Contributor

Save s.natssConn to a local variable so that all the checks are done against the same value (i.e. we check for nil and then use the checked object).
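The snapshot-to-a-local pattern being requested, reduced to a standalone sketch; the `supervisor` type and a `*string` in place of the real `*stan.Conn` are placeholders, not the PR's code:

```go
package main

import (
	"fmt"
	"sync"
)

type supervisor struct {
	mu   sync.Mutex
	conn *string // placeholder for *stan.Conn
}

// publish copies s.conn into a local under the lock, then performs
// the nil check and the use against that same snapshot, so another
// goroutine resetting s.conn cannot slip in between the check and
// the dereference.
func (s *supervisor) publish(msg string) error {
	s.mu.Lock()
	currentConn := s.conn
	s.mu.Unlock()
	if currentConn == nil {
		return fmt.Errorf("no connection to NATSS")
	}
	fmt.Printf("publishing %q via %s\n", msg, *currentConn)
	return nil
}

func main() {
	s := &supervisor{}
	fmt.Println(s.publish("hello")) // error: no connection yet
	c := "conn-1"
	s.conn = &c
	fmt.Println(s.publish("hello")) // nil: published via the snapshot
}
```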

for {
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
s.subscriptionsMux.Lock()
Contributor

My preference would be to have a second mux, natssConnMux for natssConn and natssConnInProgress.

s.receiver.Start(stopCh)
return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry() {
Contributor

This should probably have a stop channel, even if only for the unit tests.

Contributor Author

done

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
if currentNatssConn == nil {
return fmt.Errorf("No Connection to NATSS")
}
if reflect.ValueOf(currentNatssConn).IsNil() {
Contributor

I'm not familiar with Go reflection, how does this differ from if currentNatssConn == nil three lines above?

Contributor Author

This check helps prevent a panic when we do (*currentNatssConn).NatsConn().IsConnected(): during debugging I could see that currentNatssConn was not nil, but it pointed to a nil interface once the connection was lost. reflect.ValueOf allows us to safely verify that the underlying value is not nil. Let me know if it makes sense.
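The "typed nil" gotcha described here can be shown in isolation; this is a standalone illustration with a made-up `conn` type, not the PR's code:

```go
package main

import (
	"fmt"
	"reflect"
)

type conn struct{}

// isNil reports whether i is nil OR is a non-nil interface wrapping
// a nil pointer (the "typed nil" case a plain == nil check misses).
func isNil(i interface{}) bool {
	if i == nil {
		return true
	}
	v := reflect.ValueOf(i)
	switch v.Kind() {
	case reflect.Ptr, reflect.Chan, reflect.Func, reflect.Map, reflect.Slice:
		return v.IsNil()
	}
	return false
}

func main() {
	var p *conn           // nil pointer
	var i interface{} = p // interface with type *conn, value nil
	fmt.Println(i == nil) // false: the interface itself is non-nil
	fmt.Println(isNil(i)) // true: reflect sees the nil pointer inside
}
```

Dereferencing through such an interface panics even though `i == nil` is false, which matches the behavior the author saw after a connection loss.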

if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATSS.
s.connect <- struct{}{}
Contributor

We can be fancy and make this a non-blocking insertion. Something like replace this with s.signalReconnect() (not a good name). And then have:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
      // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}

If we do this, then all writing to the channel should be via the s.signalReconnect() function.

Contributor Author

ok, sounds good. Thanks for the suggestion.

for {
select {
case <-s.connect:
if s.natssConn == nil && !s.natssConnInProgress {
Contributor

s.natssConn is protected by the lock, acquire the lock before reading it.

Contributor Author

will do

go s.connectWithRetry(stopCh)
continue
}
if !(*s.natssConn).NatsConn().IsConnected() &&
Contributor

If s.natssConn is nil and s.natssConnInProgress is true, will this panic (because it tries to dereference s.natssConn)?

Contributor Author

Added protection against this corner case.

@sbezverk
Contributor Author

@Harwayne Addressed your comments.

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@ghost

ghost commented Feb 15, 2019

I've tested the last changes, using the same scenario as discussed yesterday, and it panics too; please see my last comments in #781.

/hold

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@@ -19,7 +19,7 @@ package dispatcher
import (
"encoding/json"
"fmt"
"reflect"
// "reflect"

Please delete the commented line

Contributor Author

done

s.receiver.Start(stopCh)
return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) {
// re-attempting evey 60 seconds until the connection is established.
ticker := time.NewTicker(60 * time.Second)

Please change it to 1s, 60s is too big and it's the only place which creates a new connection.

Contributor Author

done, but don't you think 1 second is too aggressive?

select {
case <-s.connect:
s.natssConnMux.Lock()
// currentNatssConn := s.natssConn
@ghost ghost Feb 19, 2019

Please delete the commented line:

// currentNatssConn := s.natssConn

Contributor Author

done

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
s.natssConnMux.Unlock()
return
}
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in 60 seconds", err)

Please change the message too: "retrying in 1 second"

Contributor Author

done

@ghost

ghost commented Feb 19, 2019

/lgtm

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 19, 2019
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Feb 19, 2019
@ghost

ghost commented Feb 19, 2019

/lgtm

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 19, 2019
d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar())

return d, nil
}

func (s *SubscriptionsSupervisor) signalReconnect() {
s.connect <- struct{}{}
Contributor

Let's make this non-blocking:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
      // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}

Contributor Author

will do in the follow up PR

for {
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
// Locking here in order to reduce time in locked state
Contributor

Add a trailing period.

Contributor Author

done

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Feb 19, 2019
@knative-metrics-robot

The following is the coverage report on pkg/.
Say /test pull-knative-eventing-go-coverage to re-run this coverage report

File Old Coverage New Coverage Delta
contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go 56.5% 57.5% 1.0
contrib/natss/pkg/stanutil/stanutil.go 60.0% 58.6% -1.4

Contributor

@Harwayne Harwayne left a comment

/lgtm

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 19, 2019
@sbezverk
Contributor Author

@vaikas-google for approval

@vaikas
Contributor

vaikas commented Feb 19, 2019

/lgtm
/approve

@knative-prow-robot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: sbezverk, vaikas-google

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Feb 19, 2019
@knative-prow-robot knative-prow-robot merged commit 2142e53 into knative:master Feb 19, 2019
@sbezverk sbezverk deleted the dispatcher_restart_on_disconnect branch February 19, 2019 23:51