Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workers #62

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Added workers to bio as a way to process data [#62](https://github.com/Koeng101/dnadesign/pull/62)
- Improved megamash efficiency and added []Match JSON conversion [#61](https://github.com/Koeng101/dnadesign/pull/61)
- Added barcoding functionality for sequencing reads [#59](https://github.com/Koeng101/dnadesign/pull/59)
- Added the megamash algorithm [#50](https://github.com/Koeng101/dnadesign/pull/50)
Expand Down
37 changes: 31 additions & 6 deletions lib/bio/bio.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,25 +302,50 @@ func ManyToChannel[Data DataTypes, Header HeaderTypes](ctx context.Context, chan
return err
}

// WorkerFunc defines the type of function that will be run by each worker.
// It should take a context as its argument for cancellation and coordination.
type WorkerFunc func(ctx context.Context) error

// RunWorkers starts a specified number of workers, each executing the provided WorkerFunc.
// It uses an errgroup.Group to manage the workers and handle errors.
func RunWorkers[Data DataTypes](ctx context.Context, numWorkers int, output chan<- Data, work WorkerFunc) error {
g, ctx := errgroup.WithContext(ctx)

for i := 0; i < numWorkers; i++ {
g.Go(func() error {
return work(ctx)
})
}

// Start a separate goroutine to wait for all workers to finish and close the output channel.
go func() {
_ = g.Wait()
close(output)
}()

// Return immediately, allowing the caller to continue. Note that error handling from workers is asynchronous.
return nil
}

// FilterData is a generic function that implements a channel filter. Users
// give an input and output channel, with a filtering function, and FilterData
// filters data from the input into the output.
func FilterData[Data DataTypes](ctx context.Context, input <-chan Data, output chan<- Data, filter func(a Data) bool) error {
for {
select {
case <-ctx.Done():
close(output)
return ctx.Err()

case data, ok := <-input:
if !ok {
// If the input channel is closed, we close the output channel and return
close(output)
return nil // returning nil as the input channel being closed is a normal completion signal
return nil
}
if filter(data) {
// Only send data through if it passes the filter.
output <- data
select {
case output <- data:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions lib/bio/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ func ExampleFilterData() {
ctx := context.Background()
errorGroup, ctx := errgroup.WithContext(ctx)
errorGroup.Go(func() error {
return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 })
return bio.RunWorkers(ctx, 1, outputChan, func(ctx context.Context) error {
return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 })
})
})

// Send some example Alignments to the input channel
Expand Down Expand Up @@ -466,7 +468,11 @@ $%&$$$$$#')+)+,<>@B?>==<>>;;<<<B??>?@DA@?=>==>??<>??7;<706=>=>CBCCB????@CCBDAGFF
// Filter the right barcode fastqs from channel
barcode := "barcode07"
errorGroup.Go(func() error {
return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode })
// We're going to start multiple workers within this errorGroup. This
// helps when doing computationally intensive operations on channels.
return bio.RunWorkers(ctx, 2, fastqBarcoded, func(ctx context.Context) error {
return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode })
})
})

// Now, check the outputs. We should have sorted only for barcode07
Expand Down
Loading