Skip to content

Commit

Permalink
fix: websocket client duplicated messages (evmos#955)
Browse files Browse the repository at this point in the history
* Problem: websocket client get duplicated messages

Closes: evmos#954
Solution:
- localize the subscription management within current connection

* changelog

* fix linter

* fix test building

Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
  • Loading branch information
yihuang and fedekunze committed Mar 1, 2022
1 parent aef3a14 commit 4ec8b05
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 265 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [tharsis#878](https://github.com/tharsis/ethermint/pull/878) Workaround to make GetBlock RPC api report correct block gas used.
* (rpc) [tharsis#900](https://github.com/tharsis/ethermint/pull/900) newPendingTransactions filter return ethereum tx hash.
* (rpc) [tharsis#933](https://github.com/tharsis/ethermint/pull/933) Fix newPendingTransactions subscription deadlock when a Websocket client exits without unsubscribing and the node errors.
* (rpc) [#955](https://github.com/tharsis/ethermint/pull/955) Fix websocket server push duplicated messages to subscriber.

## [v0.9.0] - 2021-12-01

Expand Down
34 changes: 17 additions & 17 deletions rpc/ethereum/namespaces/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,62 +90,62 @@ func (es *EventSystem) WithContext(ctx context.Context) {

// subscribe performs a new event subscription to a given Tendermint event.
// The subscription creates a unidirectional receive event channel to receive the ResultEvent.
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) {
var (
err error
cancelFn context.CancelFunc
)

es.ctx, cancelFn = context.WithCancel(context.Background())
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

existingSubs := es.eventBus.Topics()
for _, topic := range existingSubs {
if topic == sub.event {
eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event)
return nil, cancelFn, err
return nil, nil, err
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}
}

switch sub.typ {
case filters.LogsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.BlocksSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.PendingTransactionsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
}

if err != nil {
sub.err <- err
return nil, cancelFn, err
return nil, nil, err
}

// wrap events in a go routine to prevent blocking
es.install <- sub
<-sub.installed

eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
return sub, cancelFn, err
return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}

// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
Expand Down Expand Up @@ -173,7 +173,7 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription

// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.LogsSubscription,
Expand All @@ -188,7 +188,7 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription
}

// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
Expand All @@ -202,7 +202,7 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, er
}

// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
Expand Down
39 changes: 29 additions & 10 deletions rpc/ethereum/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,43 @@ package pubsub

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"

coretypes "github.com/tendermint/tendermint/rpc/core/types"
)

type UnsubscribeFunc func()

type EventBus interface {
AddTopic(name string, src <-chan coretypes.ResultEvent) error
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, error)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}

type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string][]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}

func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string][]chan<- coretypes.ResultEvent),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}

func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}

func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()
Expand Down Expand Up @@ -67,21 +75,32 @@ func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Unlock()
}

func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, error) {
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if !ok {
return nil, errors.Errorf("topic not found: %s", name)
return nil, nil, errors.Errorf("topic not found: %s", name)
}

ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
m.subscribers[name] = append(m.subscribers[name], ch)

return ch, nil
id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch

unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}

return ch, unsubscribe, nil
}

func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
Expand Down
6 changes: 3 additions & 3 deletions rpc/ethereum/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func TestSubscribe(t *testing.T) {

q.AddTopic("lol", lolSrc)

kekSubC, err := q.Subscribe("kek")
kekSubC, _, err := q.Subscribe("kek")
require.NoError(t, err)

lolSubC, err := q.Subscribe("lol")
lolSubC, _, err := q.Subscribe("lol")
require.NoError(t, err)

lol2SubC, err := q.Subscribe("lol")
lol2SubC, _, err := q.Subscribe("lol")
require.NoError(t, err)

wg := new(sync.WaitGroup)
Expand Down
Loading

0 comments on commit 4ec8b05

Please sign in to comment.