-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restart dispatcher on connection loss #796
Changes from 16 commits
f4728ab
165176c
65c404e
420e9a0
377d24c
4cfdb95
a67ef6d
b9f75a7
75cc01b
cbe29e5
ff2adc4
16ae6d4
b1742e8
3336401
d4ae9d7
bb5ae46
aabb1c2
2ceb894
ea7e7b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ package dispatcher | |
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
// "reflect" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -30,7 +32,11 @@ import ( | |
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" | ||
) | ||
|
||
const clientID = "knative-natss-dispatcher" | ||
const ( | ||
clientID = "knative-natss-dispatcher" | ||
// maxElements defines a maximum number of outstanding re-connect requests | ||
maxElements = 10 | ||
) | ||
|
||
// SubscriptionsSupervisor manages the state of NATS Streaming subscriptions | ||
type SubscriptionsSupervisor struct { | ||
|
@@ -39,30 +45,36 @@ type SubscriptionsSupervisor struct { | |
receiver *provisioners.MessageReceiver | ||
dispatcher *provisioners.MessageDispatcher | ||
|
||
natssConn *stan.Conn | ||
|
||
subscriptionsMux sync.Mutex | ||
subscriptions map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription | ||
|
||
connect chan struct{} | ||
natssURL string | ||
// natConnMux is used to protect natssConn and natssConnInProgress during | ||
// the transition from not connected to connected states. | ||
natssConnMux sync.Mutex | ||
natssConn *stan.Conn | ||
natssConnInProgress bool | ||
} | ||
|
||
// NewDispatcher returns a new SubscriptionsSupervisor. | ||
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) { | ||
d := &SubscriptionsSupervisor{ | ||
logger: logger, | ||
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), | ||
connect: make(chan struct{}, maxElements), | ||
natssURL: natssUrl, | ||
subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription), | ||
} | ||
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, natssUrl, d.logger.Sugar()) | ||
if err != nil { | ||
logger.Error("Connect() failed: ", zap.Error(err)) | ||
return nil, err | ||
} | ||
d.natssConn = nConn | ||
d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar()) | ||
|
||
return d, nil | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) signalReconnect() { | ||
s.connect <- struct{}{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this non-blocking: func (s *SubscriptionsSupervisor) signalReconnect() {
select {
case s.connect <- struct{}{}:
// Sent.
default:
// The Channel is already full, so a reconnection attempt will occur.
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will do in the follow up PR |
||
} | ||
|
||
func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogger) func(provisioners.ChannelReference, *provisioners.Message) error { | ||
return func(channel provisioners.ChannelReference, m *provisioners.Message) error { | ||
logger.Infof("Received message from %q channel", channel.String()) | ||
|
@@ -73,8 +85,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge | |
logger.Errorf("Error during marshaling of the message: %v", err) | ||
return err | ||
} | ||
if err := stanutil.Publish(s.natssConn, ch, &message, logger); err != nil { | ||
s.natssConnMux.Lock() | ||
currentNatssConn := s.natssConn | ||
s.natssConnMux.Unlock() | ||
if currentNatssConn == nil { | ||
return fmt.Errorf("No Connection to NATSS") | ||
} | ||
if err := stanutil.Publish(currentNatssConn, ch, &message, logger); err != nil { | ||
logger.Errorf("Error during publish: %v", err) | ||
if err.Error() == stan.ErrConnectionClosed.Error() { | ||
logger.Error("Connection to NATSS has been lost, attempting to reconnect.") | ||
// Informing SubscriptionsSupervisor to re-establish connection to NATSS. | ||
s.signalReconnect() | ||
return err | ||
} | ||
return err | ||
} | ||
logger.Infof("Published [%s] : '%s'", channel.String(), m.Headers) | ||
|
@@ -83,10 +107,60 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge | |
} | ||
|
||
func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error { | ||
// Starting Connect to establish connection with NATS | ||
go s.Connect(stopCh) | ||
// Trigger Connect to establish connection with NATS | ||
s.signalReconnect() | ||
s.receiver.Start(stopCh) | ||
return nil | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) { | ||
// re-attempting evey 60 seconds until the connection is established. | ||
ticker := time.NewTicker(60 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change it to 1s, 60s is too big and it's the only place which creates a new connection. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done, but do you think 1 second is not too aggressive? |
||
defer ticker.Stop() | ||
for { | ||
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar()) | ||
if err == nil { | ||
// Locking here in order to reduce time in locked state | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a trailing period. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
s.natssConnMux.Lock() | ||
s.natssConn = nConn | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps i better to use:
right after Lock() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
s.natssConnInProgress = false | ||
s.natssConnMux.Unlock() | ||
return | ||
} | ||
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in 60 seconds", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change the message too: "retrying in 1 second" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
select { | ||
case <-ticker.C: | ||
continue | ||
case <-stopCh: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Connect is called for initial connection as well as after every disconnect | ||
func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { | ||
for { | ||
select { | ||
case <-s.connect: | ||
s.natssConnMux.Lock() | ||
// currentNatssConn := s.natssConn | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please delete the commented line:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
currentConnProgress := s.natssConnInProgress | ||
s.natssConnMux.Unlock() | ||
if !currentConnProgress { | ||
// Case for lost connectivity, setting InProgress to true to prevent recursion | ||
s.natssConnMux.Lock() | ||
s.natssConnInProgress = true | ||
s.natssConnMux.Unlock() | ||
go s.connectWithRetry(stopCh) | ||
} | ||
case <-stopCh: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error { | ||
s.subscriptionsMux.Lock() | ||
defer s.subscriptionsMux.Unlock() | ||
|
@@ -125,12 +199,12 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. | |
continue | ||
} | ||
// subscribe | ||
if natssSub, err := s.subscribe(cRef, subRef); err != nil { | ||
natssSub, err := s.subscribe(cRef, subRef) | ||
if err != nil { | ||
return err | ||
} else { | ||
chMap[subRef] = natssSub | ||
activeSubs[subRef] = true | ||
} | ||
chMap[subRef] = natssSub | ||
activeSubs[subRef] = true | ||
} | ||
// Unsubscribe for deleted subscriptions | ||
for sub := range chMap { | ||
|
@@ -166,9 +240,21 @@ func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReferenc | |
// subscribe to a NATSS subject | ||
ch := getSubject(channel) | ||
sub := subscription.String() | ||
natssSub, err := (*s.natssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute)) | ||
s.natssConnMux.Lock() | ||
currentNatssConn := s.natssConn | ||
s.natssConnMux.Unlock() | ||
if currentNatssConn == nil { | ||
return nil, fmt.Errorf("No Connection to NATSS") | ||
} | ||
natssSub, err := (*currentNatssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute)) | ||
if err != nil { | ||
s.logger.Error(" Create new NATSS Subscription failed: ", zap.Error(err)) | ||
if err.Error() == stan.ErrConnectionClosed.Error() { | ||
s.logger.Error("Connection to NATSS has been lost, attempting to reconnect.") | ||
// Informing SubscriptionsSupervisor to re-establish connection to NATS | ||
s.signalReconnect() | ||
return nil, err | ||
} | ||
return nil, err | ||
} | ||
s.logger.Sugar().Infof("NATSS Subscription created: %+v", natssSub) | ||
|
@@ -182,7 +268,7 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere | |
if stanSub, ok := s.subscriptions[channel][subscription]; ok { | ||
// delete from NATSS | ||
if err := (*stanSub).Unsubscribe(); err != nil { | ||
s.logger.Error("Unsubscribing NATS Streaming subscription failed: ", zap.Error(err)) | ||
s.logger.Error("Unsubscribing NATSS Streaming subscription failed: ", zap.Error(err)) | ||
return err | ||
} | ||
delete(s.subscriptions[channel], subscription) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete the commented line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done