Skip to content

Commit

Permalink
Merge pull request #28 from wolfeidau/feat_readdirfs_support
Browse files Browse the repository at this point in the history
feat(dirfs): added support for fs.ReadDirFS
  • Loading branch information
wolfeidau authored Jan 18, 2024
2 parents 57b7228 + 192fcfd commit 10f658a
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 87 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `s3File` implements the following interfaces:

- `fs.FileInfo`
- `fs.DirEntry`
- `fs.ReadDirFile`
- `io.ReaderAt`
- `io.Seeker`

Expand All @@ -24,7 +25,8 @@ In addition to this the `S3FS` also implements the following interfaces:
- `RemoveFS`, which provides a `Remove(name string) error` method.
- `WriteFileFS` which provides a `WriteFile(name string, data []byte, perm fs.FileMode) error` method.

This enables libraries such as [apache arrow](https://arrow.apache.org/) to read parts of a parquet file from S3, without downloading the entire file.
The `Seek` and `ReadAt` operations enable libraries such as [apache arrow](https://arrow.apache.org/) to read parts of a parquet file from S3, without downloading the entire file.

# Usage

```go
Expand Down
121 changes: 80 additions & 41 deletions integration/s3fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ func generateData(length int) []byte {
return bytes.Repeat([]byte("a"), length)
}

func TestList(t *testing.T) {
assert := require.New(t)

func writeTestFile(path string, body []byte) error {
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_list/test.txt"),
Body: bytes.NewReader(oneKilobyte),
Key: aws.String(path),
Body: bytes.NewReader(body),
})
return err
}

func TestList(t *testing.T) {
assert := require.New(t)

err := writeTestFile("test_list/test.txt", oneKilobyte)
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand Down Expand Up @@ -61,11 +66,7 @@ func TestOpen(t *testing.T) {
func TestStat(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_stat.txt"),
Body: bytes.NewReader(oneKilobyte),
})
err := writeTestFile("test_stat.txt", oneKilobyte)
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand All @@ -84,11 +85,7 @@ func TestStat(t *testing.T) {
func TestSeek(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_seek.txt"),
Body: bytes.NewReader(oneKilobyte),
})
err := writeTestFile("test_seek.txt", oneKilobyte)
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand Down Expand Up @@ -149,11 +146,7 @@ func TestSeek(t *testing.T) {
func TestReaderAt(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_reader_at.txt"),
Body: bytes.NewReader(oneKilobyte),
})
err := writeTestFile("test_reader_at.txt", oneKilobyte)
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand All @@ -179,11 +172,7 @@ func TestReaderAt(t *testing.T) {
func TestReaderAtBig(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_reader_at_big.txt"),
Body: bytes.NewReader(generateData(threeMegabytes)),
})
err := writeTestFile("test_reader_at_big.txt", generateData(threeMegabytes))
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand All @@ -208,28 +197,20 @@ func TestReaderAtBig(t *testing.T) {
func TestReadFile(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_read_big.txt"),
Body: bytes.NewReader(generateData(threeMegabytes)),
})
err := writeTestFile("test_read_file.txt", generateData(threeMegabytes))
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)

data, err := fs.ReadFile(s3fs, "test_read_big.txt")
data, err := fs.ReadFile(s3fs, "test_read_file.txt")
assert.NoError(err)
assert.Len(data, threeMegabytes)
}

func TestReadBigEOF(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_read_big_eof.txt"),
Body: bytes.NewReader(generateData(oneMegabyte)),
})
err := writeTestFile("test_read_big_eof.txt", generateData(oneMegabyte))
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand All @@ -249,11 +230,7 @@ func TestRemove(t *testing.T) {
t.Run("create and remove", func(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucketName),
Key: aws.String("test_remove.txt"),
Body: bytes.NewReader(generateData(oneMegabyte)),
})
err := writeTestFile("test_remove.txt", generateData(oneMegabyte))
assert.NoError(err)

s3fs := s3iofs.NewWithClient(testBucketName, client)
Expand Down Expand Up @@ -314,3 +291,65 @@ func TestWriteFile(t *testing.T) {
assert.Error(err)
})
}

// TestReadDir verifies that listing a prefix returns one directory entry
// per immediate child "directory" and that each entry reports IsDir.
func TestReadDir(t *testing.T) {
	assert := require.New(t)

	err := writeTestFile("test_read_dir/one/day/test_read_dir.txt", oneKilobyte)
	assert.NoError(err)

	err = writeTestFile("test_read_dir/one/week/test_read_dir.txt", oneKilobyte)
	assert.NoError(err)

	err = writeTestFile("test_read_dir/two/day/test_read_dir.txt", oneKilobyte)
	assert.NoError(err)

	s3fs := s3iofs.NewWithClient(testBucketName, client)

	entries, err := s3fs.ReadDir("test_read_dir")
	assert.NoError(err)
	assert.Len(entries, 2)
	assert.ElementsMatch([]string{"one", "two"}, getNames(entries))

	// ElementsMatch does not pin the order of entries, so assert the
	// directory flag on every entry rather than only the first one.
	for _, entry := range entries {
		assert.True(entry.IsDir())
	}
}

// TestFileReadDir verifies that a directory entry returned by ReadDir can
// itself be listed one entry at a time via fs.ReadDirFile, and that the
// paged listing finally reports io.EOF.
func TestFileReadDir(t *testing.T) {
	assert := require.New(t)

	err := writeTestFile("test_file_read_dir/one/test_file_read_dir_1.txt", oneKilobyte)
	assert.NoError(err)

	err = writeTestFile("test_file_read_dir/one/test_file_read_dir_2.txt", oneKilobyte)
	assert.NoError(err)

	s3fs := s3iofs.NewWithClient(testBucketName, client)

	entries, err := s3fs.ReadDir("test_file_read_dir")
	assert.NoError(err)

	dirLs, ok := entries[0].(fs.ReadDirFile)
	assert.True(ok)
	assert.Equal("one", entries[0].Name())
	assert.True(entries[0].IsDir())

	// Page through the child directory one entry at a time.
	for _, want := range []string{"test_file_read_dir_1.txt", "test_file_read_dir_2.txt"} {
		entries, err = dirLs.ReadDir(1)
		assert.NoError(err)
		assert.Len(entries, 1)
		assert.Equal(want, entries[0].Name())
	}

	// Once the directory is exhausted a paged read returns io.EOF.
	_, err = dirLs.ReadDir(1)
	assert.Equal(io.EOF, err)
}

func getNames(entries []fs.DirEntry) []string {
names := make([]string, len(entries))
for i, entry := range entries {
names[i] = entry.Name()
}
return names
}
3 changes: 2 additions & 1 deletion integration/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ func TestMain(m *testing.M) {
var logmode aws.ClientLogMode

if os.Getenv("AWS_DEBUG") != "" {
logmode = aws.LogRetries | aws.LogRequest
logmode = aws.LogRetries | aws.LogRequest | aws.LogResponse
}

client = s3.New(s3.Options{
ClientLogMode: logmode,
UsePathStyle: true,
Logger: logging.NewStandardLogger(os.Stdout),
EndpointResolverV2: &Resolver{URL: endpointURL},
Credentials: aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
Expand Down
77 changes: 64 additions & 13 deletions s3file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
)

var (
_ fs.FileInfo = (*s3File)(nil)
_ fs.DirEntry = (*s3File)(nil)
_ io.ReaderAt = (*s3File)(nil)
_ io.Seeker = (*s3File)(nil)
_ fs.FileInfo = (*s3File)(nil)
_ fs.DirEntry = (*s3File)(nil)
_ io.ReaderAt = (*s3File)(nil)
_ io.Seeker = (*s3File)(nil)
_ fs.ReadDirFile = (*s3File)(nil)
)

const (
Expand All @@ -27,15 +28,16 @@ const (
)

type s3File struct {
s3client S3API
name string
bucket string
size int64
mode fs.FileMode
modTime time.Time // zero value for directories
offset int64
mutex sync.Mutex
body io.ReadCloser
s3client S3API
name string
bucket string
size int64
mode fs.FileMode
modTime time.Time // zero value for directories
offset int64
lastDirEntry string
mutex sync.Mutex
body io.ReadCloser
}

func (s3f *s3File) Stat() (fs.FileInfo, error) {
Expand Down Expand Up @@ -137,6 +139,55 @@ func (s3f *s3File) Seek(offset int64, whence int) (int64, error) {
return offset, nil
}

// ReadDir implements fs.ReadDirFile for directory entries.
//
// For n > 0 it returns at most n entries per call, remembering the last
// key seen so later calls continue the listing, and returns io.EOF once
// the directory is exhausted. For n <= 0 it returns the entries from a
// single listing call with a nil error, per the fs.ReadDirFile contract.
//
// NOTE(review): for n <= 0 only one ListObjectsV2 call is made, which S3
// caps at 1000 keys; truncated listings are not continued — confirm
// directories larger than one page are expected to work here.
func (s3f *s3File) ReadDir(n int) ([]fs.DirEntry, error) {
	if !s3f.IsDir() {
		return nil, &fs.PathError{Op: opRead, Path: s3f.Name(), Err: fs.ErrNotExist}
	}

	prefix := s3f.name

	// The root directory is named "." but lists with an empty prefix.
	if s3f.name == "." {
		prefix = ""
	}

	params := &s3.ListObjectsV2Input{
		Bucket:    aws.String(s3f.bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	}

	if n > 0 {
		params.MaxKeys = aws.Int32(int32(n))
	}

	if s3f.lastDirEntry != "" {
		params.StartAfter = aws.String(s3f.lastDirEntry)
	}

	listRes, err := s3f.s3client.ListObjectsV2(context.Background(), params)
	if err != nil {
		return nil, err
	}

	entries, err := listResToEntries(s3f.bucket, s3f.s3client, listRes)
	if err != nil {
		return nil, err
	}

	if len(entries) == 0 {
		// io.EOF only signals the end of a paged (n > 0) read; a full
		// (n <= 0) read of an empty directory returns a nil error.
		if n <= 0 {
			return entries, nil
		}
		return nil, io.EOF
	}

	// Advance the pagination cursor past the last entry returned, even when
	// fewer than n entries came back; otherwise a short final page would be
	// re-listed and returned again forever instead of reaching io.EOF.
	if n > 0 {
		if last, ok := entries[len(entries)-1].(*s3File); ok {
			s3f.lastDirEntry = last.name
		}
	}

	return entries, nil
}

func (s3f *s3File) readerAt(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
byteRange := buildRange(offset, length)

Expand Down
Loading

0 comments on commit 10f658a

Please sign in to comment.