Skip to content

Commit

Permalink
Use re-readable reader and common chunker (trufflesecurity#703)
Browse files Browse the repository at this point in the history
* Use re-readable reader and common chunker

* Linter feedback

* Break on error
  • Loading branch information
bill-rich authored Aug 10, 2022
1 parent dcc102a commit a473b9a
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 55 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
cloud.google.com/go/secretmanager v1.5.0
github.com/Azure/go-autorest/autorest/azure/auth v0.5.11
github.com/aws/aws-sdk-go v1.44.61
github.com/bill-rich/disk-buffer-reader v0.1.2
github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c
github.com/bitfinexcom/bitfinex-api-go v0.0.0-20210608095005-9e0b26f200fb
github.com/bradleyfalzon/ghinstallation/v2 v2.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.44.61 h1:NcpLSS3Z0MiVQIYugx4I40vSIEEAXT0baO684ExNRco=
github.com/aws/aws-sdk-go v1.44.61/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/bill-rich/disk-buffer-reader v0.1.2 h1:pN9K5JoacTcNWp2SCd3n7mPouSwMP9ouTS66Qa+5IPY=
github.com/bill-rich/disk-buffer-reader v0.1.2/go.mod h1:VVzzsK1Ac2AnpOfp/5r9JlIFaFkZ9uSf7zisZayCt0Y=
github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c h1:tSME5FDS02qQll3JYodI6RZR/g4EKOHApGv1wMZT+Z0=
github.com/bill-rich/go-syslog v0.0.0-20220413021637-49edb52a574c/go.mod h1:+sCc6hztur+oZCLOsNk6wCCy+GLrnSNHSRmTnnL+8iQ=
github.com/bitfinexcom/bitfinex-api-go v0.0.0-20210608095005-9e0b26f200fb h1:9v7Bzlg+1EBYi2IYcUmOwHReBEfqBbYIj3ZCi9cIe1Q=
Expand Down
36 changes: 36 additions & 0 deletions pkg/common/chunker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package common

import (
"bufio"
"errors"
"io"

log "github.com/sirupsen/logrus"
)

const (
	// ChunkSize is the maximum number of new bytes read from the source
	// per chunk emitted by ChunkReader.
	ChunkSize = 10 * 1024
	// PeekSize is the number of overlap bytes peeked (not consumed) and
	// appended to each chunk, so a secret spanning a chunk boundary is
	// still fully contained in at least one chunk.
	PeekSize = 3 * 1024
)

// ChunkReader reads r and sends its contents on the returned channel in
// chunks of up to ChunkSize bytes. Each chunk additionally carries up to
// PeekSize bytes of the data that follows it (peeked, not consumed), so the
// overlap is re-emitted at the start of the next chunk and secrets crossing
// a chunk boundary are not split. The channel is closed when the reader is
// exhausted or a non-EOF read error occurs.
//
// NOTE(review): a zero-length read at EOF still sends one final chunk that
// contains no new data; consumers should tolerate short/empty chunks.
func ChunkReader(r io.Reader) chan []byte {
	chunkChan := make(chan []byte)
	go func() {
		defer close(chunkChan)
		// A single buffered reader sized to ChunkSize is sufficient;
		// the previous extra bufio.NewReader wrapper was redundant.
		reader := bufio.NewReaderSize(r, ChunkSize)
		for {
			chunk := make([]byte, ChunkSize)
			n, err := reader.Read(chunk)
			if err != nil && !errors.Is(err, io.EOF) {
				log.WithError(err).Error("Error chunking reader.")
				break
			}
			// Peek errors (e.g. EOF near the end of the stream) are
			// intentionally ignored; a short or empty overlap is fine.
			peekData, _ := reader.Peek(PeekSize)
			chunkChan <- append(chunk[:n], peekData...)
			if errors.Is(err, io.EOF) {
				break
			}
		}
	}()
	return chunkChan
}
75 changes: 75 additions & 0 deletions pkg/common/chunker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package common

import (
"bufio"
"bytes"
"errors"
"io"
"net/http"
"testing"

diskbufferreader "github.com/bill-rich/disk-buffer-reader"
)

// TestChunker checks that ChunkReader yields the same number of chunks as
// manually draining the reader ChunkSize bytes at a time, and that its first
// two chunks match a hand-built ChunkSize+PeekSize reference.
func TestChunker(t *testing.T) {
	resp, err := http.Get("https://raw.githubusercontent.com/bill-rich/bad-secrets/master/FifteenMB.gz")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	reReader, err := diskbufferreader.New(resp.Body)
	if err != nil {
		t.Fatal(err)
	}
	defer reReader.Close()

	// Pass 1: count how many ChunkSize reads it takes to drain the data.
	wantChunks := 0
	for {
		wantChunks++
		scratch := make([]byte, ChunkSize)
		if _, err := reReader.Read(scratch); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			t.Error(err)
		}
	}
	_ = reReader.Reset()

	// Pass 2: build reference copies of the first two chunks, each with its
	// PeekSize overlap appended, mirroring what ChunkReader produces.
	expectedOne := make([]byte, ChunkSize)
	expectedTwo := make([]byte, ChunkSize)

	buffered := bufio.NewReaderSize(reReader, ChunkSize)
	_, _ = buffered.Read(expectedOne)
	overlap, _ := buffered.Peek(PeekSize)
	expectedOne = append(expectedOne, overlap...)
	_, _ = buffered.Read(expectedTwo)
	overlap, _ = buffered.Peek(PeekSize)
	expectedTwo = append(expectedTwo, overlap...)

	// Pass 3: rewind and compare ChunkReader's output against the reference.
	_ = reReader.Reset()

	gotChunks := 0
	for chunk := range ChunkReader(reReader) {
		gotChunks++
		if gotChunks == 1 && !bytes.Equal(expectedOne, chunk) {
			t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(expectedOne))
		}
		if gotChunks == 2 && !bytes.Equal(expectedTwo, chunk) {
			t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(expectedTwo))
		}
	}
	if gotChunks != wantChunks {
		t.Errorf("Wrong number of chunks received. Got %d, expected: %d.", gotChunks, wantChunks)
	}

}
79 changes: 24 additions & 55 deletions pkg/sources/filesystem/filesystem.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package filesystem

