Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaning nats subscriptions on session destroy #611

Merged
merged 4 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions communication/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Dialog interface {
type Receiver interface {
Receive(consumer MessageConsumer) error
Respond(consumer RequestConsumer) error
Unsubscribe()
}

// Sender represents interface for:
Expand Down
6 changes: 0 additions & 6 deletions communication/nats/dialog/dialog_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ func (waiter *dialogWaiter) ServeDialogs(dialogHandler communication.DialogHandl
}

peerID := identity.FromAddress(request.PeerID)
for _, d := range waiter.dialogs {
if d.PeerID() == peerID {
return &responseOK, nil
}
}

dialog := waiter.newDialogToPeer(peerID, waiter.newCodecForPeer(peerID))
err = dialogHandler.Handle(dialog)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions communication/nats/discovery/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (
"net/url"
"strings"

"github.com/mysteriumnetwork/node/market"

"github.com/mysteriumnetwork/node/communication/nats"
"github.com/mysteriumnetwork/node/identity"
"github.com/mysteriumnetwork/node/market"
nats_lib "github.com/nats-io/go-nats"
)

Expand Down
2 changes: 2 additions & 0 deletions communication/nats/message_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/mysteriumnetwork/node/communication"
"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ func TestMessageBytesReceive(t *testing.T) {
receiver := &receiverNATS{
connection: connection,
codec: communication.NewCodecBytes(),
subs: make(map[string]*nats.Subscription),
}

consumer := &bytesMessageConsumer{messageReceived: make(chan interface{})}
Expand Down
2 changes: 2 additions & 0 deletions communication/nats/message_custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/mysteriumnetwork/node/communication"
"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ func TestMessageCustomReceive(t *testing.T) {
receiver := &receiverNATS{
connection: connection,
codec: communication.NewCodecJSON(),
subs: make(map[string]*nats.Subscription),
}

consumer := &customMessageConsumer{messageReceived: make(chan interface{})}
Expand Down
40 changes: 32 additions & 8 deletions communication/nats/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nats

import (
"fmt"
"sync"

log "github.com/cihub/seelog"
"github.com/mysteriumnetwork/node/communication"
Expand All @@ -35,17 +36,20 @@ func NewReceiver(connection Connection, codec communication.Codec, topic string)
connection: connection,
codec: codec,
messageTopic: topic + ".",
subs: make(map[string]*nats.Subscription),
}
}

type receiverNATS struct {
connection Connection
codec communication.Codec
messageTopic string

mu sync.Mutex
subs map[string]*nats.Subscription
}

func (receiver *receiverNATS) Receive(consumer communication.MessageConsumer) error {

messageTopic := receiver.messageTopic + string(consumer.GetMessageEndpoint())

messageHandler := func(msg *nats.Msg) {
Expand All @@ -66,15 +70,30 @@ func (receiver *receiverNATS) Receive(consumer communication.MessageConsumer) er
}
}

_, err := receiver.connection.Subscribe(messageTopic, messageHandler)
receiver.mu.Lock()
defer receiver.mu.Unlock()

subscription, err := receiver.connection.Subscribe(messageTopic, messageHandler)
if err != nil {
err = fmt.Errorf("failed subscribe message '%s'. %s", messageTopic, err)
return err
}

receiver.subs[messageTopic] = subscription
return nil
}

func (receiver *receiverNATS) Unsubscribe() {
receiver.mu.Lock()
defer receiver.mu.Unlock()

for topic, s := range receiver.subs {
if err := s.Unsubscribe(); err != nil {
log.Error(receiverLogPrefix, "failed to unsubscribed from topic: ", topic)
}
log.Info(receiverLogPrefix, topic, " unsubscribed")
}
}

func (receiver *receiverNATS) Respond(consumer communication.RequestConsumer) error {
requestTopic := receiver.messageTopic + string(consumer.GetRequestEndpoint())

Expand Down Expand Up @@ -111,17 +130,22 @@ func (receiver *receiverNATS) Respond(consumer communication.RequestConsumer) er
}
}

receiver.mu.Lock()
defer receiver.mu.Unlock()

if subscription, ok := receiver.subs[requestTopic]; ok && subscription.IsValid() {
log.Debug(receiverLogPrefix, fmt.Sprintf("Already subscribed to '%s' topic", requestTopic))
return nil
}

log.Debug(receiverLogPrefix, fmt.Sprintf("Request '%s' topic has been subscribed to", requestTopic))

