Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiNode Adaptor Managed Subscriptions #960

Open
wants to merge 12 commits into
base: BCFR-1071-Generic-MultiNodeClient
Choose a base branch
from
Open
34 changes: 26 additions & 8 deletions pkg/solana/client/multinode/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int {
return len(m.subs)
}

// RegisterSub adds the sub to the rpcClient list
func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) error {
// registerSub adds the sub to the rpcClient list
func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
// ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to
Expand All @@ -72,11 +72,16 @@ func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightC
return fmt.Errorf("failed to register subscription - all in-flight requests were canceled")
default:
}
// TODO: BCI-3358 - delete sub when caller unsubscribes.
m.subs[sub] = struct{}{}
return nil
}

func (m *MultiNodeClient[RPC, HEAD]) removeSub(sub Subscription) {
m.subsSliceMu.Lock()
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
defer m.subsSliceMu.Unlock()
delete(m.subs, sub)
}

func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
Expand Down Expand Up @@ -133,13 +138,18 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch
return nil, nil, err
}

err := m.RegisterSub(&poller, chStopInFlight)
sub := &ManagedSubscription{
Subscription: &poller,
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
if err != nil {
poller.Unsubscribe()
sub.Unsubscribe()
return nil, nil, err
}

return channel, &poller, nil
return channel, sub, nil
}

func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
Expand All @@ -161,13 +171,18 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte
return nil, nil, err
}

err := m.RegisterSub(&poller, chStopInFlight)
sub := &ManagedSubscription{
Subscription: &poller,
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
if err != nil {
poller.Unsubscribe()
return nil, nil, err
}

return channel, &poller, nil
return channel, sub, nil
}

func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
Expand Down Expand Up @@ -244,7 +259,10 @@ func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription)

for sub := range m.subs {
if _, keep := keepSubs[sub]; !keep {
// Release lock to avoid deadlock on unsubscribe
m.subsSliceMu.Unlock()
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
sub.Unsubscribe()
m.subsSliceMu.Lock()
delete(m.subs, sub)
}
}
Expand Down
33 changes: 29 additions & 4 deletions pkg/solana/client/multinode/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,31 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) {
t.Fatal("failed to receive finalized head: ", ctx.Err())
}
})

t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) {
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
_, sub1, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
_, sub2, err := c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

sub1.Unsubscribe()
require.Equal(t, 1, c.LenSubs())
sub2.Unsubscribe()
require.Equal(t, 0, c.LenSubs())
})

t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) {
_, _, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
_, _, err = c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())
c.UnsubscribeAllExcept()
require.Equal(t, 0, c.LenSubs())
})
}

type mockSub struct {
Expand All @@ -116,7 +141,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {

t.Run("registerSub", func(t *testing.T) {
sub := newMockSub()
err := c.RegisterSub(sub, make(chan struct{}))
err := c.registerSub(sub, make(chan struct{}))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
c.UnsubscribeAllExcept()
Expand All @@ -126,7 +151,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {
chStopInFlight := make(chan struct{})
close(chStopInFlight)
sub := newMockSub()
err := c.RegisterSub(sub, chStopInFlight)
err := c.registerSub(sub, chStopInFlight)
require.Error(t, err)
require.Equal(t, true, sub.unsubscribed)
})
Expand All @@ -135,9 +160,9 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {
chStopInFlight := make(chan struct{})
sub1 := newMockSub()
sub2 := newMockSub()
err := c.RegisterSub(sub1, chStopInFlight)
err := c.registerSub(sub1, chStopInFlight)
require.NoError(t, err)
err = c.RegisterSub(sub2, chStopInFlight)
err = c.registerSub(sub2, chStopInFlight)
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

Expand Down
13 changes: 13 additions & 0 deletions pkg/solana/client/multinode/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ type Subscription interface {
Err() <-chan error
}

// ManagedSubscription is a Subscription which contains an onUnsubscribe callback
type ManagedSubscription struct {
Subscription
onUnsubscribe func(sub Subscription)
}

func (w *ManagedSubscription) Unsubscribe() {
w.Subscription.Unsubscribe()
if w.onUnsubscribe != nil {
w.onUnsubscribe(w)
}
}

// RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks
type RPCClient[
CHAIN_ID ID,
Expand Down
Loading