From d393dded76e3988390e8e148649f6b4ff84b4b99 Mon Sep 17 00:00:00 2001 From: Keoni Gandall Date: Tue, 13 Feb 2024 12:56:08 -0800 Subject: [PATCH 1/2] Add workers --- lib/bio/bio.go | 37 +++++++++++++++++++++++++++++++------ lib/bio/example_test.go | 10 ++++++++-- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/lib/bio/bio.go b/lib/bio/bio.go index bbe6369..09eb0ff 100644 --- a/lib/bio/bio.go +++ b/lib/bio/bio.go @@ -302,6 +302,31 @@ func ManyToChannel[Data DataTypes, Header HeaderTypes](ctx context.Context, chan return err } +// WorkerFunc defines the type of function that will be run by each worker. +// It should take a context as its argument for cancellation and coordination. +type WorkerFunc func(ctx context.Context) error + +// RunWorkers starts a specified number of workers, each executing the provided WorkerFunc. +// It uses an errgroup.Group to manage the workers and handle errors. +func RunWorkers[Data DataTypes](ctx context.Context, numWorkers int, output chan<- Data, work WorkerFunc) error { + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + return work(ctx) + }) + } + + // Start a separate goroutine to wait for all workers to finish and close the output channel. + go func() { + _ = g.Wait() + close(output) + }() + + // Return immediately, allowing the caller to continue. Note that error handling from workers is asynchronous. + return nil +} + // FilterData is a generic function that implements a channel filter. Users // give an input and output channel, with a filtering function, and FilterData // filters data from the input into the output. @@ -309,18 +334,18 @@ func FilterData[Data DataTypes](ctx context.Context, input <-chan Data, output c for { select { case <-ctx.Done(): - close(output) return ctx.Err() case data, ok := <-input: if !ok { - // If the input channel is closed, we close the output channel and return - close(output) - return nil // returning nil as the input channel being closed is a normal completion signal + return nil } if filter(data) { - // Only send data through if it passes the filter. - output <- data + select { + case output <- data: + case <-ctx.Done(): + return ctx.Err() + } } } } diff --git a/lib/bio/example_test.go b/lib/bio/example_test.go index ce5955f..3f7e471 100644 --- a/lib/bio/example_test.go +++ b/lib/bio/example_test.go @@ -412,7 +412,9 @@ func ExampleFilterData() { ctx := context.Background() errorGroup, ctx := errgroup.WithContext(ctx) errorGroup.Go(func() error { - return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 }) + return bio.RunWorkers(ctx, 1, outputChan, func(ctx context.Context) error { + return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 }) + }) }) // Send some example Alignments to the input channel @@ -466,7 +468,11 @@ $%&$$$$$#')+)+,<>@B?>==<>>;;<<?@DA@?=>==>??<>??7;<706=>=>CBCCB????@CCBDAGFF // Filter the right barcode fastqs from channel barcode := "barcode07" errorGroup.Go(func() error { - return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode }) + // We're going to start multiple workers within this errorGroup. This + // helps when doing computationally intensive operations on channels. + return bio.RunWorkers(ctx, 2, fastqBarcoded, func(ctx context.Context) error { + return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode }) + }) }) // Now, check the outputs. We should have sorted only for barcode07 From e8c29f40edb9f0ec840c2aafe1bd482036faf23f Mon Sep 17 00:00:00 2001 From: Keoni Gandall Date: Tue, 13 Feb 2024 12:57:56 -0800 Subject: [PATCH 2/2] Add to changelog --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index cd7ed8f..2c24abc 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Added workers to bio as a way to process data [#62](https://github.com/Koeng101/dnadesign/pull/62) - Improved megamash efficiency and added []Match JSON conversion [#61](https://github.com/Koeng101/dnadesign/pull/61) - Added barcoding functionality for sequencing reads [#59](https://github.com/Koeng101/dnadesign/pull/59) - Added the megamash algorithm [#50](https://github.com/Koeng101/dnadesign/pull/50)