Restart dispatcher on connection loss #796
Conversation
/assign @Harwayne
@@ -69,6 +69,9 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
    ch := getSubject(channel)
    if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
        logger.Errorf("Error during publish: %v", err)
        if err.Error() == stan.ErrConnectionClosed.Error() {
            logger.Fatalf("Connection to NATS was lost, dispatcher is exiting.")
Add a TODO that states we should reconnect rather than restarting, but for now restarting is better than staying in a broken state forever.
Sounds good. Will do.
Looks good to me. It's not in any way ideal, because requests from upstream will get rejected as the Pod restarts, but it is distinctly better than now, where once the connection closes, the dispatcher will never work again. I would say that this shouldn't close the bug, but should reference it. Then we still have the bug if anyone wants to tackle it.
/assign @radufa
@Harwayne I refactored it to reconnect dynamically. I'd appreciate your review and feedback.
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
    s.subscriptionsMux.Lock()
    s.natssConn = nConn
Perhaps it would be better to use defer s.subscriptionsMux.Unlock() right after Lock().
done
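For reference, a minimal sketch of the deferred-unlock pattern being suggested (the struct is trimmed down, and conn stands in for the real *stan.Conn):

package dispatcher

import "sync"

// conn stands in for *stan.Conn.
type conn struct{}

type supervisor struct {
    mu        sync.Mutex
    natssConn *conn
}

func (s *supervisor) setConnection(c *conn) {
    s.mu.Lock()
    // Deferring immediately after Lock() guarantees the mutex is released
    // on every return path, including early returns and panics.
    defer s.mu.Unlock()
    s.natssConn = c
}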
for {
    nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
    if err == nil {
        s.subscriptionsMux.Lock()
Is this now intended to lock s.natssConn as well?
Right, adding a separate mutex just for natssConn does not seem efficient.
My preference would be to have a second mux, natssConnMux, for natssConn and natssConnInProgress.
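Roughly the layout that proposal implies (a sketch; field types abbreviated, with conn again standing in for *stan.Conn):

type SubscriptionsSupervisor struct {
    // subscriptionsMux guards only the subscriptions map.
    subscriptionsMux sync.Mutex
    subscriptions    map[string]struct{} // placeholder element type

    // natssConnMux guards natssConn and natssConnInProgress, so the
    // connection can be swapped without blocking subscription updates.
    natssConnMux        sync.Mutex
    natssConn           *conn
    natssConnInProgress bool
}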
@@ -67,8 +64,17 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
    logger.Infof("Received message from %q channel", channel.String())
    // publish to Natss
    ch := getSubject(channel)
    if s.natssConn == nil {
        return fmt.Errorf("No Connection to NATS")
NATS -> NATSS
done
if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
    logger.Errorf("Error during publish: %v", err)
    if err.Error() == stan.ErrConnectionClosed.Error() {
        logger.Error("Connection to NATS has been lost, attempting to reconnect.")
NATS -> NATSS
Here and all the other comments and log lines.
done
if err := stanutil.Publish(s.natssConn, ch, &m.Payload, logger); err != nil {
    logger.Errorf("Error during publish: %v", err)
    if err.Error() == stan.ErrConnectionClosed.Error() {
        logger.Error("Connection to NATS has been lost, attempting to reconnect.")
        // Informing SubscriptionsSupervisor to re-establish connection to NATS
NATS -> NATSS
And a trailing period.
done
select {
case <-s.connect:
    // Establish a connection only if there is no existing connection to NATS.
    if s.natssConn == nil {
What sets s.natssConn to nil? It looks like it will be nil when the struct is initialized, but after that, what sets it to nil?
@@ -77,10 +83,45 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
}

func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error {
    // Trigger Connect to establish connection with NATS
    s.connect <- struct{}{}
We should probably kick off s.Connect() here and pass in the stopCh.
ok, I will give it a try.
done
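A sketch of what that suggestion amounts to, assuming Connect takes the stop channel as proposed (illustrative, not the PR's final code):

func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error {
    // Own the connection lifecycle here rather than in main(): the
    // reconnect loop runs until stopCh is closed.
    go s.Connect(stopCh)
    // Trigger the initial connection attempt.
    s.connect <- struct{}{}
    s.receiver.Start(stopCh)
    return nil
}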
contrib/natss/pkg/dispatcher/main.go
@@ -68,6 +68,8 @@ func main() {
    if err != nil {
        logger.Fatal("Unable to create NATSS dispatcher.", zap.Error(err))
    }
    // Starting Connect to establish connection with NATS
    go dispatcher.Connect()
My preference is to put this inside the dispatcher's Start(stopCh).
ok, I will give it a try.
I don't see this change in, did it not work?
done
@radufa I have extensively tested the latest revision; the connection gets re-established every time. I'd appreciate it if you tried it in your test bed.
@@ -67,8 +65,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
    logger.Infof("Received message from %q channel", channel.String())
    // publish to Natss
    ch := getSubject(channel)
    if s.natssConn == nil {
When is the lock required to be held to access or mutate s.natssConn?
The reason I did not put a lock here is that the code setting this value runs in the same goroutine. I tested this code with go test -race; it is all clean.
I don't think I follow your explanation. The only place I see s.natssConn set is connectWithRetry, is that running in this goroutine? Based on the comment above, we only need to hold the lock while writing s.natssConn, is that why we don't grab the lock here?
// re-attempting every 60 seconds until the connection is established.
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
s.subscriptionsMux.Lock()
This will be held for potentially a long time, is that safe? Does it need to be held when we are not actually manipulating s.natssConn?
I will move it close to the place where the value gets set.
dispatcher       *provisioners.MessageDispatcher
connect          chan struct{}
natssURL         string
subscriptionsMux sync.Mutex
Add a comment saying when this has to be held.
done
/test pull-knative-eventing-integration-tests
/test pull-knative-eventing-integration-tests
/test pull-knative-eventing-integration-tests
        logger.Fatalf("Failed to connect to NATSS!")
    }
}
logger.Infof("><SB> (*s.natssConn).NatsConn().IsConnected(): %t", (*s.natssConn).NatsConn().IsConnected())
These log lines don't look like they should make it in as-is (I'm guessing they're there so you can find them quickly with grep).
}

// NewDispatcher returns a new SubscriptionsSupervisor.
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) {
    d := &SubscriptionsSupervisor{
        logger:     logger,
        dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
        connect:    make(chan struct{}),
I think this is a blocking channel, should it be non-blocking? If so, then the various writes to it below can be made into non-blocking writes, because as long as there is at least one item in the channel, a reconnection attempt will occur, so multiple items in the channel aren't needed.
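Concretely, that would mean giving the channel a capacity of one in the struct literal above, e.g.:

// Buffered with capacity one so a single pending reconnect request can
// sit in the channel without blocking the sender.
connect: make(chan struct{}, 1),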
@@ -67,8 +68,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
    logger.Infof("Received message from %q channel", channel.String())
    // publish to Natss
    ch := getSubject(channel)
    if s.natssConn == nil {
Save s.natssConn to a local variable so that all the checks are done against the same value (i.e. we check for nil and then use the checked object).
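The suggested pattern, sketched with the natssConnMux naming that appears later in the thread: take one snapshot under the lock, then check and use only that snapshot.

s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn == nil {
    return fmt.Errorf("no connection to NATSS")
}
if err := stanutil.Publish(currentNatssConn, ch, &m.Payload, logger); err != nil {
    // ...
}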
    s.receiver.Start(stopCh)
    return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry() {
This should probably have a stop channel, even if only for the unit tests.
done
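A sketch of the retry loop with the stop channel wired in; field and helper names follow the PR, and the interval is the 1s value settled on later in the thread:

func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) {
    // Re-attempt until the connection is established or we are told to stop.
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
        if err == nil {
            // Locking here in order to reduce time in the locked state.
            s.natssConnMux.Lock()
            s.natssConn = nConn
            s.natssConnInProgress = false
            s.natssConnMux.Unlock()
            return
        }
        s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in 1 second", err)
        select {
        case <-ticker.C:
            // Try again on the next tick.
        case <-stopCh:
            // The dispatcher is shutting down; give up on reconnecting.
            return
        }
    }
}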
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
if currentNatssConn == nil {
    return fmt.Errorf("No Connection to NATSS")
}
if reflect.ValueOf(currentNatssConn).IsNil() {
I'm not familiar with Go reflection, how does this differ from if currentNatssConn == nil three lines above?
This check helps prevent a panic when we do (*currentNatssConn).NatsConn().IsConnected(): during debugging I could see that currentNatssConn is not nil, but it points to a nil interface when the connection gets lost. reflect.ValueOf allows us to safely check that the underlying value is not nil. Let me know if that makes sense.
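The typed-nil behavior behind that check, in a tiny standalone example (names here are illustrative, not from the PR):

package main

import (
    "fmt"
    "reflect"
)

type closer interface{ Close() }

type conn struct{}

func (c *conn) Close() {}

func main() {
    var p *conn      // nil pointer
    var i closer = p // non-nil interface holding a nil pointer
    fmt.Println(i == nil)                   // false
    fmt.Println(reflect.ValueOf(i).IsNil()) // true
}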
if err.Error() == stan.ErrConnectionClosed.Error() {
    logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
    // Informing SubscriptionsSupervisor to re-establish connection to NATSS.
    s.connect <- struct{}{}
We can be fancy and make this a non-blocking insertion. Something like: replace this with s.signalReconnect() (not a good name), and then have:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
        // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}

If we do this, then all writing to the channel should be via the s.signalReconnect() function.
OK, sounds good, thanks for the suggestion.
for {
    select {
    case <-s.connect:
        if s.natssConn == nil && !s.natssConnInProgress {
s.natssConn is protected by the lock; acquire the lock before reading it.
will do
            go s.connectWithRetry(stopCh)
            continue
        }
        if !(*s.natssConn).NatsConn().IsConnected() &&
If s.natssConn is nil and s.natssConnInProgress is true, will this panic (because it tries to dereference s.natssConn)?
Added protection against this corner case.
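One shape that protection could take inside the Connect loop, sketched against the PR's visible fragments (not the exact final code): read both fields under the lock, then only dereference a non-nil snapshot.

s.natssConnMux.Lock()
currentConn := s.natssConn
inProgress := s.natssConnInProgress
s.natssConnMux.Unlock()
if currentConn == nil {
    if !inProgress {
        go s.connectWithRetry(stopCh)
    }
    continue
}
if !(*currentConn).NatsConn().IsConnected() && !inProgress {
    // The stan connection object exists but the underlying NATS link is down.
    go s.connectWithRetry(stopCh)
}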
@Harwayne Addressed your comments.
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
I've tested the last changes using the same scenario as discussed yesterday, and it panics too; please see my last comments in #781.
/hold
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@@ -19,7 +19,7 @@ package dispatcher
import (
    "encoding/json"
    "fmt"
    // "reflect"
Please delete the commented line
done
    s.receiver.Start(stopCh)
    return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) {
    // re-attempting every 60 seconds until the connection is established.
    ticker := time.NewTicker(60 * time.Second)
Please change it to 1s; 60s is too big, and this is the only place which creates a new connection.
Done, but do you think 1 second might be too aggressive?
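If 1 second does prove too aggressive, a capped backoff would be a middle ground; this is a sketch, not part of this PR:

delay := time.Second
for {
    nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
    if err == nil {
        s.natssConnMux.Lock()
        s.natssConn = nConn
        s.natssConnInProgress = false
        s.natssConnMux.Unlock()
        return
    }
    s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in %v", err, delay)
    select {
    case <-time.After(delay):
        // Double the wait, capped at 30 seconds.
        if delay < 30*time.Second {
            delay *= 2
        }
    case <-stopCh:
        return
    }
}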
select {
case <-s.connect:
    s.natssConnMux.Lock()
    // currentNatssConn := s.natssConn
Please delete the commented line:
// currentNatssConn := s.natssConn
done
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
    s.natssConnMux.Unlock()
    return
}
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in 60 seconds", err)
Please change the message too: "retrying in 1 second"
done
/lgtm
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
/lgtm
    d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar())

    return d, nil
}

func (s *SubscriptionsSupervisor) signalReconnect() {
    s.connect <- struct{}{}
Let's make this non-blocking:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
        // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}
Will do in a follow-up PR.
for {
    nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
    if err == nil {
        // Locking here in order to reduce time in locked state
Add a trailing period.
done
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
/lgtm
@vaikas-google for approval
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: sbezverk, vaikas-google.
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
xref #781