Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
updates to match changes in ocfl package
Browse files Browse the repository at this point in the history
  • Loading branch information
srerickson committed Jul 7, 2023
1 parent 8bb8edd commit 00e0335
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 136 deletions.
16 changes: 5 additions & 11 deletions cmd/ocfl-index/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/go-logr/logr"
"github.com/iand/logfmtr"
"github.com/srerickson/ocfl"
"github.com/srerickson/ocfl-index/internal/index"
"github.com/srerickson/ocfl/backend/cloud"
"gocloud.dev/blob"
"gocloud.dev/blob/s3blob"
"golang.org/x/exp/slog"
)

const (
Expand All @@ -32,7 +31,7 @@ const (
)

type config struct {
Logger logr.Logger
Logger *slog.Logger

// Server
Addr string // port
Expand All @@ -51,18 +50,13 @@ type config struct {
ParseConc int // number of inventory parsing workers
}

func NewLogger() logr.Logger {
logger := logfmtr.NewWithOptions(logfmtr.Options{
Writer: os.Stderr,
Humanize: true,
NameDelim: "/",
})
logfmtr.SetVerbosity(verbosity)
func NewLogger() *slog.Logger {
logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{}))
logger.Info("ocfl-index", "version", index.Version, "verbosity", verbosity)
return logger
}

func NewConfig(logger logr.Logger) config {
func NewConfig(logger *slog.Logger) config {
c := config{
Logger: logger,
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/ocfl-index/cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ index file will be created if it does not exist.`,
conf := NewConfig(logger)
fsys, rootDir, err := conf.FS(ctx)
if err != nil {
logger.Error(err, "can't connect to backend")
logger.Error("can't connect to backend", "err", err)
return
}
if closer, ok := fsys.(io.Closer); ok {
defer closer.Close()
}
if err := DoIndex(ctx, &conf, fsys, rootDir); err != nil {
logger.Error(err, "index failed")
logger.Error("index failed", "err", err)
}
},
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/ocfl-index/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ var serveCmd = &cobra.Command{
conf := NewConfig(logger)
fsys, rootDir, err := conf.FS(ctx)
if err != nil {
logger.Error(err, "can't connect to backend")
logger.Error("can't connect to backend", "err", err)
return
}
if closer, ok := fsys.(io.Closer); ok {
defer closer.Close()
}
if err := startServer(ctx, &conf, fsys, rootDir); err != nil {
logger.Error(err, "server stopped")
logger.Error("server stopped", "err", err)
}
},
}
Expand Down
94 changes: 17 additions & 77 deletions internal/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ import (
"context"
"errors"
"fmt"
"io"
"path"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/srerickson/ocfl"
"github.com/srerickson/ocfl-index/internal/pipeline"
"github.com/srerickson/ocfl/backend/cloud"
"github.com/srerickson/ocfl/logging"
"github.com/srerickson/ocfl/ocflv1"
"gocloud.dev/blob"
"golang.org/x/exp/slog"
)

const txCapInv = 10 // number of inventory inserts per transaction
Expand All @@ -30,15 +28,15 @@ type IndexOptions struct {
RootPath string // storage root directory
ScanConc int // concurrency for readdir-based object scanning
ParseConc int // concurrency for inventory parsers
Log logr.Logger
Log *slog.Logger
ObjectIDs []string // index specific object ids only
ObjectPaths []string // index specific object root paths only
}

// Index updates the index database
func (idx *Indexer) Index(ctx context.Context, opts *IndexOptions) error {
if opts.Log.GetSink() == nil {
opts.Log = logr.Discard()
if opts.Log == nil {
opts.Log = logging.DisabledLogger()
}
if len(opts.ObjectPaths)+len(opts.ObjectIDs) == 0 {
// reindex everything
Expand All @@ -57,21 +55,13 @@ func (idx *Indexer) Index(ctx context.Context, opts *IndexOptions) error {
// object roots in the index that no longer exist in the storage root.
func (idx *Indexer) syncObjectRoots(ctx context.Context, opts *IndexOptions) error {
count := 0
method := "default"
var err error
opts.Log.Info("updating object paths from storage root. This may take a while ...", "root", opts.RootPath)
defer func() {
opts.Log.Info("object path update complete", "object_roots", count, "method", method, "root", opts.RootPath)
opts.Log.Info("object path update complete", "object_roots", count, "root", opts.RootPath)
}()
startSync := time.Now()
switch fsys := opts.FS.(type) {
case *cloud.FS:
method = "list-keys"
count, err = cloudSyncObjecRoots(ctx, idx.Backend, fsys, opts.RootPath)
default:
method = "default"
count, err = defaultSyncObjecRoots(ctx, idx.Backend, opts.FS, opts.RootPath, opts.ScanConc)
}
count, err = syncObjecRootsTX(ctx, idx.Backend, opts.FS, opts.RootPath, opts.ScanConc)
if err != nil {
return err
}
Expand All @@ -86,21 +76,16 @@ func (idx *Indexer) syncObjectRoots(ctx context.Context, opts *IndexOptions) err
return tx.Commit()
}

func defaultSyncObjecRoots(ctx context.Context, db Backend, fsys ocfl.FS, root string, conc int) (int, error) {
store, err := ocflv1.GetStore(ctx, fsys, root)
if err != nil {
return 0, err
}
func syncObjecRootsTX(ctx context.Context, db Backend, fsys ocfl.FS, root string, conc int) (int, error) {
tx, err := db.NewTx(ctx)
if err != nil {
return 0, err
}
defer tx.Rollback()
found := 0
eachObj := func(obj *ocflv1.Object) error {
_, root := obj.Root()
eachObj := func(obj *ocfl.ObjectRoot) error {
// The indexed object root path should be relative to the storage root
r := ObjectRoot{Path: strings.TrimPrefix(root, root+"/")}
r := ObjectRoot{Path: strings.TrimPrefix(obj.Path, root+"/")}
if err := tx.IndexObjectRoot(ctx, time.Now(), r); err != nil {
return err
}
Expand All @@ -117,60 +102,15 @@ func defaultSyncObjecRoots(ctx context.Context, db Backend, fsys ocfl.FS, root s
}
return nil
}
if err := store.ScanObjects(ctx, eachObj, &ocflv1.ScanObjectsOpts{Concurrency: conc}); err != nil {
return 0, err
pth := ocfl.PathSelector{
Dir: root,
SkipDirFn: func(name string) bool { return name == path.Join(root, "extensions") },
}
if err := tx.Commit(); err != nil {
if err := ocfl.ObjectRoots(ctx, fsys, pth, eachObj); err != nil {
return 0, err
}
return found, nil
}

func cloudSyncObjecRoots(ctx context.Context, db Backend, fsys *cloud.FS, root string) (int, error) {
iter := fsys.List(&blob.ListOptions{
Prefix: root,
})
tx, err := db.NewTx(ctx)
if err != nil {
return 0, err
}
defer tx.Rollback()
found := 0
var decl ocfl.Declaration
for {
item, err := iter.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return 0, err
}
if err := ocfl.ParseDeclaration(path.Base(item.Key), &decl); err != nil {
continue
}
if decl.Type != ocfl.DeclObject {
continue
}
// item key's directory is an object root: index the path relative to the
// storage root.
objRoot := strings.TrimPrefix(path.Dir(item.Key), root+"/")
if err := tx.IndexObjectRoot(ctx, time.Now(), ObjectRoot{Path: objRoot}); err != nil {
return 0, err
}
found++
if found%txCapObjRoot == 0 {
// commit and start a new transaction
if err := tx.Commit(); err != nil {
return 0, err
}
tx, err = db.NewTx(ctx)
if err != nil {
return found, err
}
}
}
if err := tx.Commit(); err != nil {
return found, err
return 0, err
}
return found, nil
}
Expand Down Expand Up @@ -264,14 +204,14 @@ func (idx *Indexer) indexInventories(ctx context.Context, opts *IndexOptions) er
if !indexingAll {
return job.err
}
opts.Log.Error(job.err, "object has errors", "object_path", root)
opts.Log.Error("object has errors", "err", job.err, "object_path", root)
}
if job.inv == nil {
// nothing to do
return nil
}
if job.prev != nil && job.sidecar != "" && job.prev.InventoryDigest == job.sidecar {
opts.Log.V(10).Info("object is unchanged", "object_path", root)
opts.Log.Debug("object is unchanged", "object_path", root)
return nil
}
numObjs++
Expand Down
5 changes: 2 additions & 3 deletions internal/index/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"path/filepath"

"github.com/go-logr/logr"
"github.com/srerickson/ocfl-index/internal/index"
"github.com/srerickson/ocfl-index/internal/sqlite"
"github.com/srerickson/ocfl/backend/cloud"
"github.com/srerickson/ocfl/logging"
"gocloud.dev/blob/fileblob"
_ "modernc.org/sqlite"
)
Expand Down Expand Up @@ -45,13 +45,12 @@ func newTestService(ctx context.Context, fixture string) (*index.Service, error)
Index: idx,
FS: fsys,
RootPath: fixture,
Log: logr.Discard(),
Log: logging.DisabledLogger(),
Async: index.NewAsync(ctx),
}
opts := &index.IndexOptions{
FS: fsys,
RootPath: fixture,
Log: logr.Discard(),
}
if err := srv.Index.Index(ctx, opts); err != nil {
return nil, err
Expand Down
48 changes: 22 additions & 26 deletions internal/index/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"path"
"time"

"github.com/bufbuild/connect-go"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/go-logr/logr"
"github.com/go-logr/stdr"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/srerickson/ocfl"
Expand All @@ -27,7 +25,7 @@ const downloadPrefix = "/download"

// Service implements the gRPC services
type Service struct {
Log logr.Logger
Log *slog.Logger
FS ocfl.FS
RootPath string
Index *Indexer
Expand All @@ -40,17 +38,16 @@ type Service struct {
var _ (ocflv1connect.IndexServiceHandler) = (*Service)(nil)

func (srv Service) IndexAll(ctx context.Context, rq *connect.Request[api.IndexAllRequest]) (*connect.Response[api.IndexAllResponse], error) {
opts := &IndexOptions{
FS: srv.FS,
RootPath: srv.RootPath,
ParseConc: srv.ParseConc,
ScanConc: srv.ScanConc,
}
task := func(ctx context.Context, w io.Writer) error {
opts.Log = stdr.New(log.New(w, "", 0))
added, _ := srv.Async.TryNow("indexing", func(ctx context.Context, w io.Writer) error {
opts := &IndexOptions{
FS: srv.FS,
RootPath: srv.RootPath,
ParseConc: srv.ParseConc,
ScanConc: srv.ScanConc,
Log: slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions{})),
}
return srv.Index.Index(ctx, opts)
}
added, _ := srv.Async.TryNow("indexing", task)
})
if !added {
return nil, errors.New("an indexing task is already running")
}
Expand All @@ -59,18 +56,17 @@ func (srv Service) IndexAll(ctx context.Context, rq *connect.Request[api.IndexAl

func (srv Service) IndexIDs(ctx context.Context, rq *connect.Request[api.IndexIDsRequest]) (*connect.Response[api.IndexIDsResponse], error) {
// todo check max number of ids
opts := &IndexOptions{
FS: srv.FS,
RootPath: srv.RootPath,
ParseConc: srv.ParseConc,
ScanConc: srv.ScanConc,
ObjectIDs: rq.Msg.ObjectIds,
}
task := func(ctx context.Context, w io.Writer) error {
opts.Log = stdr.New(log.New(w, "", 0))
added, taskErr := srv.Async.TryNow("indexing", func(ctx context.Context, w io.Writer) error {
opts := &IndexOptions{
FS: srv.FS,
RootPath: srv.RootPath,
ParseConc: srv.ParseConc,
ScanConc: srv.ScanConc,
ObjectIDs: rq.Msg.ObjectIds,
Log: slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions{})),
}
return srv.Index.Index(ctx, opts)
}
added, taskErr := srv.Async.TryNow("indexing", task)
})
if !added {
return nil, errors.New("an indexing task is already running")
}
Expand Down Expand Up @@ -132,7 +128,7 @@ func (srv Service) HTTPHandler() http.Handler {
return mux
}

func RequestLogger(logger logr.Logger) func(http.Handler) http.Handler {
func RequestLogger(logger *slog.Logger) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
Expand Down
Loading

0 comments on commit 00e0335

Please sign in to comment.