import (
"bufio"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"

diskbufferreader "github.com/bill-rich/disk-buffer-reader"
"github.com/go-errors/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -112,6 +112,12 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
}
defer inputFile.Close()

reReader, err := diskbufferreader.New(inputFile)
if err != nil {
log.WithError(err).Error("Could not create re-readable reader.")
}
defer reReader.Close()

chunkSkel := &sources.Chunk{
SourceType: s.Type(),
SourceName: s.name,
Expand All @@ -125,69 +131,32 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
},
Verify: s.verify,
}
if handlers.HandleFile(inputFile, chunkSkel, chunksChan) {
if handlers.HandleFile(reReader, chunkSkel, chunksChan) {
return nil
}

_, err = inputFile.Seek(0, io.SeekStart)
if err != nil {
if err := reReader.Reset(); err != nil {
return err
}

reader := bufio.NewReaderSize(bufio.NewReader(inputFile), BufferSize)

firstChunk := true
for {
if done {
return nil
}

end := BufferSize
buf := make([]byte, BufferSize)
n, err := reader.Read(buf)

if n < BufferSize {
end = n
}

if end > 0 {
data := buf[0:end]

if firstChunk {
firstChunk = false
if common.SkipFile(path, data) {
return nil
}
}

// We are peeking in case a secret exists in our chunk boundaries,
// but we never care if we've run into a peek error.
peekData, _ := reader.Peek(PeekSize)
chunksChan <- &sources.Chunk{
SourceType: s.Type(),
SourceName: s.name,
SourceID: s.SourceID(),
Data: append(data, peekData...),
SourceMetadata: &source_metadatapb.MetaData{
Data: &source_metadatapb.MetaData_Filesystem{
Filesystem: &source_metadatapb.Filesystem{
File: sanitizer.UTF8(path),
},
reReader.Stop()

for chunkData := range common.ChunkReader(inputFile) {
chunksChan <- &sources.Chunk{
SourceType: s.Type(),
SourceName: s.name,
SourceID: s.SourceID(),
Data: chunkData,
SourceMetadata: &source_metadatapb.MetaData{
Data: &source_metadatapb.MetaData_Filesystem{
Filesystem: &source_metadatapb.Filesystem{
File: sanitizer.UTF8(path),
},
},
Verify: s.verify,
}
}

// io.EOF can be emitted when 0<n<buffer size
if err != nil {
if errors.Is(err, io.EOF) {
return nil
} else {
return err
}
},
Verify: s.verify,
}
}
return nil
})

if err != nil && err != io.EOF {
Expand Down

0 comments on commit a473b9a

Please sign in to comment.