Skip to content

Commit

Permalink
Feature/staging mgr postgres (#981)
Browse files Browse the repository at this point in the history
* staging manager implementation

* add newline

* support tombstone, add tests

* support tombstone, add tests

* naming, tidying

* add read only to read ops

* organize imports

* organize imports, remove partitioning

* add partitions to staging entries

* CR fixes

* CR fixes

* CR fixes - iterator

* CR fixes

* remove unused

* more CR fixes: test

* graveler based implementation

* adapt test to graveelr

* disallow nil values

* disallow nil values

* revert entry catalog change

* staging_kv > kv_staging

* change value to struct in Set
  • Loading branch information
johnnyaug authored Dec 6, 2020
1 parent 5e34175 commit 3084355
Show file tree
Hide file tree
Showing 7 changed files with 522 additions and 20 deletions.
3 changes: 3 additions & 0 deletions ddl/000016_staging_mgr.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
DROP TABLE IF EXISTS kv_staging;
COMMIT;
22 changes: 22 additions & 0 deletions ddl/000016_staging_mgr.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
BEGIN;
CREATE TABLE IF NOT EXISTS kv_staging
(
staging_token varchar not null,
key bytea not null,
identity bytea not null,
data bytea
) PARTITION BY HASH (staging_token);
CREATE TABLE IF NOT EXISTS kv_staging_p0 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 0);
CREATE TABLE IF NOT EXISTS kv_staging_p1 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 1);
CREATE TABLE IF NOT EXISTS kv_staging_p2 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 2);
CREATE TABLE IF NOT EXISTS kv_staging_p3 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 3);
CREATE TABLE IF NOT EXISTS kv_staging_p4 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 4);
CREATE TABLE IF NOT EXISTS kv_staging_p5 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 5);
CREATE TABLE IF NOT EXISTS kv_staging_p6 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 6);
CREATE TABLE IF NOT EXISTS kv_staging_p7 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 7);
CREATE TABLE IF NOT EXISTS kv_staging_p8 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 8);
CREATE TABLE IF NOT EXISTS kv_staging_p9 PARTITION OF kv_staging FOR VALUES WITH (MODULUS 10, REMAINDER 9);

CREATE UNIQUE index IF NOT EXISTS kv_staging_uidx
on kv_staging (staging_token asc, key asc);
COMMIT;
36 changes: 16 additions & 20 deletions graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ type RepositoryRecord struct {

// Value represents metadata or a given object (modified date, physical address, etc)
type Value struct {
Identity []byte
Data []byte
Identity []byte `db:"identity"`
Data []byte `db:"data"`
}

// ValueRecord holds Key with the associated Value information
type ValueRecord struct {
Key Key
Key Key `db:"key"`
*Value
}

Expand Down Expand Up @@ -153,7 +153,7 @@ type KeyValueStore interface {
// Delete value from repository / branch branch by key
Delete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key) error

// List lists entries on repository / ref will filter by prefix, from key 'from'.
// List lists values on repository / ref will filter by prefix, from key 'from'.
// When 'delimiter' is set the listing will include common prefixes based on the delimiter
List(ctx context.Context, repositoryID RepositoryID, ref Ref, prefix, from, delimiter Key) (ListingIterator, error)
}
Expand Down Expand Up @@ -354,27 +354,23 @@ type CommittedManager interface {
Apply(ctx context.Context, ns StorageNamespace, treeID TreeID, iterator ValueIterator) (TreeID, error)
}

// StagingManager handles changes to a branch that aren't yet committed
// provides basic CRUD abilities, with deletes being written as tombstones (null value)
// StagingManager manages entries in a staging area, denoted by a staging token
type StagingManager interface {
// Get returns the provided key (or nil value to represent a tombstone)
// Returns ErrNotFound if no value found on key
Get(ctx context.Context, repositoryID RepositoryID, branchID BranchID, st StagingToken, key Key) (*Value, error)
// Get returns the value for the provided staging token and key
// Returns ErrNotFound if no value found on key.
Get(ctx context.Context, st StagingToken, key Key) (*Value, error)

// Set writes an value (or nil value to represent a tombstone)
Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value *Value) error
// Set writes a value under the given staging token and key.
Set(ctx context.Context, st StagingToken, key Key, value Value) error

// Delete deletes an value by key
Delete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key) error

// List takes a given repository / branch and returns an ValueIterator
List(ctx context.Context, repositoryID RepositoryID, branchID BranchID, st StagingToken) (ValueIterator, error)
// Delete deletes a value by staging token and key
Delete(ctx context.Context, st StagingToken, key Key) error

// Snapshot take a snapshot of the current staging and returns a new staging token
Snapshot(ctx context.Context, repositoryID RepositoryID, branchID BranchID, st StagingToken) (StagingToken, error)
// List returns a ValueIterator for the given staging token
List(ctx context.Context, st StagingToken) (ValueIterator, error)

// ListSnapshot returns an iterator to scan the snapshot entries
ListSnapshot(ctx context.Context, repositoryID RepositoryID, branchID BranchID, st StagingToken) (ValueIterator, error)
// Drop clears the given staging area
Drop(ctx context.Context, st StagingToken) error
}

