
create entries batch multiple entries into single insert #550

Merged · 4 commits · Sep 2, 2020
2 changes: 1 addition & 1 deletion api/handler_test.go
@@ -80,7 +80,7 @@ func (m *mockCollector) CollectEvent(_, _ string) {}
func getHandler(t *testing.T, opts ...testutil.GetDBOption) (http.Handler, *dependencies) {
conn, handlerDatabaseURI := testutil.GetDB(t, databaseURI, opts...)
blockAdapter := testutil.NewBlockAdapterByEnv(t, &block.NoOpTranslator{})
cataloger := catalog.NewCataloger(conn, catalog.WithCacheConfig(&catalog.CacheConfig{Enabled: false}))
cataloger := catalog.NewCataloger(conn, catalog.WithCacheEnabled(false))
authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{
Enabled: false,
})
79 changes: 48 additions & 31 deletions catalog/cataloger.go
@@ -33,6 +33,8 @@ const (
defaultBatchDelay = 1000 * time.Microsecond
defaultBatchEntriesReadAtOnce = 64
defaultBatchReaders = 8

defaultBatchWriteEntriesInsertSize = 10
)

type DedupReport struct {
@@ -183,37 +185,29 @@ type CacheConfig struct {

// cataloger main catalog implementation based on mvcc
type cataloger struct {
params.Catalog
clock clock.Clock
log logging.Logger
db db.Database
wg sync.WaitGroup
cacheConfig *CacheConfig
cache Cache
dedupCh chan *dedupRequest
dedupReportEnabled bool
dedupReportCh chan *DedupReport
readEntryRequestChan chan *readRequest
batchParams params.BatchRead
}

type CatalogerOption func(*cataloger)

var defaultCatalogerCacheConfig = &CacheConfig{
Enabled: true,
Size: defaultCatalogerCacheSize,
Expiry: defaultCatalogerCacheExpiry,
Jitter: defaultCatalogerCacheJitter,
}

func WithClock(newClock clock.Clock) CatalogerOption {
return func(c *cataloger) {
c.clock = newClock
}
}

func WithCacheConfig(config *CacheConfig) CatalogerOption {
func WithCacheEnabled(b bool) CatalogerOption {
return func(c *cataloger) {
c.cacheConfig = config
c.Cache.Enabled = b
}
}

@@ -223,23 +217,36 @@ func WithDedupReportChannel(b bool) CatalogerOption {
}
}

func WithBatchReadParams(p params.BatchRead) CatalogerOption {
func WithParams(p params.Catalog) CatalogerOption {
return func(c *cataloger) {
if p.ScanTimeout != 0 {
c.batchParams.ScanTimeout = p.ScanTimeout
if p.BatchRead.ScanTimeout != 0 {
c.BatchRead.ScanTimeout = p.BatchRead.ScanTimeout
}
if p.BatchRead.Delay != 0 {
c.BatchRead.Delay = p.BatchRead.Delay
}
if p.BatchRead.EntriesAtOnce != 0 {
c.BatchRead.EntriesAtOnce = p.BatchRead.EntriesAtOnce
}
if p.BatchRead.EntryMaxWait != 0 {
c.BatchRead.EntryMaxWait = p.BatchRead.EntryMaxWait
}
if p.BatchRead.Readers != 0 {
c.BatchRead.Readers = p.BatchRead.Readers
}
if p.BatchDelay != 0 {
c.batchParams.BatchDelay = p.BatchDelay
if p.BatchWrite.EntriesInsertSize != 0 {
c.BatchWrite.EntriesInsertSize = p.BatchWrite.EntriesInsertSize
}
if p.EntriesReadAtOnce != 0 {
c.batchParams.EntriesReadAtOnce = p.EntriesReadAtOnce
if p.Cache.Size != 0 {
c.Cache.Size = p.Cache.Size
}
if p.ReadEntryMaxWait != 0 {
c.batchParams.ReadEntryMaxWait = p.ReadEntryMaxWait
if p.Cache.Expiry != 0 {
c.Cache.Expiry = p.Cache.Expiry
}
if p.Readers != 0 {
c.batchParams.Readers = p.Readers
if p.Cache.Jitter != 0 {
c.Cache.Jitter = p.Cache.Jitter
}
c.Cache.Enabled = p.Cache.Enabled
}
}

@@ -248,22 +255,32 @@ func NewCataloger(db db.Database, options ...CatalogerOption) Cataloger {
clock: clock.New(),
log: logging.Default().WithField("service_name", "cataloger"),
db: db,
cacheConfig: defaultCatalogerCacheConfig,
dedupCh: make(chan *dedupRequest, dedupChannelSize),
dedupReportEnabled: true,
batchParams: params.BatchRead{
ReadEntryMaxWait: defaultBatchReadEntryMaxWait,
ScanTimeout: defaultBatchScanTimeout,
BatchDelay: defaultBatchDelay,
EntriesReadAtOnce: defaultBatchEntriesReadAtOnce,
Readers: defaultBatchReaders,
Catalog: params.Catalog{
BatchRead: params.BatchRead{
EntryMaxWait: defaultBatchReadEntryMaxWait,
ScanTimeout: defaultBatchScanTimeout,
Delay: defaultBatchDelay,
EntriesAtOnce: defaultBatchEntriesReadAtOnce,
Readers: defaultBatchReaders,
},
BatchWrite: params.BatchWrite{
EntriesInsertSize: defaultBatchWriteEntriesInsertSize,
},
Cache: params.Cache{
Enabled: false,
Size: defaultCatalogerCacheSize,
Expiry: defaultCatalogerCacheExpiry,
Jitter: defaultCatalogerCacheJitter,
},
},
}
for _, opt := range options {
opt(c)
}
if c.cacheConfig.Enabled {
c.cache = NewLRUCache(c.cacheConfig.Size, c.cacheConfig.Expiry, c.cacheConfig.Jitter)
if c.Cache.Enabled {
c.cache = NewLRUCache(c.Cache.Size, c.Cache.Expiry, c.Cache.Jitter)
} else {
c.cache = &DummyCache{}
}
52 changes: 46 additions & 6 deletions catalog/cataloger_create_entries.go
@@ -2,11 +2,15 @@ package catalog

import (
"context"
"database/sql"
"fmt"

sq "github.com/Masterminds/squirrel"
"github.com/treeverse/lakefs/db"
)

// CreateEntries adds multiple entries to the catalog. This flow does not pass through the de-dup mechanism.
// It is mainly used to import a large number of entries into the catalog.
func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string, entries []Entry) error {
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
@@ -20,11 +24,23 @@ func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string
return nil
}

// validate that we have path on each entry
for i := range entries {
if !IsNonEmptyString(entries[i].Path) {
// validate that we have path on each entry and remember last entry based on path (for dup remove)
entriesMap := make(map[string]*Entry, len(entries))
for i := len(entries) - 1; i >= 0; i-- {
p := entries[i].Path
if !IsNonEmptyString(p) {
return fmt.Errorf("entry at pos %d, path: %w", i, ErrInvalidValue)
}
entriesMap[p] = &entries[i]
}

// prepare a list of entries to insert without duplicates
entriesToInsert := make([]*Entry, 0, len(entriesMap))
for i := range entries {
ent := entriesMap[entries[i].Path]
if &entries[i] == ent {
entriesToInsert = append(entriesToInsert, ent)
}
Contributor:
IIUC the goal here is to insert the last object for each key, in the order of the first appearance of each key. This is a bit odd. To keep the files in their expected ordering, I would write something like this (untested, of course):

j := len(entries)
for i := len(entries) - 1; i >= 0; i-- {
	if &entries[i] == entriesMap[entries[i].Path] {
		j--
		entries[j] = entries[i]
	}
}
entries = entries[j:]

It is a bit more subtle, but it gives a better ordering I think. If you don't care about the ordering, just iterate over the map.

If you want the same last-occurrence ordering as above, but without editing the slice in place:

entriesToInsert := make([]*Entry, len(entriesMap))
j := len(entriesToInsert) - 1
for i := len(entries) - 1; i >= 0; i-- {
	if &entries[i] == entriesMap[entries[i].Path] {
		entriesToInsert[j] = &entries[i]
		j--
	}
}

Contributor:
After discussing F2F: the existing code does exactly what I want. Sorry.

Contributor (author):
Like the first loop - I used a slice of pointers and keep just the right references in the new slice that we process.
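
For illustration only, a minimal standalone sketch of the pointer-identity dedup pattern discussed in this thread. The simplified Entry type and the sample values are assumptions for the example, not the lakeFS types; the two loops mirror the shape of the diff above: one pass records a single surviving occurrence per path in a map, a second pass keeps the survivors in the original slice order.

package main

import "fmt"

// Entry is a simplified stand-in for the catalog entry type (assumption for this sketch).
type Entry struct {
	Path            string
	PhysicalAddress string
}

// dedupByPath keeps one occurrence per path, using pointer identity against the
// map value to decide which element of the original slice survives, and
// preserves the original slice order of the survivors.
func dedupByPath(entries []Entry) []*Entry {
	entriesMap := make(map[string]*Entry, len(entries))
	for i := len(entries) - 1; i >= 0; i-- {
		entriesMap[entries[i].Path] = &entries[i]
	}
	result := make([]*Entry, 0, len(entriesMap))
	for i := range entries {
		if ent := entriesMap[entries[i].Path]; ent == &entries[i] {
			result = append(result, ent)
		}
	}
	return result
}

func main() {
	entries := []Entry{
		{Path: "imports/a", PhysicalAddress: "block-1"},
		{Path: "imports/b", PhysicalAddress: "block-2"},
		{Path: "imports/a", PhysicalAddress: "block-3"},
	}
	for _, e := range dedupByPath(entries) {
		fmt.Println(e.Path, e.PhysicalAddress)
	}
}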

}

// create entries
@@ -33,9 +49,33 @@ func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string
if err != nil {
return nil, err
}
for i := range entries {
if _, err := insertEntry(tx, branchID, &entries[i]); err != nil {
return nil, fmt.Errorf("entry at %d: %w", i, err)
// single insert per batch
entriesInsertSize := c.BatchWrite.EntriesInsertSize
for i := 0; i < len(entriesToInsert); i += entriesInsertSize {
sqInsert := psql.Insert("catalog_entries").
Columns("branch_id", "path", "physical_address", "checksum", "size", "metadata", "creation_date", "is_expired")
j := i + entriesInsertSize
if j > len(entriesToInsert) {
j = len(entriesToInsert)
}
for _, entry := range entriesToInsert[i:j] {
var dbTime sql.NullTime
if !entry.CreationDate.IsZero() {
dbTime.Time = entry.CreationDate
dbTime.Valid = true
}
sqInsert = sqInsert.Values(branchID, entry.Path, entry.PhysicalAddress, entry.Checksum, entry.Size, entry.Metadata,
sq.Expr("COALESCE(?,NOW())", dbTime), entry.Expired)
}
query, args, err := sqInsert.Suffix(`ON CONFLICT (branch_id,path,min_commit)
DO UPDATE SET physical_address=EXCLUDED.physical_address, checksum=EXCLUDED.checksum, size=EXCLUDED.size, metadata=EXCLUDED.metadata, creation_date=EXCLUDED.creation_date, is_expired=EXCLUDED.is_expired, max_commit=catalog_max_commit_id()`).
ToSql()
if err != nil {
return nil, fmt.Errorf("build query: %w", err)
}
_, err = tx.Exec(query, args...)
if err != nil {
return nil, err
}
}
return nil, nil
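
As a side note, the "single insert per batch" above relies on repeated Values calls on one squirrel InsertBuilder producing a single multi-row INSERT. A minimal standalone sketch of that behaviour follows; the table and column list mirror the diff, but the sample rows and the shortened ON CONFLICT suffix are assumptions for the example, not the PR's query.

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
	ins := psql.Insert("catalog_entries").
		Columns("branch_id", "path", "physical_address", "checksum", "size")
	// Each Values call appends one row to the same statement, so the whole
	// batch is written to the database in a single round-trip.
	rows := []struct {
		path, addr, checksum string
		size                 int64
	}{
		{"imports/a.parquet", "block-1", "c1", 100},
		{"imports/b.parquet", "block-2", "c2", 200},
	}
	for _, r := range rows {
		ins = ins.Values(1, r.path, r.addr, r.checksum, r.size)
	}
	query, args, err := ins.Suffix("ON CONFLICT (branch_id,path,min_commit) DO NOTHING").ToSql()
	if err != nil {
		panic(err)
	}
	// Roughly: INSERT INTO catalog_entries (branch_id,path,...) VALUES ($1,...,$5),($6,...,$10) ON CONFLICT ... DO NOTHING
	fmt.Println(query)
	fmt.Println(args)
}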
2 changes: 1 addition & 1 deletion catalog/cataloger_delete_branch_test.go
@@ -11,7 +11,7 @@ import (

func TestCataloger_DeleteBranch(t *testing.T) {
ctx := context.Background()
c := testCataloger(t, WithCacheConfig(&CacheConfig{Enabled: false}))
c := testCataloger(t, WithCacheEnabled(false))

if err := c.CreateRepository(ctx, "repo1", "s3://bucket1", "master"); err != nil {
t.Fatal("create repository for testing", err)
2 changes: 1 addition & 1 deletion catalog/cataloger_get_entry.go
@@ -55,7 +55,7 @@ func (c *cataloger) getEntryBatchMaybeExpired(ctx context.Context, repository st
select {
case response := <-replyChan:
return response.entry, response.err
case <-time.After(c.batchParams.ReadEntryMaxWait):
case <-time.After(c.BatchRead.EntryMaxWait):
return nil, ErrReadEntryTimeout
case <-ctx.Done():
return nil, ctx.Err()
2 changes: 1 addition & 1 deletion catalog/cataloger_get_repository_test.go
@@ -8,7 +8,7 @@ import (

func TestCataloger_GetRepository(t *testing.T) {
ctx := context.Background()
c := testCataloger(t, WithCacheConfig(&CacheConfig{Enabled: false}))
c := testCataloger(t, WithCacheEnabled(false))

// create test data
for i := 1; i < 3; i++ {
14 changes: 7 additions & 7 deletions catalog/db_batch_entry_read.go
@@ -49,15 +49,15 @@ func (c *cataloger) readEntriesBatchOrchestrator() {
c.wg.Done()
}()

readersWG.Add(c.batchParams.Readers)
for i := 0; i < c.batchParams.Readers; i++ {
readersWG.Add(c.BatchRead.Readers)
for i := 0; i < c.BatchRead.Readers; i++ {
go c.readEntriesBatch(&readersWG, entriesReadBatchChan)
}
bufferingMap := make(map[bufferingKey]*readBatch)
timer := time.NewTimer(c.batchParams.ScanTimeout)
timer := time.NewTimer(c.BatchRead.ScanTimeout)
for {
if len(bufferingMap) > 0 {
timer.Reset(c.batchParams.ScanTimeout)
timer.Reset(c.BatchRead.ScanTimeout)
}
select {
case request, moreEntries := <-c.readEntryRequestChan:
@@ -68,18 +68,18 @@
if !exists {
batch = &readBatch{
startTime: time.Now(),
pathList: make([]pathRequest, 0, c.batchParams.EntriesReadAtOnce),
pathList: make([]pathRequest, 0, c.BatchRead.EntriesAtOnce),
}
bufferingMap[request.bufKey] = batch
}
batch.pathList = append(batch.pathList, request.pathReq)
if len(batch.pathList) == c.batchParams.EntriesReadAtOnce {
if len(batch.pathList) == c.BatchRead.EntriesAtOnce {
entriesReadBatchChan <- batchReadMessage{key: request.bufKey, batch: batch.pathList}
delete(bufferingMap, request.bufKey)
}
case <-timer.C:
for k, v := range bufferingMap {
if time.Since(v.startTime) > c.batchParams.BatchDelay {
if time.Since(v.startTime) > c.BatchRead.Delay {
entriesReadBatchChan <- batchReadMessage{key: k, batch: v.pathList}
delete(bufferingMap, k)
}
11 changes: 0 additions & 11 deletions catalog/params/batch_read.go

This file was deleted.

28 changes: 28 additions & 0 deletions catalog/params/catalog.go
@@ -0,0 +1,28 @@
package params

import "time"

type Cache struct {
Enabled bool
Size int
Expiry time.Duration
Jitter time.Duration
}

type BatchRead struct {
EntryMaxWait time.Duration
ScanTimeout time.Duration
Delay time.Duration
EntriesAtOnce int
Readers int
}

type BatchWrite struct {
EntriesInsertSize int
}

type Catalog struct {
BatchRead BatchRead
BatchWrite BatchWrite
Cache Cache
}
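
For context, a hedged sketch of how a caller might fill these structs and pass them through the new WithParams option; the package name, helper name, and concrete values are assumptions for illustration, not code from this PR.

package importer

import (
	"time"

	"github.com/treeverse/lakefs/catalog"
	"github.com/treeverse/lakefs/catalog/params"
	"github.com/treeverse/lakefs/db"
)

// newImportCataloger is a hypothetical helper showing the WithParams option in use.
func newImportCataloger(dbPool db.Database) catalog.Cataloger {
	return catalog.NewCataloger(dbPool, catalog.WithParams(params.Catalog{
		BatchRead: params.BatchRead{
			EntriesAtOnce: 128,
			Readers:       16,
		},
		BatchWrite: params.BatchWrite{
			EntriesInsertSize: 50, // rows per INSERT issued by CreateEntries
		},
		Cache: params.Cache{
			Enabled: true,
			Size:    1024,
			Expiry:  20 * time.Second,
			Jitter:  5 * time.Second,
		},
	}))
}

Per the WithParams implementation above, zero-valued fields keep the defaults set in NewCataloger, while Cache.Enabled is copied unconditionally.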
2 changes: 1 addition & 1 deletion cmd/lakefs-loadtest/cmd/repo.go
@@ -46,7 +46,7 @@ var repoCmd = &cobra.Command{
ctx := context.Background()
database := connectToDB(connectionString)
conf := config.NewConfig()
c := catalog.NewCataloger(database, catalog.WithBatchReadParams(conf.GetCatalogerBatchReadParams()))
c := catalog.NewCataloger(database, catalog.WithParams(conf.GetCatalogerCatalogParams()))
if repository == "" {
repository = "repo-" + strings.ToLower(uuid.New().String())
}
2 changes: 1 addition & 1 deletion cmd/lakefs/cmd/diagnose.go
@@ -26,7 +26,7 @@ var diagnoseCmd = &cobra.Command{
if err != nil {
logger.WithError(err).Fatal("Failed to create block adapter")
}
cataloger := catalog.NewCataloger(dbPool, catalog.WithBatchReadParams(conf.GetCatalogerBatchReadParams()))
cataloger := catalog.NewCataloger(dbPool, catalog.WithParams(conf.GetCatalogerCatalogParams()))

numFailures := 0
repos, _, err := cataloger.ListRepositories(ctx, -1, "")
2 changes: 1 addition & 1 deletion cmd/lakefs/cmd/expire.go
@@ -26,7 +26,7 @@ var expireCmd = &cobra.Command{
conf := config.NewConfig()
logger := logging.FromContext(ctx)
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
cataloger := catalog.NewCataloger(dbPool, catalog.WithBatchReadParams(conf.GetCatalogerBatchReadParams()))
cataloger := catalog.NewCataloger(dbPool, catalog.WithParams(conf.GetCatalogerCatalogParams()))

awsRetentionConfig := config.NewConfig().GetAwsS3RetentionConfig()

2 changes: 1 addition & 1 deletion cmd/lakefs/cmd/run.go
@@ -62,7 +62,7 @@ var runCmd = &cobra.Command{
migrator := db.NewDatabaseMigrator(dbParams)

// init catalog
cataloger := catalog.NewCataloger(dbPool, catalog.WithBatchReadParams(conf.GetCatalogerBatchReadParams()))
cataloger := catalog.NewCataloger(dbPool, catalog.WithParams(conf.GetCatalogerCatalogParams()))

// init block store
blockStore, err := factory.BuildBlockAdapter(cfg)