Skip to content

Commit

Permalink
introduce send chan and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dogancanbakir committed Sep 28, 2023
1 parent 61d734b commit 5deb38f
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 10 deletions.
17 changes: 9 additions & 8 deletions dedupe/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var MaxInMemoryDedupeSize = 100 * 1024 * 1024

type DedupeBackend interface {
// Upsert add/update key to backend/database
Upsert(elem string)
Upsert(elem string) bool
// Execute given callback on each element while iterating
IterCallback(callback func(elem string))
// Cleanup cleans any residuals after deduping
Expand All @@ -16,18 +16,18 @@ type DedupeBackend interface {
// all duplicates if
type Dedupe struct {
receive <-chan string
send chan<- string
backend DedupeBackend
}

// Drains channel and tries to dedupe it
func (d *Dedupe) Drain() {
for {
val, ok := <-d.receive
if !ok {
break
for val := range d.receive {
if unique := d.backend.Upsert(val); unique {
d.send <- val
}
d.backend.Upsert(val)
}
close(d.send)
}

// GetResults iterates over dedupe storage and returns results
Expand All @@ -45,9 +45,10 @@ func (d *Dedupe) GetResults() <-chan string {

// NewDedupe returns a dedupe instance which removes all duplicates
// Note: If byteLen is not correct/specified alterx may consume lot of memory
func NewDedupe(ch <-chan string, byteLen int) *Dedupe {
func NewDedupe(receiveCh <-chan string, sendCh chan<- string, byteLen int) *Dedupe {
d := &Dedupe{
receive: ch,
receive: receiveCh,
send: sendCh,
}
if byteLen <= MaxInMemoryDedupeSize {
d.backend = NewMapBackend()
Expand Down
74 changes: 74 additions & 0 deletions dedupe/dedupe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package dedupe

import (
"testing"
)

func TestDedupe(t *testing.T) {
t.Run("MapBackend", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, 1)

receiveCh <- "test1"
receiveCh <- "test2"
receiveCh <- "test1"
close(receiveCh)

go dedupe.Drain()

results := collectResults(resultCh)

if len(results) != 2 {
t.Fatalf("expected 2 unique items, got %d", len(results))
}
})

t.Run("LevelDBBackend", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, MaxInMemoryDedupeSize+1)

receiveCh <- "testA"
receiveCh <- "testB"
receiveCh <- "testA"
close(receiveCh)

go dedupe.Drain()

results := collectResults(resultCh)

if len(results) != 2 {
t.Fatalf("expected 2 unique items, got %d", len(results))
}
})

t.Run("Drain", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, 1)

go func() {
receiveCh <- "testX"
receiveCh <- "testY"
receiveCh <- "testX"
close(receiveCh)
}()

dedupe.Drain()

results := collectResults(resultCh)

if len(results) != 2 {
t.Fatalf("expected 2 unique items, got %d", len(results))
}
})
}

func collectResults(ch <-chan string) []string {
var results []string
for item := range ch {
results = append(results, item)
}
return results
}
9 changes: 8 additions & 1 deletion dedupe/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ func NewLevelDBBackend() *LevelDBBackend {
return l
}

func (l *LevelDBBackend) Upsert(elem string) {
func (l *LevelDBBackend) Upsert(elem string) bool {
_, exists := l.storage.Get(elem)
if exists {
return false
}

if err := l.storage.Set(elem, nil); err != nil {
gologger.Error().Msgf("dedupe: leveldb: got %v while writing %v", err, elem)
return false
}
return true
}

func (l *LevelDBBackend) IterCallback(callback func(elem string)) {
Expand Down
6 changes: 5 additions & 1 deletion dedupe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ func NewMapBackend() *MapBackend {
return &MapBackend{storage: map[string]struct{}{}}
}

func (m *MapBackend) Upsert(elem string) {
func (m *MapBackend) Upsert(elem string) bool {
if _, exists := m.storage[elem]; exists {
return false
}
m.storage[elem] = struct{}{}
return true
}

func (m *MapBackend) IterCallback(callback func(elem string)) {
Expand Down

0 comments on commit 5deb38f

Please sign in to comment.