Skip to content

Commit

Permalink
Refactor config: extract block, auth factories
Browse files Browse the repository at this point in the history
The config package also holds factories. This creates dependency loops, which
Go's import-cycle rules make hard to break. Prevent import loops by
splitting each package that needs a factory into:
1. The actual package (virtually unchanged, except to take a
   parameters object in its constructor);
1. A "params" (sub-)package, which holds parameter structs that config
   creates and factories consume;
1. A "factory" (sub-)package, which calls the appropriate config
   methods to get parameters and pass them to the constructor.


Former-commit-id: 91351838726ff6ce10b9f6dcd9cd4218306e107a
  • Loading branch information
arielshaqed committed Aug 18, 2020
1 parent e9dc835 commit defa439
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 109 deletions.
10 changes: 10 additions & 0 deletions auth/params/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package params

import "time"

// ServiceCache holds configuration for the auth service's in-memory cache.
type ServiceCache struct {
	Enabled        bool          // when false, no cache is created
	Size           int           // maximum number of cached entries
	TTL            time.Duration // how long a cached entry remains valid
	EvictionJitter time.Duration // random spread added around TTL — presumably to avoid synchronized expiry; confirm in cache impl
}
10 changes: 2 additions & 8 deletions auth/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/treeverse/lakefs/auth/crypt"
"github.com/treeverse/lakefs/auth/model"
"github.com/treeverse/lakefs/auth/params"
"github.com/treeverse/lakefs/auth/wildcard"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
Expand Down Expand Up @@ -134,14 +135,7 @@ type DBAuthService struct {
cache Cache
}

type ServiceCacheConfig struct {
Enabled bool
Size int
TTL time.Duration
EvictionJitter time.Duration
}

func NewDBAuthService(db db.Database, secretStore crypt.SecretStore, cacheConf ServiceCacheConfig) *DBAuthService {
func NewDBAuthService(db db.Database, secretStore crypt.SecretStore, cacheConf params.ServiceCache) *DBAuthService {
logging.Default().Info("initialized Auth service")
var cache Cache
if cacheConf.Enabled {
Expand Down
76 changes: 76 additions & 0 deletions block/factory/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package factory

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/block/local"
"github.com/treeverse/lakefs/block/mem"
"github.com/treeverse/lakefs/block/params"
s3a "github.com/treeverse/lakefs/block/s3"
"github.com/treeverse/lakefs/block/transient"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/logging"
)

// ErrInvalidBlockStoreType is returned when the configured blockstore type
// does not match any known adapter.
var ErrInvalidBlockStoreType = errors.New("invalid blockstore type")

// BuildBlockAdapter creates the block adapter selected by the configured
// blockstore type.  For adapters that take parameters it fetches them from
// config and delegates to the matching build helper; unknown types yield
// ErrInvalidBlockStoreType.
func BuildBlockAdapter(c *config.Config) (block.Adapter, error) {
	blockstore := c.GetBlockstoreType()
	logging.Default().
		WithField("type", blockstore).
		Info("initialize blockstore adapter")
	switch blockstore {
	case local.BlockstoreType:
		// Named localParams (not "params") to avoid shadowing the
		// imported params package.
		localParams, err := c.GetBlockAdapterLocalParams()
		if err != nil {
			return nil, err
		}
		return buildLocalAdapter(localParams)
	case s3a.BlockstoreType:
		s3Params, err := c.GetBlockAdapterS3Params()
		if err != nil {
			return nil, err
		}
		return buildS3Adapter(s3Params)
	case mem.BlockstoreType, "memory":
		return mem.New(), nil
	case transient.BlockstoreType:
		return transient.New(), nil
	default:
		// NOTE(review): the previous config-based builder also handled a
		// gcs blockstore type; it is not handled here — confirm whether
		// dropping GCS support was intentional in this refactor.
		return nil, fmt.Errorf("%w '%s' please choose one of %s",
			ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType})
	}
}

// buildLocalAdapter constructs a filesystem-backed block adapter rooted at
// the configured path.  The parameter is named lp rather than "params" so it
// does not shadow the imported params package.
func buildLocalAdapter(lp params.Local) (*local.Adapter, error) {
	adapter, err := local.NewAdapter(lp.Path)
	if err != nil {
		return nil, fmt.Errorf("got error opening a local block adapter with path %s: %w", lp.Path, err)
	}
	logging.Default().WithFields(logging.Fields{
		"type": "local",
		"path": lp.Path,
	}).Info("initialized blockstore adapter")
	return adapter, nil
}

// buildS3Adapter constructs an S3 block adapter from the supplied AWS
// configuration and streaming-chunk tuning values.  The parameter is named
// sp rather than "params" so it does not shadow the imported params package.
func buildS3Adapter(sp params.S3) (*s3a.Adapter, error) {
	sess, err := session.NewSession(sp.AwsConfig)
	if err != nil {
		return nil, err
	}
	// NOTE(review): the ClientConfig result is discarded — presumably called
	// for its side effect on the session; confirm it is still needed.
	sess.ClientConfig(s3.ServiceName)
	svc := s3.New(sess)
	adapter := s3a.NewAdapter(svc,
		s3a.WithStreamingChunkSize(sp.StreamingChunkSize),
		s3a.WithStreamingChunkTimeout(sp.StreamingChunkTimeout),
	)
	logging.Default().WithFields(logging.Fields{
		"type": "s3",
	}).Info("initialized blockstore adapter")
	return adapter, nil
}
4 changes: 2 additions & 2 deletions block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
ErrInventoryNotSupported = errors.New("inventory feature not implemented for local storage adapter")
)

