diff --git a/dedupe/dedupe.go b/dedupe/dedupe.go index d9420ec..8b8b757 100644 --- a/dedupe/dedupe.go +++ b/dedupe/dedupe.go @@ -5,7 +5,7 @@ var MaxInMemoryDedupeSize = 100 * 1024 * 1024 type DedupeBackend interface { // Upsert add/update key to backend/database - Upsert(elem string) + Upsert(elem string) bool // Execute given callback on each element while iterating IterCallback(callback func(elem string)) // Cleanup cleans any residuals after deduping @@ -16,18 +16,18 @@ type DedupeBackend interface { // all duplicates if type Dedupe struct { receive <-chan string + send chan<- string backend DedupeBackend } // Drains channel and tries to dedupe it func (d *Dedupe) Drain() { - for { - val, ok := <-d.receive - if !ok { - break + for val := range d.receive { + if unique := d.backend.Upsert(val); unique { + d.send <- val } - d.backend.Upsert(val) } + close(d.send) } // GetResults iterates over dedupe storage and returns results @@ -45,9 +45,10 @@ func (d *Dedupe) GetResults() <-chan string { // NewDedupe returns a dedupe instance which removes all duplicates // Note: If byteLen is not correct/specified alterx may consume lot of memory -func NewDedupe(ch <-chan string, byteLen int) *Dedupe { +func NewDedupe(receiveCh <-chan string, sendCh chan<- string, byteLen int) *Dedupe { d := &Dedupe{ - receive: ch, + receive: receiveCh, + send: sendCh, } if byteLen <= MaxInMemoryDedupeSize { d.backend = NewMapBackend() diff --git a/dedupe/dedupe_test.go b/dedupe/dedupe_test.go new file mode 100644 index 0000000..9d747a3 --- /dev/null +++ b/dedupe/dedupe_test.go @@ -0,0 +1,74 @@ +package dedupe + +import ( + "testing" +) + +func TestDedupe(t *testing.T) { + t.Run("MapBackend", func(t *testing.T) { + receiveCh := make(chan string, 10) + resultCh := make(chan string, 10) + dedupe := NewDedupe(receiveCh, resultCh, 1) + + receiveCh <- "test1" + receiveCh <- "test2" + receiveCh <- "test1" + close(receiveCh) + + go dedupe.Drain() + + results := collectResults(resultCh) + + if len(results) != 2 { + t.Fatalf("expected 2 unique items, got %d", len(results)) + } + }) + + t.Run("LevelDBBackend", func(t *testing.T) { + receiveCh := make(chan string, 10) + resultCh := make(chan string, 10) + dedupe := NewDedupe(receiveCh, resultCh, MaxInMemoryDedupeSize+1) + + receiveCh <- "testA" + receiveCh <- "testB" + receiveCh <- "testA" + close(receiveCh) + + go dedupe.Drain() + + results := collectResults(resultCh) + + if len(results) != 2 { + t.Fatalf("expected 2 unique items, got %d", len(results)) + } + }) + + t.Run("Drain", func(t *testing.T) { + receiveCh := make(chan string, 10) + resultCh := make(chan string, 10) + dedupe := NewDedupe(receiveCh, resultCh, 1) + + go func() { + receiveCh <- "testX" + receiveCh <- "testY" + receiveCh <- "testX" + close(receiveCh) + }() + + dedupe.Drain() + + results := collectResults(resultCh) + + if len(results) != 2 { + t.Fatalf("expected 2 unique items, got %d", len(results)) + } + }) +} + +func collectResults(ch <-chan string) []string { + var results []string + for item := range ch { + results = append(results, item) + } + return results +} diff --git a/dedupe/leveldb.go b/dedupe/leveldb.go index 83fd388..cca68f8 100644 --- a/dedupe/leveldb.go +++ b/dedupe/leveldb.go @@ -19,10 +19,17 @@ func NewLevelDBBackend() *LevelDBBackend { return l } -func (l *LevelDBBackend) Upsert(elem string) { +func (l *LevelDBBackend) Upsert(elem string) bool { + _, exists := l.storage.Get(elem) + if exists { + return false + } + if err := l.storage.Set(elem, nil); err != nil { gologger.Error().Msgf("dedupe: leveldb: got %v while writing %v", err, elem) + return false } + return true } func (l *LevelDBBackend) IterCallback(callback func(elem string)) { diff --git a/dedupe/map.go b/dedupe/map.go index 94f8edb..d5a8115 100644 --- a/dedupe/map.go +++ b/dedupe/map.go @@ -10,8 +10,12 @@ func NewMapBackend() *MapBackend { return &MapBackend{storage: map[string]struct{}{}} } -func (m *MapBackend) Upsert(elem string) { +func (m *MapBackend) Upsert(elem string) bool { + if _, exists := m.storage[elem]; exists { + return false + } m.storage[elem] = struct{}{} + return true } func (m *MapBackend) IterCallback(callback func(elem string)) {