Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
better concurrency for indexing #22
Browse files Browse the repository at this point in the history
  • Loading branch information
srerickson committed Feb 14, 2023
1 parent c38ea1e commit 2603776
Show file tree
Hide file tree
Showing 10 changed files with 400 additions and 92 deletions.
59 changes: 41 additions & 18 deletions cmd/ocfl-index/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ const (
envPath = "OCFL_INDEX_STOREDIR"
envDBFile = "OCFL_INDEX_SQLITE"
envAddr = "OCFL_INDEX_LISTEN"
envConc = "OCFL_INDEX_MAXGOROUTINES"
envScanConc = "OCFL_INDEX_SCANWORKERS" // number of workers for object scan
envParseConc = "OCFL_INDEX_PARSEWORKERS" // number of workers for parsing inventories

sqliteSettings = "_busy_timeout=10000&_journal=WAL&_sync=NORMAL&cache=shared"
)

type config struct {
Logger logr.Logger

// Server
Addr string // listen address for the server (e.g. ":8080")

// Backend configuration
Driver string // backend driver (supported: "fs", "s3", "azure")
Bucket string // bucket/container name for the S3 or Azure backend drivers
Expand All @@ -40,11 +46,9 @@ type config struct {
// SQLITE file
DBFile string // sqlite file

// Server
Addr string // port

// Concurrency
Conc int
// Concurrency Settings
ScanConc int // number of object scanning workers
ParseConc int // number of inventory parsing workers

IndexLevel index.IndexMode
}
Expand All @@ -60,30 +64,49 @@ func NewLogger() logr.Logger {
return logger
}

func NewConfig(logger logr.Logger) (*config, error) {
c := &config{
func NewConfig(logger logr.Logger) config {
c := config{
Logger: logger,
}
// values from environment variables
c.Bucket = getenvDefault(envBucket, "")
c.Driver = getenvDefault(envDriver, "fs")
c.Path = getenvDefault(envPath, ".")
c.S3Endpoint = getenvDefault(envS3Endpoint, "")
c.DBFile = getenvDefault(envDBFile, "index.sqlite")
c.Addr = getenvDefault(envAddr, ":8080")
conc, err := strconv.Atoi(getenvDefault(envConc, "0"))
if err != nil {
return nil, fmt.Errorf("parsing %s: %w", envConc, err)
if conc, err := strconv.Atoi(getenvDefault(envScanConc, "0")); err == nil {
c.ScanConc = conc
}
if c.ScanConc < 1 {
c.ScanConc = runtime.NumCPU()
}
if conc, err := strconv.Atoi(getenvDefault(envParseConc, "0")); err == nil {
c.ParseConc = conc
}
if c.ParseConc < 1 {
c.ParseConc = runtime.NumCPU()
}
logger.Info("config loaded", c.Attrs()...)
return c
}

func (c config) Attrs() []any {
attrs := []any{
"addr", c.Addr,
"driver", c.Driver,
"bucket", c.Bucket,
"path", c.Path,
"dbfile", c.DBFile,
"scan_workers", c.ScanConc,
"parse_workers", c.ParseConc,
}
// concurrency
c.Conc = conc
if c.Conc < 1 {
c.Conc = runtime.GOMAXPROCS(-1)
if c.S3Endpoint != "" {
attrs = append(attrs, "s3_endpoint", c.S3Endpoint)
}
return c, nil
return attrs
}

func (c *config) FS(ctx context.Context) (ocfl.FS, string, error) {
func (c config) FS(ctx context.Context) (ocfl.FS, string, error) {
switch c.Driver {
case "fs":
return ocfl.NewFS(os.DirFS(c.Path)), ".", nil
Expand Down
19 changes: 4 additions & 15 deletions cmd/ocfl-index/cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
_ "gocloud.dev/blob/azureblob"
)

var indexFlags struct {
conc int
}

// indexCmd represents the index command
var indexCmd = &cobra.Command{
Use: "index",
Expand All @@ -27,11 +23,7 @@ index file will be created if it does not exist.`,
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
logger := NewLogger()
conf, err := NewConfig(logger)
if err != nil {
logger.Error(err, "configuration error")
return
}
conf := NewConfig(logger)
fsys, rootDir, err := conf.FS(ctx)
if err != nil {
logger.Error(err, "can't connect to backend")
Expand All @@ -40,21 +32,18 @@ index file will be created if it does not exist.`,
if closer, ok := fsys.(io.Closer); ok {
defer closer.Close()
}
if err := DoIndex(ctx, conf, fsys, rootDir); err != nil {
if err := DoIndex(ctx, &conf, fsys, rootDir); err != nil {
logger.Error(err, "index failed")
}
},
}

func init() {
rootCmd.AddCommand(indexCmd)
indexCmd.Flags().IntVar(
&indexFlags.conc, "concurrency", 4, "number of concurrent operations duration indexing",
)
}

func DoIndex(ctx context.Context, conf *config, fsys ocfl.FS, rootDir string) error {
idx, err := sqlite.Open(conf.DBFile)
idx, err := sqlite.Open(conf.DBFile + "?" + sqliteSettings)
if err != nil {
return err
}
Expand All @@ -64,6 +53,6 @@ func DoIndex(ctx context.Context, conf *config, fsys ocfl.FS, rootDir string) er
}
return index.NewIndex(
idx, fsys, rootDir,
index.WithConcurrency(conf.Conc),
index.WithObjectScanConc(conf.ScanConc),
index.WithLogger(conf.Logger)).DoIndex(ctx, index.ModeFileSizes)
}
19 changes: 9 additions & 10 deletions cmd/ocfl-index/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ var serveCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
logger := NewLogger()
conf, err := NewConfig(logger)
if err != nil {
logger.Error(err, "configuration error")
return
}
conf := NewConfig(logger)
fsys, rootDir, err := conf.FS(ctx)
if err != nil {
logger.Error(err, "can't connect to backend")
Expand All @@ -43,8 +39,7 @@ var serveCmd = &cobra.Command{
if closer, ok := fsys.(io.Closer); ok {
defer closer.Close()
}
logger.Info("fs settings", "driver", conf.Driver, "bucket", conf.Bucket, "path", conf.Path)
if err := startServer(ctx, conf, fsys, rootDir); err != nil {
if err := startServer(ctx, &conf, fsys, rootDir); err != nil {
logger.Error(err, "server stopped")
}
},
Expand All @@ -54,12 +49,13 @@ func init() {
rootCmd.AddCommand(serveCmd)
serveCmd.Flags().BoolVar(&serverFlags.skipIndexing, "skip-indexing", false, "skip indexing step on startup")
serveCmd.Flags().BoolVar(&serverFlags.filesizes, "filesizes", false, "index file sizes during reindex")
serveCmd.Flags().BoolVar(&serverFlags.inventories, "inventories", false, "index inventories sizes during reindex")
serveCmd.Flags().BoolVar(&serverFlags.inventories, "inventories", false, "index inventories during reindex")

}

func startServer(ctx context.Context, c *config, fsys ocfl.FS, rootDir string) error {
db, err := sqlite.Open("file:" + c.DBFile)

db, err := sqlite.Open("file:" + c.DBFile + "?" + sqliteSettings)
if err != nil {
return err
}
Expand All @@ -73,7 +69,10 @@ func startServer(ctx context.Context, c *config, fsys ocfl.FS, rootDir string) e
defer db.Close()
schemaV := fmt.Sprintf("%d.%d", maj, min)
c.Logger.Info("using index file", "file", c.DBFile, "schema", schemaV)
idx := index.NewIndex(db, fsys, rootDir, index.WithLogger(c.Logger))
idx := index.NewIndex(db, fsys, rootDir,
index.WithLogger(c.Logger),
index.WithObjectScanConc(c.ScanConc),
)
if !serverFlags.skipIndexing {
go func() {
// initial indexing
Expand Down
11 changes: 9 additions & 2 deletions hack/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ export OCFL_INDEX_BACKEND="azure"
export OCFL_INDEX_BUCKET="ocfl"

# path relative to bucket/fs to OCFL storage root
export OCFL_INDEX_STOREDIR="public-data"
export OCFL_INDEX_STOREDIR="faker"

# path to index file
export OCFL_INDEX_SQLITE="public-data.sqlite"
export OCFL_INDEX_SQLITE="faker.sqlite"

# number of goroutines for object scan
export OCFL_INDEX_SCANWORKERS=200

# number of goroutines for inventory parsing
export OCFL_INDEX_PARSEWORKERS=12


# Additional options:

Expand Down
Loading

0 comments on commit 2603776

Please sign in to comment.