Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart dispatcher on connection loss #796

Merged
merged 19 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 102 additions & 16 deletions contrib/natss/pkg/dispatcher/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package dispatcher

import (
"encoding/json"
"fmt"
// "reflect"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete the commented line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"sync"
"time"

Expand All @@ -30,7 +32,11 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
)

const clientID = "knative-natss-dispatcher"
const (
clientID = "knative-natss-dispatcher"
// maxElements defines a maximum number of outstanding re-connect requests
maxElements = 10
)

// SubscriptionsSupervisor manages the state of NATS Streaming subscriptions
type SubscriptionsSupervisor struct {
Expand All @@ -39,30 +45,36 @@ type SubscriptionsSupervisor struct {
receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher

natssConn *stan.Conn

subscriptionsMux sync.Mutex
subscriptions map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription

connect chan struct{}
natssURL string
// natConnMux is used to protect natssConn and natssConnInProgress during
// the transition from not connected to connected states.
natssConnMux sync.Mutex
natssConn *stan.Conn
natssConnInProgress bool
}

// NewDispatcher returns a new SubscriptionsSupervisor.
func NewDispatcher(natssUrl string, logger *zap.Logger) (*SubscriptionsSupervisor, error) {
d := &SubscriptionsSupervisor{
logger: logger,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
connect: make(chan struct{}, maxElements),
natssURL: natssUrl,
subscriptions: make(map[provisioners.ChannelReference]map[subscriptionReference]*stan.Subscription),
}
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, natssUrl, d.logger.Sugar())
if err != nil {
logger.Error("Connect() failed: ", zap.Error(err))
return nil, err
}
d.natssConn = nConn
d.receiver = provisioners.NewMessageReceiver(createReceiverFunction(d, logger.Sugar()), logger.Sugar())

return d, nil
}

