diff --git a/go.mod b/go.mod
index aac956286152..fa5aff4bee6c 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	cloud.google.com/go/secretmanager v1.5.0
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.11
 	github.com/aws/aws-sdk-go v1.44.61
+	github.com/bill-rich/disk-buffer-reader v0.1.2
 	github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c
 	github.com/bitfinexcom/bitfinex-api-go v0.0.0-20210608095005-9e0b26f200fb
 	github.com/bradleyfalzon/ghinstallation/v2 v2.1.0
diff --git a/go.sum b/go.sum
index 4a83526599a3..3cb81a077867 100644
--- a/go.sum
+++ b/go.sum
@@ -100,6 +100,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/aws/aws-sdk-go v1.44.61 h1:NcpLSS3Z0MiVQIYugx4I40vSIEEAXT0baO684ExNRco=
 github.com/aws/aws-sdk-go v1.44.61/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/bill-rich/disk-buffer-reader v0.1.2 h1:pN9K5JoacTcNWp2SCd3n7mPouSwMP9ouTS66Qa+5IPY=
+github.com/bill-rich/disk-buffer-reader v0.1.2/go.mod h1:VVzzsK1Ac2AnpOfp/5r9JlIFaFkZ9uSf7zisZayCt0Y=
 github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c h1:tSME5FDS02qQll3JYodI6RZR/g4EKOHApGv1wMZT+Z0=
 github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c/go.mod h1:+sCc6hztur+oZCLOsNk6wCCy+GLrnSNHSRmTnnL+8iQ=
 github.com/bitfinexcom/bitfinex-api-go v0.0.0-20210608095005-9e0b26f200fb h1:9v7Bzlg+1EBYi2IYcUmOwHReBEfqBbYIj3ZCi9cIe1Q=
diff --git a/pkg/common/chunker.go b/pkg/common/chunker.go
new file mode 100644
index 000000000000..33fc8f0fdd55
--- /dev/null
+++ b/pkg/common/chunker.go
@@ -0,0 +1,36 @@
+package common
+
+import (
+	"bufio"
+	"errors"
+	"io"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	ChunkSize = 10 * 1024
+	PeekSize  = 3 * 1024
+)
+
+func ChunkReader(r io.Reader) chan []byte {
+	chunkChan := make(chan []byte)
+	go func() {
+		defer close(chunkChan)
+		reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize)
+		for {
+			chunk := make([]byte, ChunkSize)
+			n, err := reader.Read(chunk)
+			if err != nil && !errors.Is(err, io.EOF) {
+				log.WithError(err).Error("Error chunking reader.")
+				break
+			}
+			peekData, _ := reader.Peek(PeekSize)
+			chunkChan <- append(chunk[:n], peekData...)
+			if errors.Is(err, io.EOF) {
+				break
+			}
+		}
+	}()
+	return chunkChan
+}
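The new ChunkReader deliberately overlaps its output: each chunk carries up to ChunkSize (10 KiB) of data read plus up to PeekSize (3 KiB) peeked from what follows, so a secret straddling a chunk boundary still appears whole in at least one chunk. Below is a minimal sketch of that guarantee; the github.com/trufflesecurity/trufflehog/v3 import path is an assumption, since the diff does not show go.mod's module line.

package main

import (
	"bytes"
	"fmt"

	// Assumed module path for pkg/common; not shown in this diff.
	"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

func main() {
	// Plant a marker straddling the first ChunkSize boundary.
	data := bytes.Repeat([]byte{'a'}, 2*common.ChunkSize)
	copy(data[common.ChunkSize-4:], "SECRET")

	// The first chunk is 10 KiB of data plus a 3 KiB peek into the
	// second, so the boundary-straddling marker survives in it intact.
	for chunk := range common.ChunkReader(bytes.NewReader(data)) {
		fmt.Println(len(chunk), bytes.Contains(chunk, []byte("SECRET")))
	}
}

One quirk worth knowing: the loop also receives a final zero-length chunk, because ChunkReader sends the result of the EOF read before breaking. The test below counts its EOF iteration the same way, which is why the two chunk counts agree.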
diff --git a/pkg/common/chunker_test.go b/pkg/common/chunker_test.go
new file mode 100644
index 000000000000..01949f325897
--- /dev/null
+++ b/pkg/common/chunker_test.go
@@ -0,0 +1,75 @@
+package common
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"io"
+	"net/http"
+	"testing"
+
+	diskbufferreader "github.com/bill-rich/disk-buffer-reader"
+)
+
+func TestChunker(t *testing.T) {
+	resp, err := http.Get("https://raw.githubusercontent.com/bill-rich/bad-secrets/master/FifteenMB.gz")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	reReader, err := diskbufferreader.New(resp.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer reReader.Close()
+
+	baseChunkCount := 0
+
+	// Count chunks from looping using chunk size.
+	for {
+		baseChunkCount++
+		tmpChunk := make([]byte, ChunkSize)
+		_, err := reReader.Read(tmpChunk)
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			t.Error(err)
+		}
+	}
+	_ = reReader.Reset()
+
+	// Get the first two chunks for comparing later.
+	baseChunkOne := make([]byte, ChunkSize)
+	baseChunkTwo := make([]byte, ChunkSize)
+
+	baseReader := bufio.NewReaderSize(reReader, ChunkSize)
+	_, _ = baseReader.Read(baseChunkOne)
+	peek, _ := baseReader.Peek(PeekSize)
+	baseChunkOne = append(baseChunkOne, peek...)
+	_, _ = baseReader.Read(baseChunkTwo)
+	peek, _ = baseReader.Peek(PeekSize)
+	baseChunkTwo = append(baseChunkTwo, peek...)
+
+	// Reset the reader to the beginning and use ChunkReader.
+	_ = reReader.Reset()
+
+	testChunkCount := 0
+	for chunk := range ChunkReader(reReader) {
+		testChunkCount++
+		switch testChunkCount {
+		case 1:
+			if !bytes.Equal(baseChunkOne, chunk) {
+				t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkOne))
+			}
+		case 2:
+			if !bytes.Equal(baseChunkTwo, chunk) {
+				t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkTwo))
+			}
+		}
+	}
+	if testChunkCount != baseChunkCount {
+		t.Errorf("Wrong number of chunks received. Got %d, expected: %d.", testChunkCount, baseChunkCount)
+	}
+
+}
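The filesystem source below leans on disk-buffer-reader for a read-rewind-read pattern: handlers.HandleFile consumes the reader once (for example, to unpack archives), Reset rewinds it, and the chunker reads it again from the start. Here is a self-contained sketch of that pattern using only the calls the diff itself exercises (New, Reset, Stop, Close); Stop's exact semantics are assumed from its usage here.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"

	diskbufferreader "github.com/bill-rich/disk-buffer-reader"
)

func main() {
	// NopCloser mirrors the closeable readers used in the diff
	// (resp.Body, *os.File), in case New expects one.
	src := io.NopCloser(strings.NewReader("not-a-real-secret"))
	reReader, err := diskbufferreader.New(src)
	if err != nil {
		panic(err)
	}
	defer reReader.Close()

	// First pass drains the reader, as handlers.HandleFile does.
	first, _ := io.ReadAll(reReader)

	// Reset rewinds to the start; Stop then ends buffering to disk once
	// no further rewind is needed (the same order as the diff below).
	if err := reReader.Reset(); err != nil {
		panic(err)
	}
	reReader.Stop()

	second, _ := io.ReadAll(reReader)
	fmt.Println(bytes.Equal(first, second)) // true: both passes see the same bytes
}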
diff --git a/pkg/sources/filesystem/filesystem.go b/pkg/sources/filesystem/filesystem.go
index ce92660f1204..22fbc81e3b9d 100644
--- a/pkg/sources/filesystem/filesystem.go
+++ b/pkg/sources/filesystem/filesystem.go
@@ -1,7 +1,6 @@
 package filesystem
 
 import (
-	"bufio"
 	"context"
 	"fmt"
 	"io"
@@ -9,6 +8,7 @@ import (
 	"os"
 	"path/filepath"
 
+	diskbufferreader "github.com/bill-rich/disk-buffer-reader"
 	"github.com/go-errors/errors"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/protobuf/proto"
@@ -112,6 +112,12 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 	}
 	defer inputFile.Close()
 
+	reReader, err := diskbufferreader.New(inputFile)
+	if err != nil {
+		log.WithError(err).Error("Could not create re-readable reader.")
+	}
+	defer reReader.Close()
+
 	chunkSkel := &sources.Chunk{
 		SourceType: s.Type(),
 		SourceName: s.name,
@@ -125,69 +131,32 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 		},
 		Verify: s.verify,
 	}
-	if handlers.HandleFile(inputFile, chunkSkel, chunksChan) {
+	if handlers.HandleFile(reReader, chunkSkel, chunksChan) {
 		return nil
 	}
 
-	_, err = inputFile.Seek(0, io.SeekStart)
-	if err != nil {
+	if err := reReader.Reset(); err != nil {
 		return err
 	}
-
-	reader := bufio.NewReaderSize(bufio.NewReader(inputFile), BufferSize)
-
-	firstChunk := true
-	for {
-		if done {
-			return nil
-		}
-
-		end := BufferSize
-		buf := make([]byte, BufferSize)
-		n, err := reader.Read(buf)
-
-		if n < BufferSize {
-			end = n
-		}
-
-		if end > 0 {
-			data := buf[0:end]
-
-			if firstChunk {
-				firstChunk = false
-				if common.SkipFile(path, data) {
-					return nil
-				}
-			}
-
-			// We are peeking in case a secret exists in our chunk boundaries,
-			// but we never care if we've run into a peek error.
-			peekData, _ := reader.Peek(PeekSize)
-			chunksChan <- &sources.Chunk{
-				SourceType: s.Type(),
-				SourceName: s.name,
-				SourceID:   s.SourceID(),
-				Data:       append(data, peekData...),
-				SourceMetadata: &source_metadatapb.MetaData{
-					Data: &source_metadatapb.MetaData_Filesystem{
-						Filesystem: &source_metadatapb.Filesystem{
-							File: sanitizer.UTF8(path),
-						},
+	reReader.Stop()
+
+	for chunkData := range common.ChunkReader(reReader) {
+		chunksChan <- &sources.Chunk{
+			SourceType: s.Type(),
+			SourceName: s.name,
+			SourceID:   s.SourceID(),
+			Data:       chunkData,
+			SourceMetadata: &source_metadatapb.MetaData{
+				Data: &source_metadatapb.MetaData_Filesystem{
+					Filesystem: &source_metadatapb.Filesystem{
+						File: sanitizer.UTF8(path),
+					},
 				},
 			},
-			Verify: s.verify,
-		}
-	}
-
-	// io.EOF can be emitted when 0