Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename PooledQueryBehaviour to QueryBehaviour #65

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type CoordinatorConfig struct {
Routing RoutingConfig

// Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries.
Query PooledQueryConfig
Query QueryConfig
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand Down Expand Up @@ -138,7 +138,7 @@ func DefaultCoordinatorConfig() *CoordinatorConfig {
TracerProvider: otel.GetTracerProvider(),
}

cfg.Query = *DefaultPooledQueryConfig()
cfg.Query = *DefaultQueryConfig()
cfg.Query.Clock = cfg.Clock
cfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery")
cfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
Expand All @@ -165,7 +165,7 @@ func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *
return nil, fmt.Errorf("init telemetry: %w", err)
}

queryBehaviour, err := NewPooledQueryBehaviour(self, &cfg.Query)
queryBehaviour, err := NewQueryBehaviour(self, &cfg.Query)
if err != nil {
return nil, fmt.Errorf("query behaviour: %w", err)
}
Expand Down
42 changes: 21 additions & 21 deletions internal/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/plprobelab/zikade/tele"
)

type PooledQueryConfig struct {
type QueryConfig struct {
// Clock is a clock that may replaced by a mock when testing
Clock clock.Clock

Expand All @@ -42,7 +42,7 @@ type PooledQueryConfig struct {
}

// Validate checks the configuration options and returns an error if any have invalid values.
func (cfg *PooledQueryConfig) Validate() error {
func (cfg *QueryConfig) Validate() error {
if cfg.Clock == nil {
return &errs.ConfigurationError{
Component: "PooledQueryConfig",
Expand Down Expand Up @@ -94,8 +94,8 @@ func (cfg *PooledQueryConfig) Validate() error {
return nil
}

func DefaultPooledQueryConfig() *PooledQueryConfig {
return &PooledQueryConfig{
func DefaultQueryConfig() *QueryConfig {
return &QueryConfig{
Clock: clock.New(),
Logger: tele.DefaultLogger("coord"),
Tracer: tele.NoopTracer(),
Expand All @@ -107,10 +107,10 @@ func DefaultPooledQueryConfig() *PooledQueryConfig {
}
}

// PooledQueryBehaviour holds the behaviour and state for managing a pool of queries.
type PooledQueryBehaviour struct {
// QueryBehaviour holds the behaviour and state for managing a pool of queries.
type QueryBehaviour struct {
// cfg is a copy of the optional configuration supplied to the behaviour.
cfg PooledQueryConfig
cfg QueryConfig

// performMu is held while Perform is executing to ensure sequential execution of work.
performMu sync.Mutex
Expand All @@ -137,11 +137,11 @@ type PooledQueryBehaviour struct {
ready chan struct{}
}

// NewPooledQueryBehaviour initialises a new PooledQueryBehaviour, setting up the query
// NewQueryBehaviour initialises a new [QueryBehaviour], setting up the query
// pool and other internal state.
func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQueryBehaviour, error) {
func NewQueryBehaviour(self kadt.PeerID, cfg *QueryConfig) (*QueryBehaviour, error) {
if cfg == nil {
cfg = DefaultPooledQueryConfig()
cfg = DefaultQueryConfig()
} else if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -158,7 +158,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ
return nil, fmt.Errorf("query pool: %w", err)
}

h := &PooledQueryBehaviour{
h := &QueryBehaviour{
cfg: *cfg,
pool: pool,
notifiers: make(map[coordt.QueryID]*queryNotifier[*EventQueryFinished]),
Expand All @@ -170,7 +170,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ
// Notify receives a behaviour event and takes appropriate actions such as starting,
// stopping, or updating queries. It also queues events for later processing and
// triggers the advancement of the query pool if applicable.
func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
func (p *QueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
p.pendingInboundMu.Lock()
defer p.pendingInboundMu.Unlock()

Expand All @@ -187,14 +187,14 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {

// Ready returns a channel that signals when the pooled query behaviour is ready to
// perform work.
func (p *PooledQueryBehaviour) Ready() <-chan struct{} {
func (p *QueryBehaviour) Ready() <-chan struct{} {
return p.ready
}

// Perform executes the next available task from the queue of pending events or advances
// the query pool. Returns an event containing the result of the work performed and a
// true value, or nil and a false value if no event was generated.
func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) {
func (p *QueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) {
p.performMu.Lock()
defer p.performMu.Unlock()

Expand Down Expand Up @@ -230,7 +230,7 @@ func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, boo
return p.nextPendingOutbound()
}

func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
func (p *QueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
if len(p.pendingOutbound) == 0 {
return nil, false
}
Expand All @@ -239,7 +239,7 @@ func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) {
return ev, true
}

func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) {
func (p *QueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) {
p.pendingInboundMu.Lock()
defer p.pendingInboundMu.Unlock()
if len(p.pendingInbound) == 0 {
Expand All @@ -250,7 +250,7 @@ func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], b
return pev, true
}

func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) {
func (p *QueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) {
ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.perfomNextInbound")
defer span.End()
pev, ok := p.nextPendingInbound()
Expand Down Expand Up @@ -343,7 +343,7 @@ func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (Behaviour
return p.advancePool(pev.Ctx, cmd)
}

func (p *PooledQueryBehaviour) updateReadyStatus() {
func (p *QueryBehaviour) updateReadyStatus() {
if len(p.pendingOutbound) != 0 {
select {
case p.ready <- struct{}{}:
Expand All @@ -368,7 +368,7 @@ func (p *PooledQueryBehaviour) updateReadyStatus() {
// advancePool advances the query pool state machine and returns an outbound event if
// there is work to be performed. Also notifies waiters of query completion or
// progress.
func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) {
func (p *QueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) {
ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
span.SetAttributes(tele.AttrOutEvent(out))
Expand Down Expand Up @@ -416,15 +416,15 @@ func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEve
return nil, false
}

func (p *PooledQueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) {
func (p *QueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) {
for _, info := range nodes {
p.pendingOutbound = append(p.pendingOutbound, &EventAddNode{
NodeID: info,
})
}
}

func (p *PooledQueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) {
func (p *QueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) {
p.pendingOutbound = append(p.pendingOutbound, &EventNotifyNonConnectivity{
NodeID: nid,
})
Expand Down
30 changes: 15 additions & 15 deletions internal/coord/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,34 @@ import (
"github.com/plprobelab/zikade/pb"
)

func TestPooledQueryConfigValidate(t *testing.T) {
func TestQueryConfigValidate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

require.NoError(t, cfg.Validate())
})

t.Run("clock is not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Clock = nil
require.Error(t, cfg.Validate())
})

t.Run("logger not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()
cfg.Logger = nil
require.Error(t, cfg.Validate())
})

t.Run("tracer not nil", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()
cfg.Tracer = nil
require.Error(t, cfg.Validate())
})

t.Run("query concurrency positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Concurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -52,7 +52,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("query timeout positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.Timeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -61,7 +61,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("request concurrency positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.RequestConcurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -70,7 +70,7 @@ func TestPooledQueryConfigValidate(t *testing.T) {
})

t.Run("request timeout positive", func(t *testing.T) {
cfg := DefaultPooledQueryConfig()
cfg := DefaultQueryConfig()

cfg.RequestTimeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -86,7 +86,7 @@ func TestQueryBehaviourBase(t *testing.T) {
type QueryBehaviourBaseTestSuite struct {
suite.Suite

cfg *PooledQueryConfig
cfg *QueryConfig
top *nettest.Topology
nodes []*nettest.Peer
}
Expand All @@ -99,7 +99,7 @@ func (ts *QueryBehaviourBaseTestSuite) SetupTest() {
ts.top = top
ts.nodes = nodes

ts.cfg = DefaultPooledQueryConfig()
ts.cfg = DefaultQueryConfig()
ts.cfg.Clock = clk
}

Expand All @@ -111,7 +111,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesNoProgress() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryProgressed() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -206,7 +206,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() {
rt := ts.nodes[0].RoutingTable
seeds := rt.NearestNodes(target, 5)

b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg)
ts.Require().NoError(err)

waiter := NewQueryWaiter(5)
Expand Down Expand Up @@ -274,7 +274,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() {
kadtest.ReadItem[CtxEvent[*EventQueryFinished]](t, ctx, waiter.Finished())
}

func TestPooledQuery_deadlock_regression(t *testing.T) {
func TestQuery_deadlock_regression(t *testing.T) {
t.Skip()
ctx := kadtest.CtxShort(t)
msg := &pb.Message{}
Expand Down