Skip to content

Commit

Permalink
Merge pull request #8 from wolfeidau/fix_read_at_refinements
Browse files Browse the repository at this point in the history
fix(s3): read offset changes
  • Loading branch information
wolfeidau authored Jun 18, 2023
2 parents 2e3b869 + c161049 commit 9f8cb52
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 31 deletions.
2 changes: 1 addition & 1 deletion example/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {

s3fs := s3iofs.NewWithClient(os.Getenv("TEST_BUCKET_NAME"), client)

err = fs.WalkDir(s3fs, "parquet", func(path string, d fs.DirEntry, err error) error {
err = fs.WalkDir(s3fs, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
Expand Down
27 changes: 12 additions & 15 deletions s3file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ const (
)

type s3File struct {
s3fs *S3FS
name string
bucket string
res *s3.HeadObjectOutput
size int64
mode fs.FileMode
modTime time.Time // zero value for directories
offset int64
s3client S3API
name string
bucket string
size int64
mode fs.FileMode
modTime time.Time // zero value for directories
offset int64
}

func (s3f *s3File) Stat() (fs.FileInfo, error) {
Expand All @@ -48,8 +47,6 @@ func (s3f *s3File) Read(p []byte) (int, error) {
return 0, &fs.PathError{Op: opRead, Path: s3f.name, Err: errors.New("is a directory")}
}

// fmt.Println("offset", s3f.offset, "size", s3f.size)

if s3f.offset >= s3f.size {
return 0, io.EOF
}
Expand Down Expand Up @@ -97,7 +94,7 @@ func (s3f *s3File) ReadAt(p []byte, offset int64) (n int, err error) {
}
}

fmt.Println("size", s3f.size)
// fmt.Println("offset", offset, "size", s3f.size)

return size, r.Close()
}
Expand All @@ -124,17 +121,17 @@ func (s3f *s3File) Seek(offset int64, whence int) (int64, error) {
func (s3f *s3File) readerAt(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
byteRange := buildRange(offset, length)

res, err := s3f.s3fs.s3client.GetObject(ctx, &s3.GetObjectInput{
req := &s3.GetObjectInput{
Bucket: aws.String(s3f.bucket),
Key: aws.String(s3f.name),
Range: byteRange,
})
}

res, err := s3f.s3client.GetObject(ctx, req)
if err != nil {
return nil, &fs.PathError{Op: opRead, Path: s3f.name, Err: err}
}

// fmt.Println("offset", aws.ToString(byteRange), "content-length", res.ContentLength)

return res.Body, nil
}

Expand Down
123 changes: 123 additions & 0 deletions s3file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package s3iofs

import (
"bytes"
"context"
"io"
"os"
"strconv"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// mockGetObjectAPI is a function-field test double for the S3 client
// interface: each method delegates to the corresponding func field, so a
// test configures only the calls it expects. Calling a method whose field
// is nil panics, which surfaces unexpected calls immediately.
type mockGetObjectAPI struct {
getObject func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
listObjectsV2 func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
headObject func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
}

// GetObject delegates to the getObject field.
func (m mockGetObjectAPI) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
return m.getObject(ctx, params, optFns...)
}

// ListObjectsV2 delegates to the listObjectsV2 field.
func (m mockGetObjectAPI) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
return m.listObjectsV2(ctx, params, optFns...)
}

// HeadObject delegates to the headObject field.
func (m mockGetObjectAPI) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
return m.headObject(ctx, params, optFns...)
}

// TestReadAt verifies that s3File.ReadAt issues a correctly-formed ranged
// GetObject request (bucket, key and Range header are asserted inside the
// mock) and reports the number of bytes read.
func TestReadAt(t *testing.T) {
	cases := []struct {
		client func(t *testing.T) mockGetObjectAPI
		bucket string
		key    string
	}{
		{
			client: func(t *testing.T) mockGetObjectAPI {
				return mockGetObjectAPI{
					getObject: func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
						t.Helper()

						assert.Equal(t, aws.String("fooBucket"), params.Bucket)
						assert.Equal(t, aws.String("barKey"), params.Key)
						assert.Equal(t, aws.String("bytes=0-1023"), params.Range)

						return &s3.GetObjectOutput{
							Body: io.NopCloser(bytes.NewReader(make([]byte, 1024))),
						}, nil
					},
					headObject: func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
						t.Helper()

						assert.Equal(t, aws.String("fooBucket"), params.Bucket)
						assert.Equal(t, aws.String("barKey"), params.Key)

						return &s3.HeadObjectOutput{
							ContentLength: 1024,
						}, nil
					},
				}
			},
			bucket: "fooBucket",
			key:    "barKey",
		},
	}

	for i, tt := range cases {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			assert := require.New(t)

			sysfs := NewWithClient(tt.bucket, tt.client(t))

			f, err := sysfs.Open(tt.key)
			assert.NoError(err)
			defer f.Close()

			// Previously a failed type assertion skipped the body and the
			// test passed vacuously; now it fails loudly.
			rc, ok := f.(io.ReaderAt)
			assert.True(ok, "file should implement io.ReaderAt")

			data := make([]byte, 1024)
			n, err := rc.ReadAt(data, 0)
			assert.NoError(err)
			assert.Equal(1024, n)
		})
	}
}

