Skip to content

Commit

Permalink
Feature/import api orc (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Sep 10, 2020
1 parent 995ee86 commit 9e40dce
Show file tree
Hide file tree
Showing 22 changed files with 1,043 additions and 319 deletions.
9 changes: 8 additions & 1 deletion api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2216,7 +2216,14 @@ func (c *Controller) ImportFromS3InventoryHandler() repositories.ImportFromS3Inv
if err == nil {
username = userModel.Username
}
importer, err := onboard.CreateImporter(deps.ctx, deps.logger, deps.Cataloger, deps.BlockAdapter, username, params.ManifestURL, params.Repository)
importConfig := &onboard.ImporterConfig{
CommitUsername: username,
InventoryURL: params.ManifestURL,
Repository: params.Repository,
InventoryGenerator: deps.BlockAdapter,
Cataloger: deps.Cataloger,
}
importer, err := onboard.CreateImporter(deps.ctx, deps.logger, importConfig)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
Expand Down
2 changes: 1 addition & 1 deletion block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string) (block.Inventory, error) {
func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
return nil, fmt.Errorf("inventory %w", ErrNotImplemented)
}

Expand Down
2 changes: 1 addition & 1 deletion block/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type InventoryGenerator interface {
GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string) (Inventory, error)
GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool) (Inventory, error)
}

// Inventory represents a snapshot of the storage space
Expand Down
2 changes: 1 addition & 1 deletion block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,6 @@ func (l *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string) (block.Inventory, error) {
func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
return nil, ErrInventoryNotSupported
}
2 changes: 1 addition & 1 deletion block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,6 @@ func (a *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string) (block.Inventory, error) {
func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
return nil, ErrInventoryNotImplemented
}
138 changes: 55 additions & 83 deletions block/s3/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,66 +12,51 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/treeverse/lakefs/block"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
"github.com/treeverse/lakefs/logging"
s3parquet "github.com/xitongsys/parquet-go-source/s3"
"github.com/xitongsys/parquet-go/reader"
)

type manifest struct {
URL string `json:"-"`
InventoryBucketArn string `json:"destinationBucket"`
SourceBucket string `json:"sourceBucket"`
Files []manifestFile `json:"files"`
Format string `json:"fileFormat"`
inventoryBucket string
}
var ErrInventoryFilesRangesOverlap = errors.New("got s3 inventory with files covering overlapping ranges")

type manifestFile struct {
Key string `json:"key"`
firstKey string
numRows int
type Manifest struct {
URL string `json:"-"`
InventoryBucketArn string `json:"destinationBucket"`
SourceBucket string `json:"sourceBucket"`
Files []inventoryFile `json:"files"` // inventory list files, each contains a list of objects
Format string `json:"fileFormat"`
inventoryBucket string
}

type ParquetReader interface {
Read(dstInterface interface{}) error
GetNumRows() int64
SkipRows(int64) error
type inventoryFile struct {
Key string `json:"key"` // an s3 key for an inventory list file
}

type parquetReaderGetter func(ctx context.Context, svc s3iface.S3API, inventoryBucket string, manifestFileKey string) (ParquetReader, CloseFunc, error)

type CloseFunc func() error

var ErrParquetOnlySupport = errors.New("currently only parquet inventories are supported")

func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string) (block.Inventory, error) {
return GenerateInventory(ctx, logger, manifestURL, a.s3, getParquetReader)
func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool) (block.Inventory, error) {
return GenerateInventory(logger, manifestURL, a.s3, inventorys3.NewReader(ctx, a.s3, logger), shouldSort)
}

func GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, s3 s3iface.S3API, getParquetReader parquetReaderGetter) (block.Inventory, error) {
func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader inventorys3.IReader, shouldSort bool) (block.Inventory, error) {
if logger == nil {
logger = logging.Default()
}
m, err := loadManifest(manifestURL, s3)
if err != nil {
return nil, err
}
err = m.readFileMetadata(ctx, logger, s3, getParquetReader)
if shouldSort {
err = sortManifest(m, logger, inventoryReader)
}
if err != nil {
return nil, err
}
if logger == nil {
logger = logging.Default()
}
sort.Slice(m.Files, func(i, j int) bool {
return m.Files[i].firstKey < m.Files[j].firstKey
})
return &Inventory{Manifest: m, S3: s3, getParquetReader: getParquetReader, logger: logger}, nil
return &Inventory{Manifest: m, logger: logger, shouldSort: shouldSort, reader: inventoryReader}, nil
}

