Skip to content

Commit

Permalink
feat: support local source mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Jun 27, 2023
1 parent 7c3ea8e commit 36826e0
Show file tree
Hide file tree
Showing 18 changed files with 1,626 additions and 92 deletions.
16 changes: 12 additions & 4 deletions cmd/fidgit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.flipt.io/fidgit"
"go.flipt.io/fidgit/collections/flipt"
"go.flipt.io/fidgit/internal/source/local"
"golang.org/x/exp/slog"
)

Expand All @@ -16,17 +17,24 @@ func main() {
}))
slog.SetDefault(logger)

server := fidgit.NewServer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

collection, err := fidgit.CollectionFor[*flipt.Flag](context.Background(), &flipt.FlagCollectionFactory{})
manager := fidgit.NewService(local.New(ctx, "."))

collection, err := fidgit.CollectionFor[flipt.Flag](context.Background(), &flipt.FlagCollectionFactory{})
if err != nil {
slog.Error("Building Collection", "error", err)
os.Exit(1)
}

server.RegisterCollection(collection)
manager.RegisterCollection(collection)

manager.Start(context.Background())

server := fidgit.NewServer(manager)

http.Handle("/", server)
http.Handle("/api/v1/", server)

slog.Info("Listening", slog.String("addr", ":9191"))

Expand Down
70 changes: 34 additions & 36 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
"path"
"sync"
Expand All @@ -24,25 +23,22 @@ type Item interface {
GetNamespace() Namespace
GetID() ID
GetTags() []Tag
// GetInternalContext is used by the runtime implementation
// to optimize lookups and will not intended
// for external consumption.
GetInternalContext() map[string]string
}

type Type struct {
Group string
Kind string
Version string
}

func (t Type) String() string {
return path.Join(t.Kind, t.Version)
return path.Join(t.Group, t.Kind, t.Version)
}

type CollectionRuntime[I Item] interface {
ListAll(context.Context) ([]I, error)
Put(context.Context, I) error
Delete(context.Context, Namespace, ID) error
type Runtime[I Item] interface {
ListAll(context.Context) ([]*I, error)
Put(context.Context, Namespace, *I) ([]File, error)
Delete(context.Context, Namespace, ID) ([]File, error)
}

type namespace struct {
Expand All @@ -57,23 +53,24 @@ type Collection struct {

mu sync.RWMutex
updateSnapshot func(context.Context, fs.FS) error
index map[Namespace]namespace
put func(context.Context, []byte) error
del func(context.Context, Namespace, ID) error
index map[Namespace]*namespace
put func(context.Context, Namespace, []byte) ([]File, error)
del func(context.Context, Namespace, ID) ([]File, error)
}

type CollectionFactory[I Item] interface {
type RuntimeFactory[I Item] interface {
GetType() Type
GetTagKeys() []string
CollectionFor(context.Context, fs.FS) (CollectionRuntime[I], error)
CollectionFor(context.Context, fs.FS) (Runtime[I], error)
}

func CollectionFor[I Item](ctx context.Context, f CollectionFactory[I]) (*Collection, error) {
func CollectionFor[I Item](ctx context.Context, f RuntimeFactory[I]) (*Collection, error) {
collection := Collection{
typ: f.GetType(),
tagKeys: f.GetTagKeys(),
logger: slog.With(
slog.String("system", "collection"),
slog.String("group", f.GetType().Group),
slog.String("kind", f.GetType().Kind),
slog.String("version", f.GetType().Version),
),
Expand All @@ -90,45 +87,47 @@ func CollectionFor[I Item](ctx context.Context, f CollectionFactory[I]) (*Collec
return err
}

index := map[Namespace]namespace{}
index := map[Namespace]*namespace{}
for _, item := range all {
raw, err := json.Marshal(item)
if err != nil {
return err
}

ns, ok := index[item.GetNamespace()]
ns, ok := index[(*item).GetNamespace()]
if !ok {
ns := namespace{
ns = &namespace{
index: map[ID]json.RawMessage{},
}
index[item.GetNamespace()] = ns
index[(*item).GetNamespace()] = ns
}

ns.entries = append(ns.entries, raw)
ns.index[item.GetID()] = raw
ns.index[(*item).GetID()] = raw
}

collection.mu.Lock()
defer collection.mu.Unlock()

collection.index = index
collection.del = r.Delete
collection.put = func(ctx context.Context, b []byte) error {
collection.put = func(ctx context.Context, n Namespace, b []byte) ([]File, error) {
var i I
if err := json.Unmarshal(b, &i); err != nil {
return err
return nil, fmt.Errorf("putting item: %w", err)
}

collection.logger.Debug("Put",
slog.String("namespace", string(i.GetNamespace())),
slog.String("id", string(i.GetID())))

if err := r.Put(ctx, i); err != nil {
changes, err := r.Put(ctx, n, &i)
if err != nil {
n, id := i.GetNamespace(), i.GetID()
return fmt.Errorf("%s: item %s/%s: %w", f.GetType(), n, id, err)
return nil, fmt.Errorf("%s: item %s/%s: %w", f.GetType(), n, id, err)
}
return nil

return changes, nil
}

return nil
Expand All @@ -137,7 +136,7 @@ func CollectionFor[I Item](ctx context.Context, f CollectionFactory[I]) (*Collec
return &collection, nil
}

func (c *Collection) Get(ctx context.Context, n Namespace, id ID, w io.Writer) error {
func (c *Collection) Get(ctx context.Context, n Namespace, id ID) ([]byte, error) {
c.mu.RLock()
defer c.mu.RUnlock()

Expand All @@ -147,36 +146,35 @@ func (c *Collection) Get(ctx context.Context, n Namespace, id ID, w io.Writer) e

if ns, ok := c.index[n]; ok {
if entry, ok := ns.index[id]; ok {
_, err := w.Write(entry)
return err
return entry, nil
}
}

return fmt.Errorf("%s: item %s/%s: not found", c.typ, n, id)
return nil, fmt.Errorf("%s: item %s/%s: not found", c.typ, n, id)
}

func (c *Collection) List(ctx context.Context, n Namespace, w io.Writer) error {
func (c *Collection) List(ctx context.Context, n Namespace) ([]byte, error) {
c.mu.RLock()
defer c.mu.RUnlock()

c.logger.Debug("List",
slog.String("namespace", string(n)))

if ns, ok := c.index[n]; ok {
return json.NewEncoder(w).Encode(ns.entries)
return json.Marshal(ns.entries)
}

return fmt.Errorf("%s: namespace %s: not found", c.typ, n)
return nil, fmt.Errorf("%s: namespace %s: not found", c.typ, n)
}

func (c *Collection) Put(ctx context.Context, item []byte) error {
func (c *Collection) Put(ctx context.Context, n Namespace, item []byte) ([]File, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.put(ctx, item)
return c.put(ctx, n, item)
}

func (c *Collection) Delete(ctx context.Context, n Namespace, id ID) error {
func (c *Collection) Delete(ctx context.Context, n Namespace, id ID) ([]File, error) {
c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
Loading

0 comments on commit 36826e0

Please sign in to comment.