Skip to content

Commit

Permalink
724 bi collection improvments (#770)
Browse files Browse the repository at this point in the history
* bi collect aws account id

* wip

* bi collect account id - cont

* revert adapter changes; fix tests

* Create cloud package

* wip

* Add MetadataProvider to cloud package

* CR fixes

* CR fix: remove unneeded alias

* CR fixes: account id hashing, split metadata fields

* rename constants

* fix import cycle and lint errors
  • Loading branch information
johnnyaug authored Oct 6, 2020
1 parent be24c89 commit ab0332a
Show file tree
Hide file tree
Showing 23 changed files with 274 additions and 161 deletions.
72 changes: 36 additions & 36 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,32 @@ const (
)

type Dependencies struct {
ctx context.Context
Cataloger catalog.Cataloger
Auth auth.Service
BlockAdapter block.Adapter
Stats stats.Collector
Retention retention.Service
Dedup *dedup.Cleaner
Meta auth.MetadataManager
Migrator db.Migrator
Collector stats.Collector
logger logging.Logger
ctx context.Context
Cataloger catalog.Cataloger
Auth auth.Service
BlockAdapter block.Adapter
Stats stats.Collector
Retention retention.Service
Dedup *dedup.Cleaner
MetadataManager auth.MetadataManager
Migrator db.Migrator
Collector stats.Collector
logger logging.Logger
}

func (d *Dependencies) WithContext(ctx context.Context) *Dependencies {
return &Dependencies{
ctx: ctx,
Cataloger: d.Cataloger,
Auth: d.Auth,
BlockAdapter: d.BlockAdapter.WithContext(ctx),
Stats: d.Stats,
Retention: d.Retention,
Dedup: d.Dedup,
Meta: d.Meta,
Migrator: d.Migrator,
Collector: d.Collector,
logger: d.logger.WithContext(ctx),
ctx: ctx,
Cataloger: d.Cataloger,
Auth: d.Auth,
BlockAdapter: d.BlockAdapter.WithContext(ctx),
Stats: d.Stats,
Retention: d.Retention,
Dedup: d.Dedup,
MetadataManager: d.MetadataManager,
Migrator: d.Migrator,
Collector: d.Collector,
logger: d.logger.WithContext(ctx),
}
}

Expand All @@ -95,20 +95,20 @@ type Controller struct {
}

func NewController(cataloger catalog.Cataloger, auth auth.Service, blockAdapter block.Adapter, stats stats.Collector, retention retention.Service,
dedupCleaner *dedup.Cleaner, meta auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller {
dedupCleaner *dedup.Cleaner, metadataManager auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller {
c := &Controller{
deps: &Dependencies{
ctx: context.Background(),
Cataloger: cataloger,
Auth: auth,
BlockAdapter: blockAdapter,
Stats: stats,
Retention: retention,
Dedup: dedupCleaner,
Meta: meta,
Migrator: migrator,
Collector: collector,
logger: logger,
ctx: context.Background(),
Cataloger: cataloger,
Auth: auth,
BlockAdapter: blockAdapter,
Stats: stats,
Retention: retention,
Dedup: dedupCleaner,
MetadataManager: metadataManager,
Migrator: migrator,
Collector: collector,
logger: logger,
},
}
return c
Expand Down Expand Up @@ -235,7 +235,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
}

// check if previous setup completed
if ts, _ := c.deps.Meta.SetupTimestamp(); !ts.IsZero() {
if ts, _ := c.deps.MetadataManager.SetupTimestamp(); !ts.IsZero() {
return setupop.NewSetupLakeFSConflict().
WithPayload(&models.Error{
Message: "lakeFS already initialized",
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
}

// update setup completed timestamp
if err := c.deps.Meta.UpdateSetupTimestamp(time.Now()); err != nil {
if err := c.deps.MetadataManager.UpdateSetupTimestamp(time.Now()); err != nil {
c.deps.logger.WithError(err).Error("Failed the update setup timestamp")
}

Expand Down
44 changes: 22 additions & 22 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@ var (
)

type Handler struct {
meta auth.MetadataManager
cataloger catalog.Cataloger
blockStore block.Adapter
authService auth.Service
stats stats.Collector
retention retention.Service
migrator db.Migrator
apiServer *restapi.Server
handler *http.ServeMux
dedupCleaner *dedup.Cleaner
logger logging.Logger
metadataManager auth.MetadataManager
cataloger catalog.Cataloger
blockStore block.Adapter
authService auth.Service
stats stats.Collector
retention retention.Service
migrator db.Migrator
apiServer *restapi.Server
handler *http.ServeMux
dedupCleaner *dedup.Cleaner
logger logging.Logger
}

func NewHandler(
cataloger catalog.Cataloger,
blockStore block.Adapter,
authService auth.Service,
meta auth.MetadataManager,
metadataManager auth.MetadataManager,
stats stats.Collector,
retention retention.Service,
migrator db.Migrator,
Expand All @@ -67,15 +67,15 @@ func NewHandler(
) http.Handler {
logger.Info("initialized OpenAPI server")
s := &Handler{
cataloger: cataloger,
blockStore: blockStore,
authService: authService,
meta: meta,
stats: stats,
retention: retention,
migrator: migrator,
dedupCleaner: dedupCleaner,
logger: logger,
cataloger: cataloger,
blockStore: blockStore,
authService: authService,
metadataManager: metadataManager,
stats: stats,
retention: retention,
migrator: migrator,
dedupCleaner: dedupCleaner,
logger: logger,
}
s.buildAPI()
return s.handler
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *Handler) buildAPI() {
api.BasicAuthAuth = s.BasicAuth()
api.JwtTokenAuth = s.JwtTokenAuth()
// bind our handlers to the server
NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.meta, s.migrator, s.stats, s.logger).Configure(api)
NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.metadataManager, s.migrator, s.stats, s.logger).Configure(api)

// setup host/port
s.apiServer = restapi.NewServer(api)
Expand Down
3 changes: 2 additions & 1 deletion api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

dbparams "github.com/treeverse/lakefs/db/params"
"github.com/treeverse/lakefs/dedup"
"github.com/treeverse/lakefs/stats"

"github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client"
Expand Down Expand Up @@ -71,7 +72,7 @@ func createDefaultAdminUser(authService auth.Service, t *testing.T) *authmodel.C

type mockCollector struct{}

func (m *mockCollector) CollectMetadata(_ map[string]string) {}
func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {}

func (m *mockCollector) CollectEvent(_, _ string) {}

Expand Down
14 changes: 7 additions & 7 deletions block/s3/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/treeverse/lakefs/block"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
"github.com/treeverse/lakefs/cloud/aws/s3inventory"
"github.com/treeverse/lakefs/logging"
)

Expand All @@ -33,10 +33,10 @@ type inventoryFile struct {
}

func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool) (block.Inventory, error) {
return GenerateInventory(logger, manifestURL, a.s3, inventorys3.NewReader(ctx, a.s3, logger), shouldSort)
return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort)
}

func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader inventorys3.IReader, shouldSort bool) (block.Inventory, error) {
func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool) (block.Inventory, error) {
if logger == nil {
logger = logging.Default()
}
Expand All @@ -57,7 +57,7 @@ type Inventory struct {
Manifest *Manifest
logger logging.Logger
shouldSort bool
reader inventorys3.IReader
reader s3inventory.IReader
}

func (inv *Inventory) Iterator() block.InventoryIterator {
Expand Down Expand Up @@ -86,8 +86,8 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) {
if err != nil {
return nil, err
}
if m.Format != inventorys3.OrcFormatName && m.Format != inventorys3.ParquetFormatName {
return nil, fmt.Errorf("%w. got format: %s", inventorys3.ErrUnsupportedInventoryFormat, m.Format)
if m.Format != s3inventory.OrcFormatName && m.Format != s3inventory.ParquetFormatName {
return nil, fmt.Errorf("%w. got format: %s", s3inventory.ErrUnsupportedInventoryFormat, m.Format)
}
m.URL = manifestURL
inventoryBucketArn, err := arn.Parse(m.InventoryBucketArn)
Expand All @@ -98,7 +98,7 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) {
return &m, nil
}

func sortManifest(m *Manifest, logger logging.Logger, reader inventorys3.IReader) error {
func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader) error {
firstKeyByInventoryFile := make(map[string]string)
lastKeyByInventoryFile := make(map[string]string)
for _, f := range m.Files {
Expand Down
6 changes: 3 additions & 3 deletions block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/cloud/aws/s3inventory"
"github.com/treeverse/lakefs/cmdutils"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
)

var ErrInventoryNotSorted = errors.New("got unsorted s3 inventory")
Expand All @@ -17,7 +17,7 @@ type InventoryIterator struct {
*Inventory
err error
val *block.InventoryObject
buffer []inventorys3.InventoryObject
buffer []s3inventory.InventoryObject
inventoryFileIndex int
valIndexInBuffer int
inventoryFileProgress *cmdutils.Progress
Expand Down Expand Up @@ -94,7 +94,7 @@ func (it *InventoryIterator) fillBuffer() bool {
it.logger.Errorf("failed to close manifest file reader. file=%s, err=%w", it.Manifest.Files[it.inventoryFileIndex].Key, err)
}
}()
it.buffer = make([]inventorys3.InventoryObject, rdr.GetNumRows())
it.buffer = make([]s3inventory.InventoryObject, rdr.GetNumRows())
err = rdr.Read(&it.buffer)
if err != nil {
it.err = err
Expand Down
18 changes: 9 additions & 9 deletions block/s3/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import (
"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/block/s3"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
"github.com/treeverse/lakefs/cloud/aws/s3inventory"
"github.com/treeverse/lakefs/logging"
)

var ErrReadFile = errors.New("error reading file")

func rows(keys []string, lastModified map[string]time.Time) []*inventorys3.InventoryObject {
func rows(keys []string, lastModified map[string]time.Time) []*s3inventory.InventoryObject {
if keys == nil {
return nil
}
res := make([]*inventorys3.InventoryObject, len(keys))
res := make([]*s3inventory.InventoryObject, len(keys))
for i, key := range keys {
if key != "" {
res[i] = new(inventorys3.InventoryObject)
res[i] = new(s3inventory.InventoryObject)
res[i].Key = key
res[i].IsLatest = swag.Bool(!strings.Contains(key, "_expired"))
res[i].IsDeleteMarker = swag.Bool(strings.Contains(key, "_del"))
Expand Down Expand Up @@ -210,7 +210,7 @@ type mockInventoryReader struct {
}

type mockInventoryFileReader struct {
rows []*inventorys3.InventoryObject
rows []*s3inventory.InventoryObject
nextIdx int
inventoryReader *mockInventoryReader
key string
Expand Down Expand Up @@ -247,8 +247,8 @@ func (m *mockInventoryFileReader) Close() error {
}

func (m *mockInventoryFileReader) Read(dstInterface interface{}) error {
res := make([]inventorys3.InventoryObject, 0, len(m.rows))
dst := dstInterface.(*[]inventorys3.InventoryObject)
res := make([]s3inventory.InventoryObject, 0, len(m.rows))
dst := dstInterface.(*[]s3inventory.InventoryObject)
for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+len(*dst); i++ {
if m.rows[i] == nil {
return ErrReadFile // for test - simulate file with error
Expand All @@ -264,12 +264,12 @@ func (m *mockInventoryFileReader) GetNumRows() int64 {
return int64(len(m.rows))
}

func (m *mockInventoryReader) GetFileReader(_ string, _ string, key string) (inventorys3.FileReader, error) {
func (m *mockInventoryReader) GetFileReader(_ string, _ string, key string) (s3inventory.FileReader, error) {
m.openFiles[key] = true
return &mockInventoryFileReader{rows: rows(fileContents[key], m.lastModified), inventoryReader: m, key: key}, nil
}

func (m *mockInventoryReader) GetMetadataReader(_ string, _ string, key string) (inventorys3.MetadataReader, error) {
func (m *mockInventoryReader) GetMetadataReader(_ string, _ string, key string) (s3inventory.MetadataReader, error) {
m.openFiles[key] = true
return &mockInventoryFileReader{rows: rows(fileContents[key], m.lastModified), inventoryReader: m, key: key}, nil
}
Expand Down
41 changes: 41 additions & 0 deletions cloud/aws/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package aws

import (
"crypto/md5" //nolint:gosec
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/treeverse/lakefs/cloud"
"github.com/treeverse/lakefs/logging"
)

type MetadataProvider struct {
logger logging.Logger
awsConfig *aws.Config
}

func NewMetadataProvider(logger logging.Logger, awsConfig *aws.Config) *MetadataProvider {
return &MetadataProvider{logger: logger, awsConfig: awsConfig}
}

func (m *MetadataProvider) GetMetadata() map[string]string {
sess, err := session.NewSession(m.awsConfig)
if err != nil {
m.logger.Warnf("%v: failed to create AWS session for BI", err)
return nil
}
sess.ClientConfig(s3.ServiceName)
stsClient := sts.New(sess)
identity, err := stsClient.GetCallerIdentity(&sts.GetCallerIdentityInput{})
if err != nil {
m.logger.Warnf("%v: failed to get AWS account ID for BI", err)
return nil
}
return map[string]string{
cloud.IDKey: fmt.Sprintf("%x", md5.Sum([]byte(*identity.Account))), //nolint:gosec
cloud.IDTypeKey: "aws_account_id",
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3inventory

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3inventory

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3inventory

import "github.com/xitongsys/parquet-go/reader"

Expand Down
2 changes: 1 addition & 1 deletion inventory/s3/reader.go → cloud/aws/s3inventory/reader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3inventory

import (
"context"
Expand Down
Loading

0 comments on commit ab0332a

Please sign in to comment.