type Inventory struct {
S3 s3iface.S3API
Manifest *manifest
ctx context.Context //nolint:structcheck // known issue: https://github.com/golangci/golangci-lint/issues/826)
getParquetReader parquetReaderGetter
logger logging.Logger
Manifest *Manifest
logger logging.Logger
shouldSort bool
reader inventorys3.IReader
}

func (inv *Inventory) Iterator() block.InventoryIterator {
Expand All @@ -86,33 +71,7 @@ func (inv *Inventory) InventoryURL() string {
return inv.Manifest.URL
}

func (m *manifest) readFileMetadata(ctx context.Context, logger logging.Logger, s3 s3iface.S3API, getParquetReader parquetReaderGetter) error {
for i := range m.Files {
filename := m.Files[i].Key
pr, closeReader, err := getParquetReader(ctx, s3, m.inventoryBucket, filename)
if err != nil {
return err
}
m.Files[i].numRows = int(pr.GetNumRows())
// read first row from file to store the first key:
rows := make([]ParquetInventoryObject, 1)
err = pr.Read(&rows)
if err != nil {
return err
}
err = closeReader()
if err != nil {
logger.WithFields(logging.Fields{"bucket": m.inventoryBucket, "key": filename}).
Error("failed to close parquet reader after reading metadata")
}
if len(rows) != 0 {
m.Files[i].firstKey = rows[0].Key
}
}
return nil
}

func loadManifest(manifestURL string, s3svc s3iface.S3API) (*manifest, error) {
func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) {
u, err := url.Parse(manifestURL)
if err != nil {
return nil, err
Expand All @@ -121,13 +80,13 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*manifest, error) {
if err != nil {
return nil, err
}
var m manifest
var m Manifest
err = json.NewDecoder(output.Body).Decode(&m)
if err != nil {
return nil, err
}
if m.Format != "Parquet" {
return nil, fmt.Errorf("%w. got: %s", ErrParquetOnlySupport, m.Format)
if m.Format != inventorys3.OrcFormatName && m.Format != inventorys3.ParquetFormatName {
return nil, fmt.Errorf("%w. got format: %s", inventorys3.ErrUnsupportedInventoryFormat, m.Format)
}
m.URL = manifestURL
inventoryBucketArn, err := arn.Parse(m.InventoryBucketArn)
Expand All @@ -138,19 +97,32 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*manifest, error) {
return &m, nil
}

func getParquetReader(ctx context.Context, svc s3iface.S3API, inventoryBucket string, manifestFileKey string) (ParquetReader, CloseFunc, error) {
pf, err := s3parquet.NewS3FileReaderWithClient(ctx, svc, inventoryBucket, manifestFileKey)
if err != nil {
return nil, nil, fmt.Errorf("failed to create parquet file reader: %w", err)
}
var rawObject ParquetInventoryObject
pr, err := reader.NewParquetReader(pf, &rawObject, 4)
if err != nil {
return nil, nil, fmt.Errorf("failed to create parquet reader: %w", err)
func sortManifest(m *Manifest, logger logging.Logger, reader inventorys3.IReader) error {
firstKeyByInventoryFile := make(map[string]string)
lastKeyByInventoryFile := make(map[string]string)
for _, f := range m.Files {
mr, err := reader.GetMetadataReader(m.Format, m.inventoryBucket, f.Key)
if err != nil {
return fmt.Errorf("failed to sort inventory files in manifest: %w", err)
}
firstKeyByInventoryFile[f.Key] = mr.FirstObjectKey()
lastKeyByInventoryFile[f.Key] = mr.LastObjectKey()
err = mr.Close()
if err != nil {
logger.Errorf("failed to close inventory file. file=%s, err=%w", f, err)
}
}
closer := func() error {
pr.ReadStop()
return pf.Close()
sort.Slice(m.Files, func(i, j int) bool {
return firstKeyByInventoryFile[m.Files[i].Key] < firstKeyByInventoryFile[m.Files[j].Key] ||
(firstKeyByInventoryFile[m.Files[i].Key] == firstKeyByInventoryFile[m.Files[j].Key] &&
lastKeyByInventoryFile[m.Files[i].Key] < lastKeyByInventoryFile[m.Files[j].Key])
})
// validate sorting: if a file begins before the next one ends - the files cover overlapping ranges,
// which we don't know how to handle.
for i := 0; i < len(m.Files)-1; i++ {
if firstKeyByInventoryFile[m.Files[i+1].Key] < lastKeyByInventoryFile[m.Files[i].Key] {
return ErrInventoryFilesRangesOverlap
}
}
return pr, closer, nil
return nil
}
Loading

0 comments on commit 9e40dce

Please sign in to comment.