-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a78f808
commit 61d734b
Showing
5 changed files
with
176 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package dedupe | ||
|
||
// MaxInMemoryDedupeSize is the largest byte length for which NewDedupe still
// selects the in-memory map backend. It defaults to 100 MB (100 << 20 bytes).
var MaxInMemoryDedupeSize = 100 << 20
|
||
// DedupeBackend is the storage abstraction Dedupe writes elements into and
// later reads them back from.
type DedupeBackend interface {
	// Upsert adds elem to the backend, or updates it when already present.
	Upsert(elem string)
	// IterCallback executes callback on each element while iterating over
	// the backend.
	IterCallback(callback func(elem string))
	// Cleanup cleans any residuals left after deduping.
	Cleanup()
}

// Dedupe is a string deduplication type. It drains strings from a channel
// into a DedupeBackend and afterwards streams the backend's elements back
// out. Duplicate removal relies on the backend treating a repeated Upsert
// of the same element as an update rather than a new entry.
type Dedupe struct {
	receive <-chan string
	backend DedupeBackend
}

// Drain reads from the receive channel until it is closed and passes every
// value to the backend via Upsert. It returns once the channel is closed.
// A nil receive channel is treated as already drained.
func (d *Dedupe) Drain() {
	// A nil channel can never be closed, so ranging over it would block
	// forever; bail out instead of hanging the caller.
	if d.receive == nil {
		return
	}
	for val := range d.receive {
		d.backend.Upsert(val)
	}
}

// GetResults iterates over the dedupe storage and returns its elements on a
// buffered channel (capacity 100). The iteration runs in its own goroutine;
// once it finishes, the backend's Cleanup is invoked and the channel is then
// closed, so callers can simply range over the result.
func (d *Dedupe) GetResults() <-chan string {
	send := make(chan string, 100)
	go func() {
		// Runs last: consumers observe the close only after Cleanup is done.
		defer close(send)
		d.backend.IterCallback(func(elem string) {
			send <- elem
		})
		d.backend.Cleanup()
	}()
	return send
}
|
||
// NewDedupe returns a dedupe instance which removes all duplicates | ||
// Note: If byteLen is not correct/specified alterx may consume lot of memory | ||
func NewDedupe(ch <-chan string, byteLen int) *Dedupe { | ||
d := &Dedupe{ | ||
receive: ch, | ||
} | ||
if byteLen <= MaxInMemoryDedupeSize { | ||
d.backend = NewMapBackend() | ||
} else { | ||
// gologger print a info message here | ||
d.backend = NewLevelDBBackend() | ||
} | ||
return d | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package dedupe | ||
|
||
import ( | ||
"github.com/projectdiscovery/gologger" | ||
"github.com/projectdiscovery/hmap/store/hybrid" | ||
) | ||
|
||
type LevelDBBackend struct { | ||
storage *hybrid.HybridMap | ||
} | ||
|
||
func NewLevelDBBackend() *LevelDBBackend { | ||
l := &LevelDBBackend{} | ||
db, err := hybrid.New(hybrid.DefaultDiskOptions) | ||
if err != nil { | ||
gologger.Fatal().Msgf("failed to create temp dir for alterx dedupe got: %v", err) | ||
} | ||
l.storage = db | ||
return l | ||
} | ||
|
||
func (l *LevelDBBackend) Upsert(elem string) { | ||
if err := l.storage.Set(elem, nil); err != nil { | ||
gologger.Error().Msgf("dedupe: leveldb: got %v while writing %v", err, elem) | ||
} | ||
} | ||
|
||
func (l *LevelDBBackend) IterCallback(callback func(elem string)) { | ||
l.storage.Scan(func(k, _ []byte) error { | ||
callback(string(k)) | ||
return nil | ||
}) | ||
} | ||
|
||
func (l *LevelDBBackend) Cleanup() { | ||
_ = l.storage.Close() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package dedupe | ||
|
||
import "runtime/debug" | ||
|
||
// MapBackend is an in-memory DedupeBackend that keeps elements as keys of a
// Go map, so upserting the same string more than once stores it only once.
type MapBackend struct {
	storage map[string]struct{}
}

// NewMapBackend returns a MapBackend with an empty, ready-to-use map.
func NewMapBackend() *MapBackend {
	return &MapBackend{storage: map[string]struct{}{}}
}

// Upsert records elem in the set. The map is allocated on demand so that a
// zero-value MapBackend, or one that has already been through Cleanup, does
// not panic on a write to a nil map.
func (m *MapBackend) Upsert(elem string) {
	if m.storage == nil {
		m.storage = make(map[string]struct{})
	}
	m.storage[elem] = struct{}{}
}

// IterCallback invokes callback once for every stored element. The order is
// Go's map iteration order and is therefore unspecified.
func (m *MapBackend) IterCallback(callback func(elem string)) {
	for k := range m.storage {
		callback(k)
	}
}

// Cleanup drops the stored elements and asks the runtime to hand the freed
// memory back to the operating system.
func (m *MapBackend) Cleanup() {
	m.storage = nil
	// By default the GC does not release buffered/allocated memory right
	// away, since there is always a possibility of needing it again
	// immediately, and it releases memory in chunks.
	// debug.FreeOSMemory forces the GC to release allocated memory at once.
	debug.FreeOSMemory()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters