Skip to content

Commit

Permalink
taskgroup: add sketch of Runner type
Browse files Browse the repository at this point in the history
This is basically a Collector that can be bolted to a Group
or to a start function extracted from a Group using a Throttle.
  • Loading branch information
creachadair committed Oct 6, 2024
1 parent f9ec917 commit 82ce343
Showing 1 changed file with 66 additions and 0 deletions.
66 changes: 66 additions & 0 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,69 @@ func (c *Collector[T]) Report(f func(report func(T)) error) Task {
func (c *Collector[T]) Run(f func() T) Task {
return NoError(func() { c.report(f()) })
}

// A Runner manages a group of [Task] functions that report values.
// At least the Go field must be populated. A Runner must not be copied, nor
// its exported fields modified, after its first use.
type Runner[T any] struct {
run func(Task) // start the task in a goroutine

μ sync.Mutex
collect func(T) // handle values reported by tasks
}

// NewRunner creates a new empty runner that uses run to execute tasks
// returning values of type T. Values reported by successful tasks are passed
// to collect. The initial value of collect may be nil.
// If run == nil, NewRunner will panic.
func NewRunner[T any](run func(Task), collect func(T)) *Runner[T] {
if run == nil {
panic("run function is nil")
}
return &Runner[T]{run: run, collect: collect}
}

// Collect sets the value collector for r. If report == nil, values reported by
// tasks are discarded. Otherwise, each value reported by a successful task
// (after error filtering) is passed to report.
//
// Calls to report are synchronized so that it is safe for report to manipulate
// local data structures without additional locking. It is safe to call Collect
// while tasks are active in r.
func (r *Runner[T]) Collect(report func(T)) *Runner[T] {
r.μ.Lock()
defer r.μ.Unlock()
r.collect = report
return r
}

// report delivers v to the Collect callback, with the lock held, or if no
// callback is set it discards v.
func (r *Runner[T]) report(v T) {
r.μ.Lock()
defer r.μ.Unlock()
if r.collect != nil {
r.collect(v)
}
}

// Call runs f in r.
func (r *Runner[T]) Call(f func() (T, error)) {
r.run(func() error {
v, err := f()
if err == nil {
r.report(v)
}
return err
})
}

// Run runs f in r.
func (r *Runner[T]) Run(f func() T) {
r.run(func() error { r.report(f()); return nil })
}

// Report runs f in r.
func (r *Runner[T]) Report(f func(report func(T)) error) {
r.run(func() error { return f(r.report) })
}

0 comments on commit 82ce343

Please sign in to comment.