// TestReadAtInt is an integration test against a real bucket; it is skipped
// unless TEST_BUCKET_NAME is set. It fetches the first 1 KiB of the "1kfile"
// object with a ranged GetObject and checks the length.
func TestReadAtInt(t *testing.T) {
	if os.Getenv("TEST_BUCKET_NAME") == "" {
		t.Skip()
	}
	assert := require.New(t)

	// Load the Shared AWS Configuration (~/.aws/config)
	awscfg, err := config.LoadDefaultConfig(context.TODO())
	assert.NoError(err)

	client := s3.NewFromConfig(awscfg)

	res, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
		Bucket: aws.String(os.Getenv("TEST_BUCKET_NAME")),
		Key:    aws.String("1kfile"),
		Range:  aws.String("bytes=0-1023"),
	})
	assert.NoError(err)
	defer res.Body.Close() // release the connection back to the transport

	assert.Equal(int64(1024), res.ContentLength)

	data := make([]byte, 1024)

	// A single Read may legally return io.EOF together with the final bytes,
	// which made the previous NoError assertion fail spuriously. io.ReadFull
	// drains exactly len(data) bytes and only errors on a short read.
	n, err := io.ReadFull(res.Body, data)
	assert.NoError(err)
	assert.Equal(1024, n)
}
29 changes: 14 additions & 15 deletions s3iofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,20 @@ func (s3fs *S3FS) Open(name string) (fs.File, error) {

if name == "." {
return &s3File{
s3fs: s3fs,
name: name,
bucket: s3fs.bucket,
mode: fs.ModeDir,
s3client: s3fs.s3client,
name: name,
bucket: s3fs.bucket,
mode: fs.ModeDir,
}, nil
}

// optimistic GetObject using name
res, err := s3fs.s3client.HeadObject(context.TODO(), &s3.HeadObjectInput{
req := &s3.HeadObjectInput{
Bucket: aws.String(s3fs.bucket),
Key: aws.String(name),
})
}

// optimistic GetObject using name
res, err := s3fs.s3client.HeadObject(context.TODO(), req)
if err != nil {
var nfe *types.NotFound
if errors.As(err, &nfe) {
Expand All @@ -73,15 +75,12 @@ func (s3fs *S3FS) Open(name string) (fs.File, error) {
return nil, err
}

// fmt.Println("res.ContentLength", res.ContentLength)

return &s3File{
s3fs: s3fs,
name: name,
bucket: s3fs.bucket,
res: res,
size: res.ContentLength,
modTime: aws.ToTime(res.LastModified),
s3client: s3fs.s3client,
name: name,
bucket: s3fs.bucket,
size: res.ContentLength,
modTime: aws.ToTime(res.LastModified),
}, nil
}

Expand Down

0 comments on commit 9f8cb52

Please sign in to comment.