diff --git a/internal/coord/coordinator.go b/internal/coord/coordinator.go index 83335fc..760384f 100644 --- a/internal/coord/coordinator.go +++ b/internal/coord/coordinator.go @@ -93,7 +93,7 @@ type CoordinatorConfig struct { Routing RoutingConfig // Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries. - Query PooledQueryConfig + Query QueryConfig } // Validate checks the configuration options and returns an error if any have invalid values. @@ -138,7 +138,7 @@ func DefaultCoordinatorConfig() *CoordinatorConfig { TracerProvider: otel.GetTracerProvider(), } - cfg.Query = *DefaultPooledQueryConfig() + cfg.Query = *DefaultQueryConfig() cfg.Query.Clock = cfg.Clock cfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery") cfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) @@ -165,7 +165,7 @@ func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, * return nil, fmt.Errorf("init telemetry: %w", err) } - queryBehaviour, err := NewPooledQueryBehaviour(self, &cfg.Query) + queryBehaviour, err := NewQueryBehaviour(self, &cfg.Query) if err != nil { return nil, fmt.Errorf("query behaviour: %w", err) } diff --git a/internal/coord/query.go b/internal/coord/query.go index 05f23cb..ed4becc 100644 --- a/internal/coord/query.go +++ b/internal/coord/query.go @@ -18,7 +18,7 @@ import ( "github.com/plprobelab/zikade/tele" ) -type PooledQueryConfig struct { +type QueryConfig struct { // Clock is a clock that may replaced by a mock when testing Clock clock.Clock @@ -42,7 +42,7 @@ type PooledQueryConfig struct { } // Validate checks the configuration options and returns an error if any have invalid values. -func (cfg *PooledQueryConfig) Validate() error { +func (cfg *QueryConfig) Validate() error { if cfg.Clock == nil { return &errs.ConfigurationError{ Component: "PooledQueryConfig", @@ -94,8 +94,8 @@ func (cfg *PooledQueryConfig) Validate() error { return nil } -func DefaultPooledQueryConfig() *PooledQueryConfig { - return &PooledQueryConfig{ +func DefaultQueryConfig() *QueryConfig { + return &QueryConfig{ Clock: clock.New(), Logger: tele.DefaultLogger("coord"), Tracer: tele.NoopTracer(), @@ -107,10 +107,10 @@ func DefaultPooledQueryConfig() *PooledQueryConfig { } } -// PooledQueryBehaviour holds the behaviour and state for managing a pool of queries. -type PooledQueryBehaviour struct { +// QueryBehaviour holds the behaviour and state for managing a pool of queries. +type QueryBehaviour struct { // cfg is a copy of the optional configuration supplied to the behaviour. - cfg PooledQueryConfig + cfg QueryConfig // performMu is held while Perform is executing to ensure sequential execution of work. performMu sync.Mutex @@ -137,11 +137,11 @@ type PooledQueryBehaviour struct { ready chan struct{} } -// NewPooledQueryBehaviour initialises a new PooledQueryBehaviour, setting up the query +// NewQueryBehaviour initialises a new [QueryBehaviour], setting up the query // pool and other internal state. -func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQueryBehaviour, error) { +func NewQueryBehaviour(self kadt.PeerID, cfg *QueryConfig) (*QueryBehaviour, error) { if cfg == nil { - cfg = DefaultPooledQueryConfig() + cfg = DefaultQueryConfig() } else if err := cfg.Validate(); err != nil { return nil, err } @@ -158,7 +158,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ return nil, fmt.Errorf("query pool: %w", err) } - h := &PooledQueryBehaviour{ + h := &QueryBehaviour{ cfg: *cfg, pool: pool, notifiers: make(map[coordt.QueryID]*queryNotifier[*EventQueryFinished]), @@ -170,7 +170,7 @@ func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQ // Notify receives a behaviour event and takes appropriate actions such as starting, // stopping, or updating queries. It also queues events for later processing and // triggers the advancement of the query pool if applicable. -func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { +func (p *QueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { p.pendingInboundMu.Lock() defer p.pendingInboundMu.Unlock() @@ -187,14 +187,14 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { // Ready returns a channel that signals when the pooled query behaviour is ready to // perform work. -func (p *PooledQueryBehaviour) Ready() <-chan struct{} { +func (p *QueryBehaviour) Ready() <-chan struct{} { return p.ready } // Perform executes the next available task from the queue of pending events or advances // the query pool. Returns an event containing the result of the work performed and a // true value, or nil and a false value if no event was generated. -func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) { +func (p *QueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) { p.performMu.Lock() defer p.performMu.Unlock() @@ -230,7 +230,7 @@ func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, boo return p.nextPendingOutbound() } -func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) { +func (p *QueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) { if len(p.pendingOutbound) == 0 { return nil, false } @@ -239,7 +239,7 @@ func (p *PooledQueryBehaviour) nextPendingOutbound() (BehaviourEvent, bool) { return ev, true } -func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) { +func (p *QueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) { p.pendingInboundMu.Lock() defer p.pendingInboundMu.Unlock() if len(p.pendingInbound) == 0 { @@ -250,7 +250,7 @@ func (p *PooledQueryBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], b return pev, true } -func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) { +func (p *QueryBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) { ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.perfomNextInbound") defer span.End() pev, ok := p.nextPendingInbound() @@ -343,7 +343,7 @@ func (p *PooledQueryBehaviour) perfomNextInbound(ctx context.Context) (Behaviour return p.advancePool(pev.Ctx, cmd) } -func (p *PooledQueryBehaviour) updateReadyStatus() { +func (p *QueryBehaviour) updateReadyStatus() { if len(p.pendingOutbound) != 0 { select { case p.ready <- struct{}{}: @@ -368,7 +368,7 @@ func (p *PooledQueryBehaviour) updateReadyStatus() { // advancePool advances the query pool state machine and returns an outbound event if // there is work to be performed. Also notifies waiters of query completion or // progress. -func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) { +func (p *QueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) { ctx, span := p.cfg.Tracer.Start(ctx, "PooledQueryBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev))) defer func() { span.SetAttributes(tele.AttrOutEvent(out)) @@ -416,7 +416,7 @@ func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEve return nil, false } -func (p *PooledQueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) { +func (p *QueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) { for _, info := range nodes { p.pendingOutbound = append(p.pendingOutbound, &EventAddNode{ NodeID: info, @@ -424,7 +424,7 @@ func (p *PooledQueryBehaviour) queueAddNodeEvents(nodes []kadt.PeerID) { } } -func (p *PooledQueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) { +func (p *QueryBehaviour) queueNonConnectivityEvent(nid kadt.PeerID) { p.pendingOutbound = append(p.pendingOutbound, &EventNotifyNonConnectivity{ NodeID: nid, }) diff --git a/internal/coord/query_test.go b/internal/coord/query_test.go index 40d285b..232f7e3 100644 --- a/internal/coord/query_test.go +++ b/internal/coord/query_test.go @@ -16,34 +16,34 @@ import ( "github.com/plprobelab/zikade/pb" ) -func TestPooledQueryConfigValidate(t *testing.T) { +func TestQueryConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() require.NoError(t, cfg.Validate()) }) t.Run("clock is not nil", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.Clock = nil require.Error(t, cfg.Validate()) }) t.Run("logger not nil", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.Logger = nil require.Error(t, cfg.Validate()) }) t.Run("tracer not nil", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.Tracer = nil require.Error(t, cfg.Validate()) }) t.Run("query concurrency positive", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.Concurrency = 0 require.Error(t, cfg.Validate()) @@ -52,7 +52,7 @@ func TestPooledQueryConfigValidate(t *testing.T) { }) t.Run("query timeout positive", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.Timeout = 0 require.Error(t, cfg.Validate()) @@ -61,7 +61,7 @@ func TestPooledQueryConfigValidate(t *testing.T) { }) t.Run("request concurrency positive", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.RequestConcurrency = 0 require.Error(t, cfg.Validate()) @@ -70,7 +70,7 @@ func TestPooledQueryConfigValidate(t *testing.T) { }) t.Run("request timeout positive", func(t *testing.T) { - cfg := DefaultPooledQueryConfig() + cfg := DefaultQueryConfig() cfg.RequestTimeout = 0 require.Error(t, cfg.Validate()) @@ -86,7 +86,7 @@ func TestQueryBehaviourBase(t *testing.T) { type QueryBehaviourBaseTestSuite struct { suite.Suite - cfg *PooledQueryConfig + cfg *QueryConfig top *nettest.Topology nodes []*nettest.Peer } @@ -99,7 +99,7 @@ func (ts *QueryBehaviourBaseTestSuite) SetupTest() { ts.top = top ts.nodes = nodes - ts.cfg = DefaultPooledQueryConfig() + ts.cfg = DefaultQueryConfig() ts.cfg.Clock = clk } @@ -111,7 +111,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesNoProgress() { rt := ts.nodes[0].RoutingTable seeds := rt.NearestNodes(target, 5) - b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) + b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) ts.Require().NoError(err) waiter := NewQueryWaiter(5) @@ -158,7 +158,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryProgressed() { rt := ts.nodes[0].RoutingTable seeds := rt.NearestNodes(target, 5) - b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) + b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) ts.Require().NoError(err) waiter := NewQueryWaiter(5) @@ -206,7 +206,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() { rt := ts.nodes[0].RoutingTable seeds := rt.NearestNodes(target, 5) - b, err := NewPooledQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) + b, err := NewQueryBehaviour(ts.nodes[0].NodeID, ts.cfg) ts.Require().NoError(err) waiter := NewQueryWaiter(5) @@ -274,7 +274,7 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() { kadtest.ReadItem[CtxEvent[*EventQueryFinished]](t, ctx, waiter.Finished()) } -func TestPooledQuery_deadlock_regression(t *testing.T) { +func TestQuery_deadlock_regression(t *testing.T) { t.Skip() ctx := kadtest.CtxShort(t) msg := &pb.Message{}