func (l *Adapter) WithContext(ctx context.Context) block.Adapter {
func (l *Adapter) WithContext(ctx context.Context) *Adapter {
return &Adapter{
path: l.path,
ctx: ctx,
Expand All @@ -50,7 +50,7 @@ func WithTranslator(t block.UploadIDTranslator) func(a *Adapter) {
}
}

func NewAdapter(path string, opts ...func(a *Adapter)) (block.Adapter, error) {
func NewAdapter(path string, opts ...func(a *Adapter)) (*Adapter, error) {
stt, err := os.Stat(path)
if err != nil {
return nil, err
Expand Down
19 changes: 19 additions & 0 deletions block/params/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package params

import (
"time"

"github.com/aws/aws-sdk-go/aws"
)

// Mem holds the (currently empty) parameters for the in-memory block adapter.
type Mem struct{}

// Local holds parameters for the filesystem block adapter.
type Local struct {
	Path string // root path handed to the local adapter
}

// S3 holds parameters for the S3 block adapter.
type S3 struct {
	AwsConfig             *aws.Config   // AWS session configuration used to create the S3 client
	StreamingChunkSize    int           // streaming upload chunk size — presumably bytes; confirm against the s3 adapter
	StreamingChunkTimeout time.Duration // timeout applied per streaming chunk
}
2 changes: 1 addition & 1 deletion block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WithTranslator(t block.UploadIDTranslator) func(a *Adapter) {
}
}

func NewAdapter(s3 s3iface.S3API, opts ...func(a *Adapter)) block.Adapter {
func NewAdapter(s3 s3iface.S3API, opts ...func(a *Adapter)) *Adapter {
a := &Adapter{
s3: s3,
httpClient: http.DefaultClient,
Expand Down
3 changes: 2 additions & 1 deletion cmd/lakefs/cmd/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)

Expand All @@ -20,7 +21,7 @@ var diagnoseCmd = &cobra.Command{
if err != nil {
logger.WithError(err).Fatal("Failed to create block adapter")
}
dbPool := cfg.BuildDatabaseConnection()
dbPool := db.BuildDatabaseConnection(cfg)
cataloger := catalog.NewCataloger(dbPool)

numFailures := 0
Expand Down
126 changes: 29 additions & 97 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,19 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/google/uuid"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/mitchellh/go-homedir"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/treeverse/lakefs/auth"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/block/gcs"
"github.com/treeverse/lakefs/block/local"
"github.com/treeverse/lakefs/block/mem"
s3a "github.com/treeverse/lakefs/block/s3"
"github.com/treeverse/lakefs/block/transient"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"

auth_params "github.com/treeverse/lakefs/auth/params"
block_params "github.com/treeverse/lakefs/block/params"
"github.com/treeverse/lakefs/stats"
)

const (
DefaultDatabaseDriver = "pgx"
DefaultDatabaseConnString = "postgres://localhost:5432/postgres?sslmode=disable"

DefaultBlockStoreType = "local"
Expand Down Expand Up @@ -62,8 +54,7 @@ const (
)

var (
ErrInvalidBlockStoreType = errors.New("invalid blockstore type")
ErrMissingSecretKey = errors.New("auth.encrypt.secret_key cannot be empty")
ErrMissingSecretKey = errors.New("auth.encrypt.secret_key cannot be empty")
)

type LogrusAWSAdapter struct {
Expand Down Expand Up @@ -118,14 +109,6 @@ func (c *Config) GetDatabaseURI() string {
return viper.GetString("database.connection_string")
}

func (c *Config) BuildDatabaseConnection() db.Database {
database, err := db.ConnectDB(DefaultDatabaseDriver, c.GetDatabaseURI())
if err != nil {
panic(err)
}
return database
}

type AwsS3RetentionConfig struct {
RoleArn string
ManifestBaseURL *url.URL
Expand Down Expand Up @@ -174,6 +157,16 @@ func (c *Config) GetAwsConfig() *aws.Config {
viper.GetString("blockstore.s3.credentials.access_secret_key"),
viper.GetString("blockstore.s3.credentials.session_token"))
}

s3Endpoint := viper.GetString("blockstore.s3.endpoint")
if len(s3Endpoint) > 0 {
awsConfig = awsConfig.WithEndpoint(s3Endpoint)
}
s3ForcePathStyle := viper.GetBool("blockstore.s3.force_path_style")
if s3ForcePathStyle {
awsConfig = awsConfig.WithS3ForcePathStyle(true)
}

return cfg
}

Expand Down Expand Up @@ -225,93 +218,32 @@ func GetAccount(awsConfig *aws.Config) (string, error) {
return *account.Account, nil
}

func (c *Config) buildS3Adapter() (block.Adapter, error) {
cfg := c.GetAwsConfig()

sess, err := session.NewSession(cfg)
if err != nil {
return nil, err
}
sess.ClientConfig(s3.ServiceName)

awsConfig := aws.NewConfig()
s3Endpoint := viper.GetString("blockstore.s3.endpoint")
if len(s3Endpoint) > 0 {
awsConfig = awsConfig.WithEndpoint(s3Endpoint)
}
s3ForcePathStyle := viper.GetBool("blockstore.s3.force_path_style")
if s3ForcePathStyle {
awsConfig = awsConfig.WithS3ForcePathStyle(true)
}

svc := s3.New(sess, awsConfig)
adapter := s3a.NewAdapter(svc,
s3a.WithStreamingChunkSize(viper.GetInt("blockstore.s3.streaming_chunk_size")),
s3a.WithStreamingChunkTimeout(viper.GetDuration("blockstore.s3.streaming_chunk_timeout")))
log.WithFields(log.Fields{
"type": "s3",
}).Info("initialized blockstore adapter")
return adapter, nil
// GetBlockstoreType returns the configured block adapter type
// (the "blockstore.type" configuration key).
func (c *Config) GetBlockstoreType() string {
	return viper.GetString("blockstore.type")
}

func (c *Config) buildGCSAdapter() (block.Adapter, error) {
cfg := c.GetGCSAwsConfig()
s3Endpoint := viper.GetString("blockstore.gcs.s3_endpoint")
sess, err := session.NewSession(cfg)
if err != nil {
return nil, err
}
sess.ClientConfig(s3.ServiceName)
svc := s3.New(sess, aws.NewConfig().WithEndpoint(s3Endpoint))
adapter := gcs.NewAdapter(svc,
gcs.WithStreamingChunkSize(viper.GetInt("blockstore.gcs.streaming_chunk_size")),
gcs.WithStreamingChunkTimeout(viper.GetDuration("blockstore.gcs.streaming_chunk_timeout")))
log.WithFields(log.Fields{"type": "gcs"}).Info("initialized blockstore adapter")
return adapter, nil
// GetBlockAdapterS3Params assembles the parameters required to build an S3
// block adapter from the blockstore.s3.* configuration keys.
func (c *Config) GetBlockAdapterS3Params() (block_params.S3, error) {
	return block_params.S3{
		AwsConfig:             c.GetAwsConfig(),
		StreamingChunkSize:    viper.GetInt("blockstore.s3.streaming_chunk_size"),
		StreamingChunkTimeout: viper.GetDuration("blockstore.s3.streaming_chunk_timeout"),
	}, nil
}

func (c *Config) buildLocalAdapter() (block.Adapter, error) {
func (c *Config) GetBlockAdapterLocalParams() (block_params.Local, error) {
localPath := viper.GetString("blockstore.local.path")
location, err := homedir.Expand(localPath)
path, err := homedir.Expand(localPath)
if err != nil {
return nil, fmt.Errorf("could not parse blockstore location URI: %w", err)
}

adapter, err := local.NewAdapter(location)
if err != nil {
return nil, fmt.Errorf("got error opening a local block adapter with path %s: %w", location, err)
}
log.WithFields(log.Fields{
"type": "local",
"path": location,
}).Info("initialized blockstore adapter")
return adapter, nil
}

func (c *Config) BuildBlockAdapter() (block.Adapter, error) {
blockstore := viper.GetString("blockstore.type")
logging.Default().
WithField("type", blockstore).
Info("initialize blockstore adapter")
switch blockstore {
case local.BlockstoreType:
return c.buildLocalAdapter()
case s3a.BlockstoreType:
return c.buildS3Adapter()
case mem.BlockstoreType, "memory":
return mem.New(), nil
case transient.BlockstoreType:
return transient.New(), nil
case gcs.BlockstoreType:
return c.buildGCSAdapter()
default:
return nil, fmt.Errorf("%w '%s' please choose one of %s",
ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType, gcs.BlockstoreType})
}
return block_params.Local{Path: path}, err
}

func (c *Config) GetAuthCacheConfig() auth.ServiceCacheConfig {
return auth.ServiceCacheConfig{
func (c *Config) GetAuthCacheConfig() auth_params.ServiceCache {
return auth_params.ServiceCache{
Enabled: viper.GetBool("auth.cache.enabled"),
Size: viper.GetInt("auth.cache.size"),
TTL: viper.GetDuration("auth.cache.ttl"),
Expand Down
13 changes: 13 additions & 0 deletions db/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/treeverse/lakefs/db/params"

"github.com/jmoiron/sqlx"
"github.com/treeverse/lakefs/logging"
)
Expand All @@ -12,8 +14,19 @@ const (
DefaultMaxOpenConnections = 25
DefaultMaxIdleConnections = 25
DefaultConnectionMaxLifetime = 5 * time.Minute
DatabaseDriver = "pgx"
)

// BuildDatabaseConnection returns a database connection based on a pool for
// the given parameters.  It panics if the connection cannot be established,
// so it is intended for use at process startup.
func BuildDatabaseConnection(p *params.Database) Database {
	// Parameter named p (not "params") to avoid shadowing the imported
	// params package.
	database, err := ConnectDB(DatabaseDriver, p.DatabaseURI)
	if err != nil {
		panic(err)
	}
	return database
}

func ConnectDB(driver string, uri string) (Database, error) {
log := logging.Default().WithFields(logging.Fields{
"driver": driver,
Expand Down
5 changes: 5 additions & 0 deletions db/params/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package params

// Database holds connection parameters for the database layer.
type Database struct {
	DatabaseURI string // connection string passed to the database driver
}

0 comments on commit defa439

Please sign in to comment.