fix: avoid deadlocks in query and broadcast behaviours #63

Merged
merged 3 commits into from
Oct 12, 2023
55 changes: 51 additions & 4 deletions internal/coord/behaviour.go
@@ -44,21 +44,23 @@ type WorkQueueFunc[E BehaviourEvent] func(context.Context, E) bool
 // WorkQueueFunc for each work item, passing the original context
 // and event.
 type WorkQueue[E BehaviourEvent] struct {
-	pending chan pendingEvent[E]
+	pending chan CtxEvent[E]
 	fn      WorkQueueFunc[E]
 	done    atomic.Bool
 	once    sync.Once
 }

 func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] {
 	w := &WorkQueue[E]{
-		pending: make(chan pendingEvent[E], 1),
+		pending: make(chan CtxEvent[E], 1),
 		fn:      fn,
 	}
 	return w
 }

-type pendingEvent[E any] struct {
+// CtxEvent holds an event with an associated context which may carry deadlines or
+// tracing information pertinent to the event.
+type CtxEvent[E any] struct {
 	Ctx   context.Context
 	Event E
 }
@@ -89,7 +91,7 @@ func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error {
 	select {
 	case <-ctx.Done(): // this is the context for the work item
 		return ctx.Err()
-	case w.pending <- pendingEvent[E]{
+	case w.pending <- CtxEvent[E]{
 		Ctx:   ctx,
 		Event: cmd,
 	}:
@@ -146,3 +148,48 @@ func (w *Waiter[E]) Close() {
 func (w *Waiter[E]) Chan() <-chan WaiterEvent[E] {
 	return w.pending
 }
+
+// A QueryMonitor receives event notifications on the progress of a query
+type QueryMonitor[E TerminalQueryEvent] interface {
+	// NotifyProgressed returns a channel that can be used to send notification that a
+	// query has made progress. If the notification cannot be sent then it will be
+	// queued and retried at a later time. If the query completes before the progress
+	// notification can be sent the notification will be discarded.
+	NotifyProgressed() chan<- CtxEvent[*EventQueryProgressed]
+
+	// NotifyFinished returns a channel that can be used to send the notification that a
+	// query has completed. It is up to the implementation to ensure that the channel has enough
+	// capacity to receive the single notification.
+	// The sender must close all other QueryNotifier channels before sending on the NotifyFinished channel.
+	// The sender may attempt to drain any pending notifications before closing the other channels.
+	// The NotifyFinished channel will be closed once the sender has attempted to send the Finished notification.
+	NotifyFinished() chan<- CtxEvent[E]
+}
Contributor

Given the current usage of the QueryMonitor I think it would be a nicer API if these were just regular methods that accepted CtxEvent[*EventQueryProgressed] and CtxEvent[E] events. I can only see it used down below as:

func (w *queryNotifier[E]) TryNotifyProgressed(ctx context.Context, ev *EventQueryProgressed) bool {
	if w.stopping {
		return false
	}
	ce := CtxEvent[*EventQueryProgressed]{Ctx: ctx, Event: ev}
	select {
	case w.monitor.NotifyProgressed() <- ce:
		return true
	default:
		w.pending = append(w.pending, ce)
		return false
	}
}

func (w *queryNotifier[E]) NotifyFinished(ctx context.Context, ev E) {
	w.stopping = true
	w.DrainPending()
	close(w.monitor.NotifyProgressed())

	select {
	case w.monitor.NotifyFinished() <- CtxEvent[E]{Ctx: ctx, Event: ev}:
	default:
	}
	close(w.monitor.NotifyFinished())
}

This requires users of the types that implement this interface to deal with quite a few internal details.

Alternative suggestion:

// A QueryMonitor receives event notifications on the progress of a query
type QueryMonitor[E TerminalQueryEvent] interface {
	NotifyProgressed(e CtxEvent[*EventQueryProgressed]) bool // indicating successful notification
	NotifyFinished(e CtxEvent[E])
}

Closing the specific channels could then happen inside the type that implements that interface.

Contributor Author

This approach doesn't give the caller of QueryMonitor, which is a behaviour, any control over the blocking behaviour. A channel allows the behaviour to detect and avoid blocking. A method call could do anything, and it moves the slow consumer problem into the monitor implementation.


+// QueryMonitorHook wraps a [QueryMonitor] interface and provides hooks
+// that are invoked before calls to the QueryMonitor methods are forwarded.
+type QueryMonitorHook[E TerminalQueryEvent] struct {
+	qm               QueryMonitor[E]
+	BeforeProgressed func()
+	BeforeFinished   func()
+}
+
+var _ QueryMonitor[*EventQueryFinished] = (*QueryMonitorHook[*EventQueryFinished])(nil)
+
+func NewQueryMonitorHook[E TerminalQueryEvent](qm QueryMonitor[E]) *QueryMonitorHook[E] {
+	return &QueryMonitorHook[E]{
+		qm:               qm,
+		BeforeProgressed: func() {},
+		BeforeFinished:   func() {},
+	}
+}
+
+func (n *QueryMonitorHook[E]) NotifyProgressed() chan<- CtxEvent[*EventQueryProgressed] {
+	n.BeforeProgressed()
+	return n.qm.NotifyProgressed()
+}
+
+func (n *QueryMonitorHook[E]) NotifyFinished() chan<- CtxEvent[E] {
+	n.BeforeFinished()
+	return n.qm.NotifyFinished()
+}
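
As a rough usage sketch, a test might wrap a monitor with QueryMonitorHook to observe when a behaviour asks for the progress channel. The code below is self-contained and simplified under stated assumptions: the `TerminalQueryEvent` constraint is replaced by `any`, the event types are empty placeholders, and `bufMonitor` and `runHookDemo` are hypothetical names that do not appear in the PR.

```go
package main

import (
	"context"
	"fmt"
)

type CtxEvent[E any] struct {
	Ctx   context.Context
	Event E
}

// Placeholder event types; the real ones carry query details.
type EventQueryProgressed struct{}
type EventQueryFinished struct{}

// QueryMonitor as in the diff, with the constraint relaxed to any (assumption).
type QueryMonitor[E any] interface {
	NotifyProgressed() chan<- CtxEvent[*EventQueryProgressed]
	NotifyFinished() chan<- CtxEvent[E]
}

type QueryMonitorHook[E any] struct {
	qm               QueryMonitor[E]
	BeforeProgressed func()
	BeforeFinished   func()
}

func (n *QueryMonitorHook[E]) NotifyProgressed() chan<- CtxEvent[*EventQueryProgressed] {
	n.BeforeProgressed()
	return n.qm.NotifyProgressed()
}

func (n *QueryMonitorHook[E]) NotifyFinished() chan<- CtxEvent[E] {
	n.BeforeFinished()
	return n.qm.NotifyFinished()
}

// bufMonitor is a hypothetical monitor backed by buffered channels.
type bufMonitor struct {
	progressed chan CtxEvent[*EventQueryProgressed]
	finished   chan CtxEvent[*EventQueryFinished]
}

func (m *bufMonitor) NotifyProgressed() chan<- CtxEvent[*EventQueryProgressed] { return m.progressed }
func (m *bufMonitor) NotifyFinished() chan<- CtxEvent[*EventQueryFinished]     { return m.finished }

// runHookDemo attempts the given number of non-blocking progress sends and
// returns how often the channel was requested and how many sends succeeded.
func runHookDemo(sends int) (requested, delivered int) {
	m := &bufMonitor{
		progressed: make(chan CtxEvent[*EventQueryProgressed], 1),
		finished:   make(chan CtxEvent[*EventQueryFinished], 1),
	}
	hook := &QueryMonitorHook[*EventQueryFinished]{
		qm:               m,
		BeforeProgressed: func() { requested++ },
		BeforeFinished:   func() {},
	}
	for i := 0; i < sends; i++ {
		ev := CtxEvent[*EventQueryProgressed]{Ctx: context.Background(), Event: &EventQueryProgressed{}}
		select {
		case hook.NotifyProgressed() <- ev:
			delivered++
		default: // buffer full, nothing is draining it in this demo
		}
	}
	return requested, delivered
}

func main() {
	fmt.Println(runHookDemo(3)) // 3 1
}
```

Note that the hook fires when the channel is requested, not when a send succeeds, so in this sketch it counts three requests even though only one event fits in the buffer.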