func (s *SubscriptionsSupervisor) signalReconnect() {
s.connect <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this non-blocking:

func (s *SubscriptionsSupervisor) signalReconnect() {
    select {
    case s.connect <- struct{}{}:
      // Sent.
    default:
        // The Channel is already full, so a reconnection attempt will occur.
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do in the follow up PR

}

func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogger) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(channel provisioners.ChannelReference, m *provisioners.Message) error {
logger.Infof("Received message from %q channel", channel.String())
Expand All @@ -73,8 +85,20 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
logger.Errorf("Error during marshaling of the message: %v", err)
return err
}
if err := stanutil.Publish(s.natssConn, ch, &message, logger); err != nil {
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn == nil {
return fmt.Errorf("No Connection to NATSS")
}
if err := stanutil.Publish(currentNatssConn, ch, &message, logger); err != nil {
logger.Errorf("Error during publish: %v", err)
if err.Error() == stan.ErrConnectionClosed.Error() {
logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATSS.
s.signalReconnect()
return err
}
return err
}
logger.Infof("Published [%s] : '%s'", channel.String(), m.Headers)
Expand All @@ -83,10 +107,60 @@ func createReceiverFunction(s *SubscriptionsSupervisor, logger *zap.SugaredLogge
}

func (s *SubscriptionsSupervisor) Start(stopCh <-chan struct{}) error {
// Starting Connect to establish connection with NATS
go s.Connect(stopCh)
// Trigger Connect to establish connection with NATS
s.signalReconnect()
s.receiver.Start(stopCh)
return nil
}

func (s *SubscriptionsSupervisor) connectWithRetry(stopCh <-chan struct{}) {
// re-attempting evey 60 seconds until the connection is established.
ticker := time.NewTicker(60 * time.Second)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change it to 1s, 60s is too big and it's the only place which creates a new connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, but do you think 1 second is not too aggressive?

defer ticker.Stop()
for {
nConn, err := stanutil.Connect(clusterchannelprovisioner.ClusterId, clientID, s.natssURL, s.logger.Sugar())
if err == nil {
// Locking here in order to reduce time in locked state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a trailing period.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

s.natssConnMux.Lock()
s.natssConn = nConn
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps i better to use:

defer s.subscriptionsMux.Unlock()

right after Lock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

s.natssConnInProgress = false
s.natssConnMux.Unlock()
return
}
s.logger.Sugar().Errorf("Connect() failed with error: %+v, retrying in 60 seconds", err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the message too: "retrying in 1 second"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

select {
case <-ticker.C:
continue
case <-stopCh:
return
}
}
}

// Connect is called for initial connection as well as after every disconnect
func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) {
for {
select {
case <-s.connect:
s.natssConnMux.Lock()
// currentNatssConn := s.natssConn
Copy link

@ghost ghost Feb 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete the commented line:

// currentNatssConn := s.natssConn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

currentConnProgress := s.natssConnInProgress
s.natssConnMux.Unlock()
if !currentConnProgress {
// Case for lost connectivity, setting InProgress to true to prevent recursion
s.natssConnMux.Lock()
s.natssConnInProgress = true
s.natssConnMux.Unlock()
go s.connectWithRetry(stopCh)
}
case <-stopCh:
return
}
}
}

func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error {
s.subscriptionsMux.Lock()
defer s.subscriptionsMux.Unlock()
Expand Down Expand Up @@ -125,12 +199,12 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.
continue
}
// subscribe
if natssSub, err := s.subscribe(cRef, subRef); err != nil {
natssSub, err := s.subscribe(cRef, subRef)
if err != nil {
return err
} else {
chMap[subRef] = natssSub
activeSubs[subRef] = true
}
chMap[subRef] = natssSub
activeSubs[subRef] = true
}
// Unsubscribe for deleted subscriptions
for sub := range chMap {
Expand Down Expand Up @@ -166,9 +240,21 @@ func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReferenc
// subscribe to a NATSS subject
ch := getSubject(channel)
sub := subscription.String()
natssSub, err := (*s.natssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute))
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn == nil {
return nil, fmt.Errorf("No Connection to NATSS")
}
natssSub, err := (*currentNatssConn).Subscribe(ch, mcb, stan.DurableName(sub), stan.SetManualAckMode(), stan.AckWait(1*time.Minute))
if err != nil {
s.logger.Error(" Create new NATSS Subscription failed: ", zap.Error(err))
if err.Error() == stan.ErrConnectionClosed.Error() {
s.logger.Error("Connection to NATSS has been lost, attempting to reconnect.")
// Informing SubscriptionsSupervisor to re-establish connection to NATS
s.signalReconnect()
return nil, err
}
return nil, err
}
s.logger.Sugar().Infof("NATSS Subscription created: %+v", natssSub)
Expand All @@ -182,7 +268,7 @@ func (s *SubscriptionsSupervisor) unsubscribe(channel provisioners.ChannelRefere
if stanSub, ok := s.subscriptions[channel][subscription]; ok {
// delete from NATSS
if err := (*stanSub).Unsubscribe(); err != nil {
s.logger.Error("Unsubscribing NATS Streaming subscription failed: ", zap.Error(err))
s.logger.Error("Unsubscribing NATSS Streaming subscription failed: ", zap.Error(err))
return err
}
delete(s.subscriptions[channel], subscription)
Expand Down
26 changes: 20 additions & 6 deletions contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/knative/eventing/contrib/natss/pkg/stanutil"
"github.com/knative/eventing/contrib/natss/pkg/controller/clusterchannelprovisioner"
"github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
Expand All @@ -35,7 +36,6 @@ import (
)

const (
clusterID = "knative-nats-streaming"
natssTestURL = "nats://localhost:4222"

ccpName = "natss"
Expand All @@ -46,6 +46,7 @@ const (
)

var (
clusterID = clusterchannelprovisioner.ClusterId
logger *zap.SugaredLogger
core zapcore.Core
observed *observer.ObservedLogs
Expand Down Expand Up @@ -83,20 +84,33 @@ func TestMain(m *testing.M) {
logger.Fatalf("Cannot start NATSS: %v", err)
}
defer stopNatss(stanServer)

// Create and start Dispatcher.
s, err = NewDispatcher(natssTestURL, testLogger)
if err != nil {
logger.Fatalf("Unable to create NATSS dispatcher: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go func() {
s.Start(stopCh)
if s.natssConn == nil {
go s.Start(stopCh)

ready := false
ticker := time.NewTicker(time.Second * 5)
expire := time.NewTimer(time.Second * 120)
for !ready {
select {
case <-ticker.C:
s.natssConnMux.Lock()
currentNatssConn := s.natssConn
s.natssConnMux.Unlock()
if currentNatssConn != nil && (*currentNatssConn).NatsConn().IsConnected() {
ready = true
}
continue
case <-expire.C:
logger.Fatalf("Failed to connect to NATSS!")
}
}()
}

os.Exit(m.Run())
}

Expand Down
12 changes: 1 addition & 11 deletions contrib/natss/pkg/stanutil/stanutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stanutil
import (
"errors"
"fmt"
"time"

stan "github.com/nats-io/go-nats-streaming"
"go.uber.org/zap"
Expand All @@ -28,16 +27,7 @@ import (
// Connect creates a new NATS-Streaming connection
func Connect(clusterId string, clientId string, natsUrl string, logger *zap.SugaredLogger) (*stan.Conn, error) {
logger.Infof("Connect(): clusterId: %v; clientId: %v; natssUrl: %v", clusterId, clientId, natsUrl)
var sc stan.Conn
var err error
for i := 0; i < 60; i++ {
if sc, err = stan.Connect(clusterId, clientId, stan.NatsURL(natsUrl)); err != nil {
logger.Warnf("Connect(): create new connection failed: %v", err)
time.Sleep(1 * time.Second)
} else {
break
}
}
sc, err := stan.Connect(clusterId, clientId, stan.NatsURL(natsUrl))
if err != nil {
logger.Errorf("Connect(): create new connection failed: %v", err)
return nil, err
Expand Down