_, err := receiver.connection.Subscribe(requestTopic, messageHandler)
// TODO: nats.Subscription.Unsubscribe() should be called when topic is no longer needed
// session-create and session-destroy topic should be cleared after session-destroy message is received
// or session timeouts (promise processor detects session timeout)
// scope of "session-create topic deduplication #533"
subscription, err := receiver.connection.Subscribe(requestTopic, messageHandler)
if err != nil {
err = fmt.Errorf("failed subscribe request '%s'. %s", requestTopic, err)
return err
}

receiver.subs[requestTopic] = subscription
return nil
}
2 changes: 2 additions & 0 deletions communication/nats/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/mysteriumnetwork/node/communication"
"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand All @@ -36,6 +37,7 @@ func TestReceiverNew(t *testing.T) {
connection: connection,
codec: codec,
messageTopic: "custom.",
subs: make(map[string]*nats.Subscription),
},
NewReceiver(connection, codec, "custom"),
)
Expand Down
2 changes: 2 additions & 0 deletions communication/nats/request_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/mysteriumnetwork/node/communication"
"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -86,6 +87,7 @@ func TestBytesRespond(t *testing.T) {
receiver := &receiverNATS{
connection: connection,
codec: communication.NewCodecBytes(),
subs: make(map[string]*nats.Subscription),
}

consumer := &bytesRequestConsumer{}
Expand Down
2 changes: 2 additions & 0 deletions communication/nats/request_custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/mysteriumnetwork/node/communication"
"github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ func TestCustomRespond(t *testing.T) {
receiver := &receiverNATS{
connection: connection,
codec: communication.NewCodecJSON(),
subs: make(map[string]*nats.Subscription),
}

consumer := &customRequestConsumer{}
Expand Down
1 change: 1 addition & 0 deletions core/connection/stubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (fd *fakeDialog) Receive(consumer communication.MessageConsumer) error {
func (fd *fakeDialog) Respond(consumer communication.RequestConsumer) error {
return nil
}
func (fd *fakeDialog) Unsubscribe() {}

func (fd *fakeDialog) Send(producer communication.MessageProducer) error {
return nil
Expand Down
3 changes: 1 addition & 2 deletions core/ip/rest_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"net"
"net/http"
"time"

log "github.com/cihub/seelog"
Expand Down
2 changes: 1 addition & 1 deletion core/promise/methods/noop/dialog_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package noop
import (
"errors"
"sync"

"time"

"github.com/mysteriumnetwork/node/communication"
Expand Down Expand Up @@ -55,6 +54,7 @@ func (fd *fakeDialog) Receive(consumer communication.MessageConsumer) error {
func (fd *fakeDialog) Respond(consumer communication.RequestConsumer) error {
return nil
}
func (fd *fakeDialog) Unsubscribe() {}

func (fd *fakeDialog) Send(producer communication.MessageProducer) error {
fd.sendMutex.Lock()
Expand Down
6 changes: 3 additions & 3 deletions session/create_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ var (
func TestConsumer_Success(t *testing.T) {
mockManager := &managerFake{
returnSession: Session{
"new-id",
fakeSessionConfig{"string-param", 123},
identity.FromAddress("123"),
ID: "new-id",
Config: fakeSessionConfig{"string-param", 123},
ConsumerID: identity.FromAddress("123"),
},
}
consumer := createConsumer{
Expand Down
6 changes: 3 additions & 3 deletions session/destroy_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type managerDestroyFake struct {
func TestDestroyConsumer_Success(t *testing.T) {
mockDestroyer := &managerDestroyFake{
returnSession: Session{
"some-session-id",
fakeSessionConfig{"string-param", 123},
identity.FromAddress("123"),
ID: "some-session-id",
Config: fakeSessionConfig{"string-param", 123},
ConsumerID: identity.FromAddress("123"),
},
}
consumer := destroyConsumer{
Expand Down
18 changes: 16 additions & 2 deletions session/dialog_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package session

import (
"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/identity"
)

// ManagerFactory initiates session Manager instance during runtime
Expand Down Expand Up @@ -57,8 +58,21 @@ func (handler *handler) subscribeSessionRequests(dialog communication.Dialog) er

return dialog.Respond(
&destroyConsumer{
SessionDestroyer: handler.sessionManagerFactory(dialog),
PeerID: dialog.PeerID(),
SessionDestroyer: &sessionDestroyer{
destroyer: handler.sessionManagerFactory(dialog),
unsubscribe: dialog.Unsubscribe,
},
PeerID: dialog.PeerID(),
},
)
}

type sessionDestroyer struct {
destroyer Destroyer
unsubscribe func()
}

func (sd *sessionDestroyer) Destroy(consumerID identity.Identity, sessionID string) error {
sd.unsubscribe()
return sd.destroyer.Destroy(consumerID, sessionID)
}