Skip to content
124 changes: 59 additions & 65 deletions 11_lesson_live_coding_practise/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,88 @@ package main

import (
"errors"
"fmt"
"sync"
"time"
)

type Batcher struct {
mutex sync.Mutex
messages []string
items []string
size int
timeout time.Duration
mutex sync.Mutex
cond *sync.Cond
counter int
}

size int
action func([]string)
messagesCh chan []string
func NewBatcher(size int, timeout time.Duration) (*Batcher, error) {
if size <= 0 {
return nil, errors.New("invalid argument")
}

closeCh chan struct{}
closeDoneCh chan struct{}
bt := &Batcher{
items: make([]string, 0, size),
size: size,
timeout: timeout,
}

bt.cond = sync.NewCond(&bt.mutex)
go bt.runBatcher()
return bt, nil
}

func NewBatcher(action func([]string), size int) (*Batcher, error) {
if action == nil || size <= 0 {
return nil, errors.New("invalid arguments")
}
func (b *Batcher) runBatcher() {
ticker := time.NewTicker(b.timeout)
defer ticker.Stop()

return &Batcher{
action: action,
size: size,
messagesCh: make(chan []string, 1),
closeCh: make(chan struct{}),
closeDoneCh: make(chan struct{}),
}, nil
for {
b.mutex.Lock()
for len(b.items) < b.size {
b.cond.Wait()
}

batch := b.items[:b.size]
b.items = b.items[b.size:]

<-ticker.C
b.counter++
fmt.Printf("Batch %d: %s\n", b.counter, batch)
b.mutex.Unlock()
}
}

// Append add message to batch
func (b *Batcher) Append(message string) {
func (b *Batcher) append(item string) {
b.mutex.Lock()
defer b.mutex.Unlock()

b.messages = append(b.messages, message)
if b.size == len(b.messages) {
b.messagesCh <- b.messages
b.messages = nil
b.items = append(b.items, item)
if len(b.items) >= b.size {
b.cond.Signal()
}
}

// Run start worker for periodic flushing
func (b *Batcher) Run(interval time.Duration) {
go func() {
ticker := time.NewTicker(interval)
defer func() {
ticker.Stop()
close(b.closeDoneCh)
}()

for {
select {
case <-b.closeCh:
b.flush()
default:
}

select {
case <-b.closeCh:
b.flush()
case messages := <-b.messagesCh:
b.flushMessages(messages)
ticker.Reset(interval)
case <-ticker.C:
b.flush()
}
}
}()
}

func (b *Batcher) flush() {
b.mutex.Lock()
messages := b.messages
b.messages = nil
b.mutex.Unlock()
defer b.mutex.Unlock()

b.flushMessages(messages)
if len(b.items) > 0 {
batch := b.items
b.items = nil
b.counter++
fmt.Printf("Batch %d: %s\n", b.counter, batch)
}
}

func (b *Batcher) flushMessages(messages []string) {
if len(messages) != 0 {
b.action(messages)
func main() {
batcher, err := NewBatcher(4, 2*time.Second)
if err != nil {
fmt.Printf("Error: %v", err)
}

for i := 1; i <= 10; i++ {
batcher.append(fmt.Sprintf("Item %d", i))
time.Sleep(100 * time.Millisecond)
}
}

// Shutdown wait worker and flush buffer before closing
func (b *Batcher) Shutdown() {
close(b.closeCh)
<-b.closeDoneCh
batcher.flush()
}
Loading