Skip to content

Commit

Permalink
make variadic
Browse files Browse the repository at this point in the history
  • Loading branch information
dogancanbakir committed Oct 23, 2023
1 parent 5deb38f commit 9bb3978
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
23 changes: 16 additions & 7 deletions dedupe/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@ type DedupeBackend interface {
// all duplicates if
type Dedupe struct {
receive <-chan string
send chan<- string
backend DedupeBackend
}

// Option is a type for variadic options in Drain
type Option func(val string)

// WithUnique is an option to send unique values to the provided channel
func WithUnique(ch chan<- string) Option {
return func(val string) {
ch <- val
}
}

// Drains channel and tries to dedupe it
func (d *Dedupe) Drain() {
func (d *Dedupe) Drain(opts ...Option) {
for val := range d.receive {
if unique := d.backend.Upsert(val); unique {
d.send <- val
for _, opt := range opts {
opt(val)
}
}
}
close(d.send)
}

// GetResults iterates over dedupe storage and returns results
Expand All @@ -45,10 +55,9 @@ func (d *Dedupe) GetResults() <-chan string {

// NewDedupe returns a dedupe instance which removes all duplicates
// Note: If byteLen is not correct/specified alterx may consume lot of memory
func NewDedupe(receiveCh <-chan string, sendCh chan<- string, byteLen int) *Dedupe {
func NewDedupe(ch <-chan string, byteLen int) *Dedupe {
d := &Dedupe{
receive: receiveCh,
send: sendCh,
receive: ch,
}
if byteLen <= MaxInMemoryDedupeSize {
d.backend = NewMapBackend()
Expand Down
31 changes: 16 additions & 15 deletions dedupe/dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
func TestDedupe(t *testing.T) {
t.Run("MapBackend", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, 1)
dedupe := NewDedupe(receiveCh, 1)

receiveCh <- "test1"
receiveCh <- "test2"
receiveCh <- "test1"
close(receiveCh)

go dedupe.Drain()
resultCh := make(chan string, 10)
dedupe.Drain(WithUnique(resultCh))
close(resultCh)

results := collectResults(resultCh)

Expand All @@ -26,15 +27,16 @@ func TestDedupe(t *testing.T) {

t.Run("LevelDBBackend", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, MaxInMemoryDedupeSize+1)
dedupe := NewDedupe(receiveCh, MaxInMemoryDedupeSize+1)

receiveCh <- "testA"
receiveCh <- "testB"
receiveCh <- "testA"
close(receiveCh)

go dedupe.Drain()
resultCh := make(chan string, 10)
dedupe.Drain(WithUnique(resultCh))
close(resultCh)

results := collectResults(resultCh)

Expand All @@ -45,17 +47,16 @@ func TestDedupe(t *testing.T) {

t.Run("Drain", func(t *testing.T) {
receiveCh := make(chan string, 10)
resultCh := make(chan string, 10)
dedupe := NewDedupe(receiveCh, resultCh, 1)
dedupe := NewDedupe(receiveCh, 1)

go func() {
receiveCh <- "testX"
receiveCh <- "testY"
receiveCh <- "testX"
close(receiveCh)
}()
receiveCh <- "testX"
receiveCh <- "testY"
receiveCh <- "testX"
close(receiveCh)

dedupe.Drain()
resultCh := make(chan string, 10)
dedupe.Drain(WithUnique(resultCh))
close(resultCh)

results := collectResults(resultCh)

Expand Down

0 comments on commit 9bb3978

Please sign in to comment.