diff --git a/lake/journal/store.go b/lake/journal/store.go index 68da6f2bc4..f4964d2341 100644 --- a/lake/journal/store.go +++ b/lake/journal/store.go @@ -27,9 +27,9 @@ var ( ) type Store struct { - journal *Queue - logger *zap.Logger - unmarshaler *zson.UnmarshalZNGContext + journal *Queue + logger *zap.Logger + keyTypes []interface{} mu sync.RWMutex // Protects everything below. table map[string]Entry @@ -74,13 +74,10 @@ func OpenStore(ctx context.Context, engine storage.Engine, logger *zap.Logger, p } func newStore(journal *Queue, logger *zap.Logger, keyTypes ...interface{}) *Store { - u := zson.NewZNGUnmarshaler() - u.Bind(Add{}, Delete{}, Update{}) - u.Bind(keyTypes...) return &Store{ - journal: journal, - logger: logger.Named("journal"), - unmarshaler: u, + journal: journal, + logger: logger.Named("journal"), + keyTypes: append([]interface{}{Add{}, Delete{}, Update{}}, keyTypes...), } } @@ -95,7 +92,9 @@ func (s *Store) load(ctx context.Context) error { if head == current { return nil } - at, table, err := s.getSnapshot(ctx) + unmarshaler := zson.NewZNGUnmarshaler() + unmarshaler.Bind(s.keyTypes...) + at, table, err := s.getSnapshot(ctx, unmarshaler) if err != nil && !errors.Is(err, fs.ErrNotExist) { s.logger.Error("Loading snapshot", zap.Error(err)) } @@ -126,7 +125,7 @@ func (s *Store) load(ctx context.Context) error { return nil } var e Entry - if err := s.unmarshaler.Unmarshal(*val, &e); err != nil { + if err := unmarshaler.Unmarshal(*val, &e); err != nil { return err } switch e := e.(type) { @@ -146,7 +145,7 @@ func (s *Store) load(ctx context.Context) error { } } -func (s *Store) getSnapshot(ctx context.Context) (ID, map[string]Entry, error) { +func (s *Store) getSnapshot(ctx context.Context, unmarshaler *zson.UnmarshalZNGContext) (ID, map[string]Entry, error) { table := make(map[string]Entry) r, err := s.journal.engine.Get(ctx, s.snapshotURI()) if err != nil { @@ -169,7 +168,7 @@ func (s *Store) getSnapshot(ctx context.Context) (ID, map[string]Entry, error) { return at, table, err } var e Entry - if err := s.unmarshaler.Unmarshal(*val, &e); err != nil { + if err := unmarshaler.Unmarshal(*val, &e); err != nil { return at, nil, err } table[e.Key()] = e