Restart dispatcher on connection loss #796

Merged: 19 commits, Feb 19, 2019
122 changes: 106 additions & 16 deletions contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go
@@ -18,6 +18,7 @@ package dispatcher

import (
"encoding/json"
"fmt"
"sync"
"time"

@@ -30,7 +31,16 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
)

const clientID = "knative-natss-dispatcher"
const (
clientID = "knative-natss-dispatcher"
// maxElements defines a maximum number of outstanding re-connect requests
maxElements = 10
)

var (
// retryInterval defines delay in seconds for the next attempt to reconnect to NATSS streaming server
retryInterval = 1 * time.Second
)

// SubscriptionsSupervisor manages the state of NATS Streaming subscriptions
type SubscriptionsSupervisor struct {
@@ -39,30 +49,37 @@ type SubscriptionsSupervisor struct {
receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher

natssConn *stan.Conn

subscriptionsMux sync.Mutex
subscriptions map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription

connect chan struct{}
natssURL string
// natssConnMux is used to protect natssConn and natssConnInProgress during
// the transition from not connected to connected states.
natssConnMux sync.Mutex
natssConn *stan.Conn
natssConnInProgress bool
}

// NewDispatcher returns a new SubscriptionsSupervisor.
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) {
d := &SubscriptionsSupervisor{
logger: logger,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
connect: make(chan struct{}, maxElements),
natssURL: natssUrl,
subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription),
}
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, natssUrl, d.logger.Sugar())
if err != nil {
logger.Error("Connect() failed: ", zap.Error(err))
return nil, err
}
d.natssConn = nConn
d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar())

return d, nil
}

func (s *SubscriptionsSupervisor) signalReconnect() {
// TODO refactor to make send over the channel non-blocking operation
s.connect <- struct{}{}
Contributor:
Let's make this non-blocking:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
      // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}

Contributor Author:
will do in the follow up PR

}

func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogger) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(channel provisioners.ChannelReference, m *provisioners.Message) error {
logger.Infof("Received message from %q channel", channel.String())
@@ -73,8 +90,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Errorf("Error during marshaling of the message: %v", err)
return err
}
if err := stanutil.Publish(s.natssConn, ch, &message, logger); err != nil {
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn == nil {
return fmt.Errorf("No Connection to NATSS")
}
if err := stanutil.Publish(currentNatssConn, ch, &message, logger); err != nil {
logger.Errorf("Error during publish: %v", err)
if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATSS.
s.signalReconnect()
return err
}
return err
}
logger.Infof("Published [%s] : '%s'", channel.String(), m.Headers)
@@ -83,10 +112,59 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
}

func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error {
// Starting Connect to establish connection with NATS
go s.Connect(stopCh)
// Trigger Connect to establish connection with NATS
s.signalReconnect()
s.receiver.Start(stopCh)
return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) {
// re-attempting every second until the connection is established.
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for {
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
// Locking here in order to reduce time in locked state.
s.natssConnMux.Lock()
s.natssConn = nConn
Review comment:
Perhaps it's better to use:

defer s.subscriptionsMux.Unlock()

right after Lock()

Contributor Author:
done

s.natssConnInProgress = false
s.natssConnMux.Unlock()
return
}
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in %s", err, retryInterval.String())
select {
case <-ticker.C:
continue
case <-stopCh:
return
}
}
}
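
A minimal sketch of the deferred-unlock form suggested in the review thread above, assuming the mutex in question is natssConnMux (the one actually used in this function); setConnection is a hypothetical helper for illustration, not part of this change:

// setConnection updates the connection state under a single lock acquisition,
// releasing the mutex via defer right after Lock(), as the reviewer suggests.
func (s *SubscriptionsSupervisor) setConnection(nConn *stan.Conn) {
	s.natssConnMux.Lock()
	defer s.natssConnMux.Unlock()
	s.natssConn = nConn
	s.natssConnInProgress = false
}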

// Connect is called for initial connection as well as after every disconnect
func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) {
for {
select {
case <-s.connect:
s.natssConnMux.Lock()
currentConnProgress := s.natssConnInProgress
s.natssConnMux.Unlock()
if !currentConnProgress {
// Case for lost connectivity, setting InProgress to true to prevent concurrent reconnect attempts
s.natssConnMux.Lock()
s.natssConnInProgress = true
s.natssConnMux.Unlock()
go s.connectWithRetry(stopCh)
}
case <-stopCh:
return
}
}
}
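
The check-and-set of natssConnInProgress above takes the mutex twice; for illustration only, the same guard could be expressed in one critical section (connectOnce is a hypothetical name, not part of this change):

// connectOnce checks and sets natssConnInProgress under a single lock
// acquisition, so at most one connectWithRetry goroutine is started per
// connectivity loss.
func (s *SubscriptionsSupervisor) connectOnce(stopCh <-chan struct{}) {
	s.natssConnMux.Lock()
	defer s.natssConnMux.Unlock()
	if !s.natssConnInProgress {
		s.natssConnInProgress = true
		go s.connectWithRetry(stopCh)
	}
}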

func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error {
s.subscriptionsMux.Lock()
defer s.subscriptionsMux.Unlock()
@@ -125,12 +203,12 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.
continue
}
// subscribe
if natssSub, err := s.subscribe(cRef, subRef); err != nil {
natssSub, err := s.subscribe(cRef, subRef)
if err != nil {
return err
} else {
chMap[subRef] = natssSub
activeSubs[subRef] = true
}
chMap[subRef] = natssSub
activeSubs[subRef] = true
}
// Unsubscribe for deleted subscriptions
for sub := range chMap {
@@ -166,9 +244,21 @@ func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReferenc
// subscribe to a NATSS subject
ch := getSubject(channel)
sub := subscription.String()
natssSub, err := (*s.natssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute))
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn == nil {
return nil, fmt.Errorf("No Connection to NATSS")
}
natssSub, err := (*currentNatssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute))
if err != nil {
s.logger.Error(" Create new NATSS Subscription failed: ", zap.Error(err))
if err.Error() == stan.ErrConnectionClosed.Error() {
s.logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATS
s.signalReconnect()
return nil, err
}
return nil, err
}
s.logger.Sugar().Infof("NATSS Subscription created: %+v", natssSub)
@@ -182,7 +272,7 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere
if stanSub, ok := s.subscriptions[channel][subscription]; ok {
// delete from NATSS
if err := (*stanSub).Unsubscribe(); err != nil {
s.logger.Error("Unsubscribing NATS Streaming subscription failed: ", zap.Error(err))
s.logger.Error("Unsubscribing NATSS Streaming subscription failed: ", zap.Error(err))
return err
}
delete(s.subscriptions[channel], subscription)
26 changes: 20 additions & 6 deletions contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go
@@ -23,6 +23,7 @@ import (
"time"

"github.com/knative/eventing/contrib/natss/pkg/stanutil"
"github.com/knative/eventing/contrib/natss/pkg/controller/clusterchannelprovisioner"
"github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
@@ -35,7 +36,6 @@ import (
)

const (
clusterID = "knative-nats-streaming"
natssTestURL = "nats://localhost:4222"

ccpName = "natss"
@@ -46,6 +46,7 @@ const (
)

var (
clusterID = clusterchannelprovisioner.ClusterId
logger *zap.SugaredLogger
core zapcore.Core
observed *observer.ObservedLogs
@@ -83,20 +84,33 @@ func TestMain(m *testing.M) {
logger.Fatalf("Cannot start NATSS: %v", err)
}
defer stopNatss(stanServer)

// Create and start Dispatcher.
s, err = NewDispatcher(natssTestURL, testLogger)
if err != nil {
logger.Fatalf("Unable to create NATSS dispatcher: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go func() {
s.Start(stopCh)
if s.natssConn == nil {
go s.Start(stopCh)

ready := false
ticker := time.NewTicker(time.Second * 5)
expire := time.NewTimer(time.Second * 120)
for !ready {
select {
case <-ticker.C:
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn != nil && (*currentNatssConn).NatsConn().IsConnected() {
ready = true
}
continue
case <-expire.C:
logger.Fatalf("Failed to connect to NATSS!")
}
}()
}

os.Exit(m.Run())
}

12 changes: 1 addition & 11 deletions contrib/natss/pkg/stanutil/stanutil.go
@@ -19,7 +19,6 @@ package stanutil
import (
"errors"
"fmt"
"time"

stan "github.com/nats-io/go-nats-streaming"
"go.uber.org/zap"
@@ -28,16 +27,7 @@ import (
// Connect creates a new NATS-Streaming connection
func Connect(clusterId string, clientId string, natsUrl string, logger *zap.SugaredLogger) (*stan.Conn, error) {
logger.Infof("Connect(): clusterId: %v; clientId: %v; natssUrl: %v", clusterId, clientId, natsUrl)
var sc stan.Conn
var err error
for i := 0; i < 60; i++ {
if sc, err = stan.Connect(clusterId, clientId, stan.NatsURL(natsUrl)); err != nil {
logger.Warnf("Connect(): create new connection failed: %v", err)
time.Sleep(1 * time.Second)
} else {
break
}
}
sc, err := stan.Connect(clusterId, clientId, stan.NatsURL(natsUrl))
if err != nil {
logger.Errorf("Connect(): create new connection failed: %v", err)
return nil, err