Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema/appdata): async listener mux'ing #20879

Merged
merged 17 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions schema/appdata/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package appdata

import (
"context"
"sync"
)

// AsyncListenerOptions are options for async listeners and listener mux's.
type AsyncListenerOptions struct {
// Context is the context whose Done() channel listeners use will use to listen for completion to close their
// goroutine. If it is nil, then context.Background() will be used and goroutines may be leaked.
Context context.Context

// BufferSize is the buffer size of the channels to use. It defaults to 0.
BufferSize int

// DoneWaitGroup is an optional wait-group that listener goroutines will notify via Add(1) when they are started
// and Done() after they are cancelled and completed.
DoneWaitGroup *sync.WaitGroup
}

// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil
// except for Commit which will return an error or nil once all listeners have processed the commit. The context
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the
// channels used to send events to the listeners.
func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener {
asyncListeners := make([]Listener, len(listeners))
commitChans := make([]chan error, len(listeners))
for i, l := range listeners {
commitChan := make(chan error)
commitChans[i] = commitChan
asyncListeners[i] = AsyncListener(opts, commitChan, l)
}
mux := ListenerMux(asyncListeners...)
muxCommit := mux.Commit
mux.Commit = func(data CommitData) error {
if muxCommit != nil {
err := muxCommit(data)
if err != nil {
return err
}
}

for _, commitChan := range commitChans {
err := <-commitChan
if err != nil {
return err
}
}
return nil
}

return mux
}

// AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine
// that is being used for listening will exit when context.Done() returns and no more events will be received by the listener.
// bufferSize is the size of the buffer for the channel that is used to send events to the listener.
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly
// via its Commit callback.
func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener Listener) Listener {
packetChan := make(chan Packet, opts.BufferSize)
res := Listener{}
ctx := opts.Context
if ctx == nil {
ctx = context.Background()
}
done := ctx.Done()

go func() {
if opts.DoneWaitGroup != nil {
opts.DoneWaitGroup.Add(1)
}

var err error
for {
select {
case packet := <-packetChan:
if err != nil {
// if we have an error, don't process any more packets
// and return the error and finish when it's time to commit
if _, ok := packet.(CommitData); ok {
commitChan <- err
return
}
} else {
// process the packet
err = listener.SendPacket(packet)
// if it's a commit
if _, ok := packet.(CommitData); ok {
commitChan <- err
if err != nil {
return
}
}
}

case <-done:
close(packetChan)
if opts.DoneWaitGroup != nil {
opts.DoneWaitGroup.Done()
}
return
}
}
}()
Dismissed Show dismissed Hide dismissed

if listener.InitializeModuleData != nil {
res.InitializeModuleData = func(data ModuleInitializationData) error {
packetChan <- data
return nil
}
}

if listener.StartBlock != nil {
res.StartBlock = func(data StartBlockData) error {
packetChan <- data
return nil
}
}

if listener.OnTx != nil {
res.OnTx = func(data TxData) error {
packetChan <- data
return nil
}
}

if listener.OnEvent != nil {
res.OnEvent = func(data EventData) error {
packetChan <- data
return nil
}
}

if listener.OnKVPair != nil {
res.OnKVPair = func(data KVPairData) error {
packetChan <- data
return nil
}
}

if listener.OnObjectUpdate != nil {
res.OnObjectUpdate = func(data ObjectUpdateData) error {
packetChan <- data
return nil
}
}

if listener.Commit != nil {
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

res.callbacks := make(func(data Packet) error, 0)

How about having a general callback slice instead of all individual callbacks?

Copy link
Member Author

@aaronc aaronc Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow how this would work. Could you explain more?

Copy link
Contributor

@cool-develope cool-develope Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we can keep only one callback function within Listener, since all callbacks StartBlock, OnTx ... are in the same format as func(packet Packet) error. And then within the callback implementation, we can deal with different packet type like ModuleInitializationData, StartBlockData, and so on using a switch statement. It will reduce the duplicated imp here, and in mux.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already defined the different packet data structures, so we don't need to distinguish the callback functions.

Copy link
Member Author

@aaronc aaronc Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of distinguishing the different callback functions is to signal what each listener is subscribing to. If listeners don't listen to events at all, then OnEvent should be nil and then the producers won't send any events. If there's only one callback function then a producer has to always send all packets even if the consumers are only interested in certain packet types. In the async case it simplifies what is sent over each packet channel to only the subscription callbacks that are non nil.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense!


return res
}
148 changes: 148 additions & 0 deletions schema/appdata/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package appdata

import (
"context"
"fmt"
"sync"
"testing"
)

func TestAsyncListenerMux(t *testing.T) {
t.Run("empty", func(t *testing.T) {
listener := AsyncListenerMux(AsyncListenerOptions{}, Listener{}, Listener{})

if listener.InitializeModuleData != nil {
t.Error("expected nil")
}
if listener.StartBlock != nil {
t.Error("expected nil")
}
if listener.OnTx != nil {
t.Error("expected nil")
}
if listener.OnEvent != nil {
t.Error("expected nil")
}
if listener.OnKVPair != nil {
t.Error("expected nil")
}
if listener.OnObjectUpdate != nil {
t.Error("expected nil")
}

// commit is not expected to be nil
})

t.Run("call cancel", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
var calls1, calls2 []string
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux(AsyncListenerOptions{
BufferSize: 16, Context: ctx, DoneWaitGroup: wg,
}, listener1, listener2)

callAllCallbacksOnces(t, res)

expectedCalls := []string{
"InitializeModuleData",
"StartBlock",
"OnTx",
"OnEvent",
"OnKVPair",
"OnObjectUpdate",
"Commit",
}

checkExpectedCallOrder(t, calls1, expectedCalls)
checkExpectedCallOrder(t, calls2, expectedCalls)

// cancel and expect the test to finish - if all goroutines aren't canceled the test will hang
cancel()
wg.Wait()
})

t.Run("error on commit", func(t *testing.T) {
var calls1, calls2 []string
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener1.Commit = func(data CommitData) error {
return fmt.Errorf("error")
}
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux(AsyncListenerOptions{}, listener1, listener2)

err := res.Commit(CommitData{})
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}
})
}

func TestAsyncListener(t *testing.T) {
t.Run("call cancel", func(t *testing.T) {
commitChan := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg},
commitChan, listener)

callAllCallbacksOnces(t, res)

err := <-commitChan
if err != nil {
t.Fatalf("expected nil, got %v", err)
}

checkExpectedCallOrder(t, calls, []string{
"InitializeModuleData",
"StartBlock",
"OnTx",
"OnEvent",
"OnKVPair",
"OnObjectUpdate",
"Commit",
})

calls = nil

// expect wait group to return after cancel is called
cancel()
wg.Wait()
})

t.Run("error", func(t *testing.T) {
commitChan := make(chan error)
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})

listener.OnKVPair = func(updates KVPairData) error {
return fmt.Errorf("error")
}

res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, commitChan, listener)

callAllCallbacksOnces(t, res)

err := <-commitChan
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}

checkExpectedCallOrder(t, calls, []string{"InitializeModuleData", "StartBlock", "OnTx", "OnEvent"})
})
}
Loading
Loading