This package provides a buffered slice that is flushed when it reaches a size limit or a duration limit, whichever comes first.
Example: You have a worker for RabbitMQ that receives jobs from a queue. You receive them one by one and process them. But sometimes you need to accumulate data from several jobs for batch processing in a database.
go get -u github.com/rb-pkg/buflice
package main
import (
"log"
"sync"
"time"
"github.com/rb-pkg/buflice"
)
// Book is the sample payload type buffered and flushed in this example.
type Book struct {
	Author string
}
func flushProcessor(chFlush chan []interface{}, chDone chan struct{}, wait *sync.WaitGroup) {
for {
select {
case data := <-chFlush:
wait.Add(1)
log.Printf("%+v", data)
wait.Done()
case <-chDone:
log.Println("Finished flushProcessor")
return
}
}
}
// main demonstrates buflice: added items are flushed to chFlush either when
// the buffer holds 10 entries or when 1000ms elapse, whichever happens first.
func main() {
	chFlush := make(chan []interface{})
	chDone := make(chan struct{})
	var wait sync.WaitGroup

	bfl := buflice.NewBuflice(10, 1000*time.Millisecond, chFlush)
	go flushProcessor(chFlush, chDone, &wait)
	bfl.Start()

	// First batch: fewer than 10 items, so the 1000ms duration limit
	// triggers the flush while we sleep below.
	for _, author := range []string{"Author #1", "Author #2", "Author #3", "Author #4", "Author #5"} {
		bfl.Add(Book{Author: author})
	}
	time.Sleep(1111 * time.Millisecond)

	// Second batch: flushed when Close is called.
	for _, author := range []string{"Author #6", "Author #7", "Author #8", "Author #9", "Author #10"} {
		bfl.Add(Book{Author: author})
	}
	if err := bfl.Close(); err != nil {
		log.Fatalln(err)
	}

	wait.Wait()
	chDone <- struct{}{}
}
Will print:
2019/09/03 14:56:28 [{Author:Author #1} {Author:Author #2} {Author:Author #3} {Author:Author #4} {Author:Author #5}]
2019/09/03 14:56:29 [{Author:Author #6} {Author:Author #7} {Author:Author #8} {Author:Author #9} {Author:Author #10}]
2019/09/03 14:56:28 Finished flushProcessor
Thanks to:
- Everyone that gave this repo a star ⭐ - you keep me motivated 🙂
- Contributors that submitted useful pull-requests or opened good issues with suggestions or a detailed bug report.