-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restart dispatcher on connection loss #796
Merged
knative-prow-robot
merged 19 commits into
knative:master
from
sbezverk:dispatcher_restart_on_disconnect
Feb 19, 2019
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
f4728ab
Restart dispatcher on connection loss
sbezverk 165176c
Addressing comments
sbezverk 65c404e
Refactor dispatcher for a dynamic reconnect
sbezverk 420e9a0
Removing mutes
sbezverk 377d24c
Fixing unit test
sbezverk 4cfdb95
Fixing unit tests part 2
sbezverk a67ef6d
Fixing unit tests part 3 racing issue
sbezverk b9f75a7
Removing redundant mutex
sbezverk 75cc01b
Address some issue in the logic
sbezverk cbe29e5
Fixing logic a bit more
sbezverk ff2adc4
Addressing comments
sbezverk 16ae6d4
Addressing more comments
sbezverk b1742e8
fix racing in unit test
sbezverk 3336401
Fixing a race
sbezverk d4ae9d7
Addressing comments
sbezverk bb5ae46
Addressing racing conditions
sbezverk aabb1c2
Addressing comments
sbezverk 2ceb894
Addressing more comments
sbezverk ea7e7b6
Addressing more comments part3
sbezverk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ package dispatcher | |
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -30,7 +31,16 @@ import ( | |
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" | ||
) | ||
|
||
const clientID = "knative-natss-dispatcher" | ||
const ( | ||
clientID = "knative-natss-dispatcher" | ||
// maxElements defines a maximum number of outstanding re-connect requests | ||
maxElements = 10 | ||
) | ||
|
||
var ( | ||
// retryInterval defines delay in seconds for the next attempt to reconnect to NATSS streaming server | ||
retryInterval = 1 * time.Second | ||
) | ||
|
||
// SubscriptionsSupervisor manages the state of NATS Streaming subscriptions | ||
type SubscriptionsSupervisor struct { | ||
|
@@ -39,30 +49,37 @@ type SubscriptionsSupervisor struct { | |
receiver *provisioners.MessageReceiver | ||
dispatcher *provisioners.MessageDispatcher | ||
|
||
natssConn *stan.Conn | ||
|
||
subscriptionsMux sync.Mutex | ||
subscriptions map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription | ||
|
||
connect chan struct{} | ||
natssURL string | ||
// natConnMux is used to protect natssConn and natssConnInProgress during | ||
// the transition from not connected to connected states. | ||
natssConnMux sync.Mutex | ||
natssConn *stan.Conn | ||
natssConnInProgress bool | ||
} | ||
|
||
// NewDispatcher returns a new SubscriptionsSupervisor. | ||
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) { | ||
d := &SubscriptionsSupervisor{ | ||
logger: logger, | ||
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), | ||
connect: make(chan struct{}, maxElements), | ||
natssURL: natssUrl, | ||
subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription), | ||
} | ||
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, natssUrl, d.logger.Sugar()) | ||
if err != nil { | ||
logger.Error("Connect() failed: ", zap.Error(err)) | ||
return nil, err | ||
} | ||
d.natssConn = nConn | ||
d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar()) | ||
|
||
return d, nil | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) signalReconnect() { | ||
// TODO refactor to make send over the channel non-blocking operation | ||
s.connect <- struct{}{} | ||
} | ||
|
||
func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogger) func(provisioners.ChannelReference, *provisioners.Message) error { | ||
return func(channel provisioners.ChannelReference, m *provisioners.Message) error { | ||
logger.Infof("Received message from %q channel", channel.String()) | ||
|
@@ -73,8 +90,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge | |
logger.Errorf("Error during marshaling of the message: %v", err) | ||
return err | ||
} | ||
if err := stanutil.Publish(s.natssConn, ch, &message, logger); err != nil { | ||
s.natssConnMux.Lock() | ||
currentNatssConn := s.natssConn | ||
s.natssConnMux.Unlock() | ||
if currentNatssConn == nil { | ||
return fmt.Errorf("No Connection to NATSS") | ||
} | ||
if err := stanutil.Publish(currentNatssConn, ch, &message, logger); err != nil { | ||
logger.Errorf("Error during publish: %v", err) | ||
if err.Error() == stan.ErrConnectionClosed.Error() { | ||
logger.Error("Connection to NATSS has been lost, attempting to reconnect.") | ||
// Informing SubscriptionsSupervisor to re-establish connection to NATSS. | ||
s.signalReconnect() | ||
return err | ||
} | ||
return err | ||
} | ||
logger.Infof("Published [%s] : '%s'", channel.String(), m.Headers) | ||
|
@@ -83,10 +112,59 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge | |
} | ||
|
||
func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error { | ||
// Starting Connect to establish connection with NATS | ||
go s.Connect(stopCh) | ||
// Trigger Connect to establish connection with NATS | ||
s.signalReconnect() | ||
s.receiver.Start(stopCh) | ||
return nil | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) { | ||
// re-attempting evey 1 second until the connection is established. | ||
ticker := time.NewTicker(retryInterval) | ||
defer ticker.Stop() | ||
for { | ||
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar()) | ||
if err == nil { | ||
// Locking here in order to reduce time in locked state. | ||
s.natssConnMux.Lock() | ||
s.natssConn = nConn | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps i better to use:
right after Lock() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
s.natssConnInProgress = false | ||
s.natssConnMux.Unlock() | ||
return | ||
} | ||
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in %s", err, retryInterval.String()) | ||
select { | ||
case <-ticker.C: | ||
continue | ||
case <-stopCh: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Connect is called for initial connection as well as after every disconnect | ||
func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { | ||
for { | ||
select { | ||
case <-s.connect: | ||
s.natssConnMux.Lock() | ||
currentConnProgress := s.natssConnInProgress | ||
s.natssConnMux.Unlock() | ||
if !currentConnProgress { | ||
// Case for lost connectivity, setting InProgress to true to prevent recursion | ||
s.natssConnMux.Lock() | ||
s.natssConnInProgress = true | ||
s.natssConnMux.Unlock() | ||
go s.connectWithRetry(stopCh) | ||
} | ||
case <-stopCh: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error { | ||
s.subscriptionsMux.Lock() | ||
defer s.subscriptionsMux.Unlock() | ||
|
@@ -125,12 +203,12 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. | |
continue | ||
} | ||
// subscribe | ||
if natssSub, err := s.subscribe(cRef, subRef); err != nil { | ||
natssSub, err := s.subscribe(cRef, subRef) | ||
if err != nil { | ||
return err | ||
} else { | ||
chMap[subRef] = natssSub | ||
activeSubs[subRef] = true | ||
} | ||
chMap[subRef] = natssSub | ||
activeSubs[subRef] = true | ||
} | ||
// Unsubscribe for deleted subscriptions | ||
for sub := range chMap { | ||
|
@@ -166,9 +244,21 @@ func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReferenc | |
// subscribe to a NATSS subject | ||
ch := getSubject(channel) | ||
sub := subscription.String() | ||
natssSub, err := (*s.natssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute)) | ||
s.natssConnMux.Lock() | ||
currentNatssConn := s.natssConn | ||
s.natssConnMux.Unlock() | ||
if currentNatssConn == nil { | ||
return nil, fmt.Errorf("No Connection to NATSS") | ||
} | ||
natssSub, err := (*currentNatssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute)) | ||
if err != nil { | ||
s.logger.Error(" Create new NATSS Subscription failed: ", zap.Error(err)) | ||
if err.Error() == stan.ErrConnectionClosed.Error() { | ||
s.logger.Error("Connection to NATSS has been lost, attempting to reconnect.") | ||
// Informing SubscriptionsSupervisor to re-establish connection to NATS | ||
s.signalReconnect() | ||
return nil, err | ||
} | ||
return nil, err | ||
} | ||
s.logger.Sugar().Infof("NATSS Subscription created: %+v", natssSub) | ||
|
@@ -182,7 +272,7 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere | |
if stanSub, ok := s.subscriptions[channel][subscription]; ok { | ||
// delete from NATSS | ||
if err := (*stanSub).Unsubscribe(); err != nil { | ||
s.logger.Error("Unsubscribing NATS Streaming subscription failed: ", zap.Error(err)) | ||
s.logger.Error("Unsubscribing NATSS Streaming subscription failed: ", zap.Error(err)) | ||
return err | ||
} | ||
delete(s.subscriptions[channel], subscription) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this non-blocking:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do in the follow up PR