var (
Expand Down
37 changes: 37 additions & 0 deletions graveler/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package graveler

import (
"flag"
"log"
"os"
"testing"

"github.com/ory/dockertest/v3"
"github.com/sirupsen/logrus"
"github.com/treeverse/lakefs/testutil"
)

var (
pool *dockertest.Pool
databaseURI string
)

func TestMain(m *testing.M) {
flag.Parse()
if !testing.Verbose() {
// keep the log level calm
logrus.SetLevel(logrus.PanicLevel)
}

// postgres container
var err error
pool, err = dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to Docker: %s", err)
}
var closer func()
databaseURI, closer = testutil.GetDBInstance(pool)
code := m.Run()
closer() // cleanup
os.Exit(code)
}
73 changes: 73 additions & 0 deletions graveler/staging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package graveler

import (
"context"

"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)

type stagingManager struct {
db db.Database
log logging.Logger
}

func NewStagingManager(db db.Database) StagingManager {
return &stagingManager{
db: db,
log: logging.Default().WithField("service_name", "postgres_staging_manager"),
}
}

func (p *stagingManager) Get(ctx context.Context, st StagingToken, key Key) (*Value, error) {
res, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
value := &Value{}
err := tx.Get(value, "SELECT identity, data FROM kv_staging WHERE staging_token=$1 AND key=$2", st, key)
return value, err
}, p.txOpts(ctx, db.ReadOnly())...)
if err != nil {
return nil, err
}
return res.(*Value), nil
}

func (p *stagingManager) Set(ctx context.Context, st StagingToken, key Key, value Value) error {
if value.Identity == nil {
return ErrInvalidValue
}
_, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
return tx.Exec(`INSERT INTO kv_staging (staging_token, key, identity, data)
VALUES ($1, $2, $3, $4)
ON CONFLICT (staging_token, key) DO UPDATE
SET (staging_token, key, identity, data) =
(excluded.staging_token, excluded.key, excluded.identity, excluded.data)`,
st, key, value.Identity, value.Data)
}, p.txOpts(ctx)...)
return err
}

func (p *stagingManager) Delete(ctx context.Context, st StagingToken, key Key) error {
_, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
return tx.Exec("DELETE FROM kv_staging WHERE staging_token=$1 AND key=$2", st, key)
}, p.txOpts(ctx)...)
return err
}

func (p *stagingManager) List(ctx context.Context, st StagingToken) (ValueIterator, error) {
return NewStagingIterator(ctx, p.db, p.log, st), nil
}

func (p *stagingManager) Drop(ctx context.Context, st StagingToken) error {
_, err := p.db.Transact(func(tx db.Tx) (interface{}, error) {
return tx.Exec("DELETE FROM kv_staging WHERE staging_token=$1", st)
}, p.txOpts(ctx)...)
return err
}

func (p *stagingManager) txOpts(ctx context.Context, opts ...db.TxOpt) []db.TxOpt {
o := []db.TxOpt{
db.WithContext(ctx),
db.WithLogger(p.log),
}
return append(o, opts...)
}
87 changes: 87 additions & 0 deletions graveler/staging_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package graveler

import (
"context"

"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)

const batchSize = 1000

type StagingIterator struct {
ctx context.Context
db db.Database
log logging.Logger
st StagingToken

idxInBuffer int
err error
dbHasNext bool
buffer []*ValueRecord
nextFrom Key
}

func NewStagingIterator(ctx context.Context, db db.Database, log logging.Logger, st StagingToken) *StagingIterator {
return &StagingIterator{ctx: ctx, st: st, dbHasNext: true, db: db, log: log, nextFrom: make([]byte, 0)}
}

func (s *StagingIterator) Next() bool {
if s.err != nil {
return false
}
s.idxInBuffer++
if s.idxInBuffer < len(s.buffer) {
return true
}
if !s.dbHasNext {
return false
}
return s.loadBuffer()
}

func (s *StagingIterator) SeekGE(key Key) bool {
s.buffer = nil
s.err = nil
s.idxInBuffer = 0
s.nextFrom = key
s.dbHasNext = true
return s.Next()
}

func (s *StagingIterator) Value() *ValueRecord {
if s.err != nil || s.idxInBuffer >= len(s.buffer) {
return nil
}
return s.buffer[s.idxInBuffer]
}

func (s *StagingIterator) Err() error {
return s.err
}

func (s *StagingIterator) Close() {
}

func (s *StagingIterator) loadBuffer() bool {
queryResult, err := s.db.Transact(func(tx db.Tx) (interface{}, error) {
var res []*ValueRecord
err := tx.Select(&res, "SELECT key, identity, data "+
"FROM kv_staging WHERE staging_token=$1 AND key >= $2 ORDER BY key LIMIT $3", s.st, s.nextFrom, batchSize+1)
return res, err
}, db.WithLogger(s.log), db.WithContext(s.ctx), db.ReadOnly())
if err != nil {
s.err = err
return false
}
values := queryResult.([]*ValueRecord)
s.idxInBuffer = 0
if len(values) == batchSize+1 {
s.nextFrom = values[len(values)-1].Key
s.buffer = values[:len(values)-1]
return true
}
s.dbHasNext = false
s.buffer = values
return len(values) > 0
}
Loading

0 comments on commit 3084355

Please sign in to comment.