Skip to content

Commit

Permalink
ref(boost): deprecate existing worker pool (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed May 27, 2024
1 parent e34b5dc commit c430fa8
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 34 deletions.
File renamed without changes.
6 changes: 3 additions & 3 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type (
finishedStreamR = <-chan *workerFinishedResult
finishedStreamW = chan<- *workerFinishedResult

workerWrapper[I any, O any] struct {
core *worker[I, O]
workerWrapperL[I any, O any] struct {
core *workerL[I, O]
}

workersCollection[I, O any] map[workerID]*workerWrapper[I, O]
workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]
)
6 changes: 3 additions & 3 deletions boost/worker.go → boost/worker-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"
)

type worker[I any, O any] struct {
type workerL[I any, O any] struct {
id workerID
exec ExecutiveFunc[I, O]
jobsChIn JobStreamR[I]
Expand All @@ -17,7 +17,7 @@ type worker[I any, O any] struct {
logger *slog.Logger
}

func (w *worker[I, O]) run(parentContext context.Context,
func (w *workerL[I, O]) run(parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func (w *worker[I, O]) run(parentContext context.Context,
}
}

func (w *worker[I, O]) invoke(parentContext context.Context,
func (w *workerL[I, O]) invoke(parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
job Job[I],
Expand Down
8 changes: 4 additions & 4 deletions boost/worker-pool.go → boost/worker-pool-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// than for passing it to another method). This is an experimental convention that
// is being established for all snivilised projects.
type privateWpInfo[I, O any] struct {
pool workersCollection[I, O]
pool workersCollectionL[I, O]
workersJobsCh chan Job[I]
finishedCh finishedStream
cancelCh CancelStream
Expand Down Expand Up @@ -88,7 +88,7 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O

wp := &WorkerPool[I, O]{
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
pool: make(workersCollectionL[I, O], noWorkers),
workersJobsCh: make(JobStream[I], noWorkers),
finishedCh: make(finishedStream, noWorkers),
cancelCh: params.CancelCh,
Expand Down Expand Up @@ -238,8 +238,8 @@ func (p *WorkerPool[I, O]) spawn(
outputsChOut JobOutputStreamW[O],
finishedChOut finishedStreamW,
) {
w := &workerWrapper[I, O]{
core: &worker[I, O]{
w := &workerWrapperL[I, O]{
core: &workerL[I, O]{
id: p.composeID(),
exec: p.exec,
jobsChIn: jobsChIn,
Expand Down
14 changes: 7 additions & 7 deletions boost/worker-pool_test.go → boost/worker-pool-legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ type pipeline[I, O any] struct {
wgan boost.WaitGroupAn
sequence int
outputsDup *boost.Duplex[boost.JobOutput[O]]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, O]
provider helpers.ProviderFuncL[I]
producer *helpers.ProducerL[I, O]
pool *boost.WorkerPool[I, O]
consumer *helpers.Consumer[O]
consumer *helpers.ConsumerL[O]
cancel TerminatorFunc[I, O]
stop TerminatorFunc[I, O]
}
Expand All @@ -133,14 +133,14 @@ func start[I, O any](outputsDupCh *boost.Duplex[boost.JobOutput[O]]) *pipeline[I

func (p *pipeline[I, O]) produce(parentContext context.Context,
interval time.Duration,
provider helpers.ProviderFunc[I],
provider helpers.ProviderFuncL[I],
verbose bool,
) {
p.cancel = func(_ context.Context,
parentCancel context.CancelFunc,
delay time.Duration,
) {
go helpers.CancelProducerAfter[I, O](
go helpers.CancelProducerAfterL[I, O](
delay,
parentCancel,
verbose,
Expand All @@ -158,7 +158,7 @@ func (p *pipeline[I, O]) produce(parentContext context.Context,
)
}

p.producer = helpers.StartProducer[I, O](
p.producer = helpers.StartProducerL[I, O](
parentContext,
p.wgan,
JobChSize,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (p *pipeline[I, O]) process(parentContext context.Context,
func (p *pipeline[I, O]) consume(parentContext context.Context,
interval time.Duration, verbose bool,
) {
p.consumer = helpers.StartConsumer(parentContext,
p.consumer = helpers.StartConsumerL(parentContext,
p.wgan,
p.outputsDup.ReaderCh,
interval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/snivilised/lorax/boost"
)

type Consumer[O any] struct {
type ConsumerL[O any] struct {
quitter boost.AnnotatedWgQuitter
RoutineName boost.GoRoutineName
interval time.Duration
Expand All @@ -17,14 +17,14 @@ type Consumer[O any] struct {
verbose bool
}

func StartConsumer[O any](
func StartConsumerL[O any](
parentContext context.Context,
quitter boost.AnnotatedWgQuitter,
outputsChIn boost.JobOutputStreamR[O],
interval time.Duration,
verbose bool,
) *Consumer[O] {
consumer := &Consumer[O]{
) *ConsumerL[O] {
consumer := &ConsumerL[O]{
quitter: quitter,
RoutineName: boost.GoRoutineName("💠 consumer"),
interval: interval,
Expand All @@ -37,7 +37,7 @@ func StartConsumer[O any](
return consumer
}

func (c *Consumer[O]) run(parentContext context.Context) {
func (c *ConsumerL[O]) run(parentContext context.Context) {
defer func() {
c.quitter.Done(c.RoutineName)
if c.verbose {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
type termination string
type terminationDuplex *boost.Duplex[termination]

type ProviderFunc[I any] func() I
type ProviderFuncL[I any] func() I

type Producer[I, O any] struct {
type ProducerL[I, O any] struct {
quitter boost.AnnotatedWgQuitter
RoutineName boost.GoRoutineName
sequenceNo int
provider ProviderFunc[I]
provider ProviderFuncL[I]
interval time.Duration
terminateDup terminationDuplex
JobsCh boost.JobStream[I]
Expand All @@ -29,19 +29,19 @@ type Producer[I, O any] struct {
// The producer owns the Jobs channel as it knows when to close it. This producer is
// a fake producer and exposes a stop method that the client go routine can call to
// indicate end of the work load.
func StartProducer[I, O any](
func StartProducerL[I, O any](
parentContext context.Context,
quitter boost.AnnotatedWgQuitter,
capacity int,
provider ProviderFunc[I],
provider ProviderFuncL[I],
interval time.Duration,
verbose bool,
) *Producer[I, O] {
) *ProducerL[I, O] {
if interval == 0 {
panic(fmt.Sprintf("Invalid delay requested: '%v'", interval))
}

producer := Producer[I, O]{
producer := ProducerL[I, O]{
quitter: quitter,
RoutineName: boost.GoRoutineName("✨ producer"),
provider: provider,
Expand All @@ -56,7 +56,7 @@ func StartProducer[I, O any](
return &producer
}

func (p *Producer[I, O]) run(parentContext context.Context) {
func (p *ProducerL[I, O]) run(parentContext context.Context) {
defer func() {
close(p.JobsCh)
p.quitter.Done(p.RoutineName)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (p *Producer[I, O]) run(parentContext context.Context) {
}
}

func (p *Producer[I, O]) item(parentContext context.Context) bool {
func (p *ProducerL[I, O]) item(parentContext context.Context) bool {
p.sequenceNo++
p.Count++

Expand Down Expand Up @@ -135,7 +135,7 @@ func (p *Producer[I, O]) item(parentContext context.Context) bool {
return result
}

func (p *Producer[I, O]) Stop() {
func (p *ProducerL[I, O]) Stop() {
if p.verbose {
fmt.Println(">>>> 🧲 producer terminating ...")
}
Expand All @@ -147,7 +147,7 @@ func (p *Producer[I, O]) Stop() {
// StopProducerAfter, run in a new go routine
func StopProducerAfter[I, O any](
parentContext context.Context,
producer *Producer[I, O],
producer *ProducerL[I, O],
delay time.Duration,
verbose bool,
) {
Expand All @@ -167,7 +167,7 @@ func StopProducerAfter[I, O any](
}
}

func CancelProducerAfter[I, O any](
func CancelProducerAfterL[I, O any](
delay time.Duration,
parentCancel context.CancelFunc,
verbose bool,
Expand Down

0 comments on commit c430fa8

Please sign in to comment.