storage: support s3 seek end (#476) (#489)
* support s3 seek end

* support nil for WalkOption

* try parse range info from response

* fix test

* fix lint

* fix a bug in s3 WalkDir

* update

* update

* fix

* loop read s3 reader

* fix lint

* fix more lint

* fmt

* use io.ReadFull instead of manual loop

* fix

* fix

* add a comment for the reason to put in struct

* add a comment

Co-authored-by: glorv <glorvs@163.com>
Co-authored-by: 3pointer <luancheng@pingcap.com>
3 people authored Sep 8, 2020
1 parent fc9d815 commit 4e68a8d
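
Taken together, the changes teach the S3 reader to answer `io.SeekEnd`: every GET now carries a `Range` header, the object size is recovered from the `Content-Range` of the response, and `Seek` uses that size for end-relative offsets. Below is a sketch of the consumer this unblocks; the helper, the object key, and the `s3store` handle are illustrative, not part of this commit:

```go
// readParquetFooterLen is a hypothetical helper, not part of this commit.
// Parquet files end with a 4-byte little-endian footer length followed by
// the 4-byte "PAR1" magic, so a reader must seek from the end before it
// knows anything else about the file.
func readParquetFooterLen(ctx context.Context, s3store *S3Storage, key string) (uint32, error) {
	r, err := s3store.Open(ctx, key)
	if err != nil {
		return 0, err
	}
	defer r.Close()

	// This end-relative seek is exactly what the commit enables.
	if _, err := r.Seek(-8, io.SeekEnd); err != nil {
		return 0, err
	}
	trailer := make([]byte, 8)
	if _, err := io.ReadFull(r, trailer); err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint32(trailer[:4]), nil
}
```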
Showing 2 changed files with 110 additions and 32 deletions.
128 changes: 96 additions & 32 deletions pkg/storage/s3.go
@@ -9,6 +9,8 @@ import (
 	"io"
 	"io/ioutil"
 	"net/url"
+	"regexp"
+	"strconv"
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -365,6 +367,9 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
 // function; the second argument is the size in byte of the file determined
 // by path.
 func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
+	if opt == nil {
+		opt = &WalkOption{}
+	}
 	var marker *string
 	prefix := rs.options.Prefix + opt.SubDir
 	maxKeys := int64(1000)
@@ -393,7 +398,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 			}
 		}
 		if res.IsTruncated != nil && *res.IsTruncated {
-			marker = res.Marker
+			marker = res.NextMarker
 		} else {
 			break
 		}
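
The one-line fix in WalkDir above is worth a note: `res.Marker` merely echoes the marker sent in the request, so paginating on it re-lists the first page forever; the continuation token is `res.NextMarker`. (Per the AWS docs, v1 ListObjects populates `NextMarker` only when a Delimiter is set; with a flat listing, the last key of the page is the usual fallback.) A standalone sketch of the corrected loop, assuming an aws-sdk-go `*s3.S3` client:

```go
// listAll is an illustrative, self-contained version of the corrected
// pagination loop; it is not code from this commit.
func listAll(ctx context.Context, svc *s3.S3, bucket, prefix string) error {
	var marker *string
	for {
		res, err := svc.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
			Bucket: aws.String(bucket),
			Prefix: aws.String(prefix),
			Marker: marker,
		})
		if err != nil {
			return err
		}
		for _, obj := range res.Contents {
			fmt.Println(*obj.Key, *obj.Size)
		}
		if res.IsTruncated == nil || !*res.IsTruncated {
			return nil
		}
		// The fix: continue from NextMarker. res.Marker only echoes the
		// marker we sent, so looping on it re-reads the same page forever.
		marker = res.NextMarker
	}
}
```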
@@ -404,59 +409,114 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 
 // Open a Reader by file path.
 func (rs *S3Storage) Open(ctx context.Context, path string) (ReadSeekCloser, error) {
-	reader, err := rs.open(ctx, path, 0, 0)
+	reader, r, err := rs.open(ctx, path, 0, 0)
 	if err != nil {
 		return nil, err
 	}
 	return &s3ObjectReader{
-		storage: rs,
-		name:    path,
-		reader:  reader,
+		storage:   rs,
+		name:      path,
+		reader:    reader,
+		ctx:       ctx,
+		rangeInfo: r,
 	}, nil
 }
 
-func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, error) {
+type rangeInfo struct {
+	start int64
+	end   int64
+	size  int64
+}
+
+// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset).
+func (rs *S3Storage) open(
+	ctx context.Context,
+	path string,
+	startOffset, endOffset int64,
+) (io.ReadCloser, rangeInfo, error) {
 	input := &s3.GetObjectInput{
 		Bucket: aws.String(rs.options.Bucket),
 		Key:    aws.String(rs.options.Prefix + path),
 	}
 
+	// always set rangeOffset to fetch file size info
+	// s3 endOffset is inclusive
 	var rangeOffset *string
-	if startOffset > 0 {
-		if endOffset > startOffset {
-			rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
-		} else {
-			rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
-		}
-		input.Range = rangeOffset
+	if endOffset > startOffset {
+		rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1))
+	} else {
+		rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
 	}
+
+	input.Range = rangeOffset
 	result, err := rs.svc.GetObjectWithContext(ctx, input)
 	if err != nil {
-		return nil, err
+		return nil, rangeInfo{}, err
 	}
 
-	// FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range',
-	// not sure whether this is a feature or bug
-	//if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) {
-	//	return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
-	//		name, *rangeOffset, result.AcceptRanges)
-	//}
-
-	return result.Body, nil
+	r, err := parseRangeInfo(result.ContentRange)
+	if err != nil {
+		return nil, rangeInfo{}, errors.Trace(err)
+	}
+
+	if startOffset != r.start || (endOffset != 0 && endOffset != r.end+1) {
+		return nil, r, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
+			path, *rangeOffset, result.ContentRange)
+	}
+
+	return result.Body, r, nil
 }
 
+var (
+	contentRangeRegex = regexp.MustCompile(`bytes (\d+)-(\d+)/(\d+)$`)
+)
+
+func parseRangeInfo(info *string) (rangeInfo, error) {
+	if info == nil || len(*info) == 0 {
+		return rangeInfo{}, errors.New("ContentRange is empty")
+	}
+	subMatches := contentRangeRegex.FindStringSubmatch(*info)
+	if len(subMatches) != 4 {
+		return rangeInfo{}, errors.Errorf("invalid content range: '%s'", *info)
+	}
+
+	start, err := strconv.ParseInt(subMatches[1], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info)
+	}
+	end, err := strconv.ParseInt(subMatches[2], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info)
+	}
+	size, err := strconv.ParseInt(subMatches[3], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid size value '%s' in ContentRange '%s'", subMatches[3], *info)
+	}
+	return rangeInfo{start: start, end: end, size: size}, nil
+}
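
Two header conventions meet in this hunk and are easy to mix up: the `Range` request header is inclusive on both ends (`bytes=0-9` asks for ten bytes, hence the `endOffset-1` when converting from the half-open `[startOffset, endOffset)`), while the `Content-Range` response header reports `start-end/size`, likewise inclusive. A small in-package illustration of what `parseRangeInfo` recovers, with made-up values:

```go
func exampleContentRange() {
	// What a 206 response carries for "Range: bytes=100-199" on a 443-byte object.
	cr := "bytes 100-199/443"
	info, err := parseRangeInfo(&cr)
	if err != nil {
		panic(err) // cannot happen for this well-formed header
	}
	fmt.Println(info.start, info.end, info.size) // 100 199 443
	// Every ranged GET thus reveals the total object size, which is what lets
	// Seek implement io.SeekEnd without a separate HEAD request.
}
```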

 // s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method.
 type s3ObjectReader struct {
-	storage *S3Storage
-	name    string
-	reader  io.ReadCloser
-	pos     int64
+	storage   *S3Storage
+	name      string
+	reader    io.ReadCloser
+	pos       int64
+	rangeInfo rangeInfo
+	// reader context used for implement `io.Seek`
+	// currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker`
+	// See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10
+	ctx context.Context
 }
 
 // Read implement the io.Reader interface.
 func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
-	n, err = r.reader.Read(p)
+	maxCnt := r.rangeInfo.end + 1 - r.pos
+	if maxCnt > int64(len(p)) {
+		maxCnt = int64(len(p))
+	}
+	n, err = io.ReadFull(r.reader, p[:maxCnt])
 	r.pos += int64(n)
 	return
 }
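
The new Read body explains the "loop read s3 reader" and "use io.ReadFull instead of manual loop" entries in the commit message: a single Read on an HTTP response body may legally fill only part of `p` and still return a nil error. `io.ReadFull` loops internally, and clamping the slice to `rangeInfo.end + 1 - r.pos` keeps it from requesting bytes past the open range, where the shortfall would surface as `io.ErrUnexpectedEOF`. A minimal sketch of the idiom, assuming only an `io.Reader`:

```go
// readExactly is an illustrative helper, not code from this commit.
func readExactly(body io.Reader, want int) ([]byte, error) {
	buf := make([]byte, want)
	// A bare body.Read(buf) may fill only part of buf with err == nil;
	// chunk and TLS record boundaries decide how much arrives per call.
	// io.ReadFull loops until buf is full, returning io.ErrUnexpectedEOF
	// when the stream ends mid-buffer, which is why Read above clamps the
	// slice to the bytes actually remaining in the requested range.
	if _, err := io.ReadFull(body, buf); err != nil {
		return nil, err
	}
	return buf, nil
}
```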
@@ -467,25 +527,28 @@ func (r *s3ObjectReader) Close() error {
 }
 
 // Seek implement the io.Seeker interface.
+//
+// Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
 func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 	var realOffset int64
 	switch whence {
 	case io.SeekStart:
 		realOffset = offset
 	case io.SeekCurrent:
 		realOffset = r.pos + offset
+	case io.SeekEnd:
+		realOffset = r.rangeInfo.size + offset
 	default:
-		// TODO: maybe we can fetch the object stat and calculate the absolute offset
-		return 0, errors.New("seek by SeekEnd is not supported yet")
+		return 0, errors.Errorf("Seek: invalid whence '%d'", whence)
 	}
 
 	if realOffset == r.pos {
 		return realOffset, nil
 	}
 
 	// if seek ahead no more than 64k, we discard these data
-	if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead {
-		_, err := io.CopyN(ioutil.Discard, r, offset-r.pos)
+	if realOffset > r.pos && realOffset-r.pos <= maxSkipOffsetByRead {
+		_, err := io.CopyN(ioutil.Discard, r, realOffset-r.pos)
 		if err != nil {
 			return r.pos, err
 		}
@@ -498,11 +561,12 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 		return 0, err
 	}
 
-	newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0)
+	newReader, info, err := r.storage.open(r.ctx, r.name, realOffset, 0)
 	if err != nil {
 		return 0, err
 	}
 	r.reader = newReader
+	r.rangeInfo = info
 	r.pos = realOffset
 	return realOffset, nil
 }
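
Besides the new `io.SeekEnd` case, this hunk fixes the forward-skip arithmetic: the old `offset - r.pos` is the travel distance only for `io.SeekStart` and was wrong for `io.SeekCurrent`; `realOffset - r.pos` is correct for every whence. After the fix, a short hop (at most `maxSkipOffsetByRead`, 64k per the comment) is served by discarding bytes on the live connection, while a longer one reopens the object at the target offset. A hypothetical walkthrough, with `r` obtained from `S3Storage.Open`:

```go
// seekDemo is illustrative only; footerSize is a made-up parameter.
func seekDemo(r ReadSeekCloser, footerSize int64) error {
	// Short forward hop: cheaper to read-and-discard on the live connection.
	if _, err := r.Seek(4*1024, io.SeekCurrent); err != nil {
		return err
	}
	// Long hop: the reader closes the body and issues a new GET with
	// "Range: bytes=<newPos>-", refreshing rangeInfo from the response.
	if _, err := r.Seek(1<<20, io.SeekCurrent); err != nil {
		return err
	}
	// New in this commit: position relative to the object's end.
	if _, err := r.Seek(-footerSize, io.SeekEnd); err != nil {
		return err
	}
	return nil
}
```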
14 changes: 14 additions & 0 deletions pkg/storage/s3_test.go
@@ -452,6 +452,20 @@ func (r *testStorageSuite) TestS3Others(c *C) {
 	defineS3Flags(&pflag.FlagSet{})
 }
 
+func (r *testStorageSuite) TestS3Range(c *C) {
+	contentRange := "bytes 0-9/443"
+	ri, err := parseRangeInfo(&contentRange)
+	c.Assert(err, IsNil)
+	c.Assert(ri, Equals, rangeInfo{start: 0, end: 9, size: 443})
+
+	_, err = parseRangeInfo(nil)
+	c.Assert(err, ErrorMatches, "ContentRange is empty")
+
+	badRange := "bytes "
+	_, err = parseRangeInfo(&badRange)
+	c.Assert(err, ErrorMatches, "invalid content range: 'bytes '")
+}
+
 type mockS3Handler struct {
 	err error
 }
