Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve watcher creation pattern and manager naming #4713

Merged
merged 9 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions pkg/lib/watcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The Watcher Library is an internal component of the Bacalhau project that provid

## Key Components

1. **Registry**: Manages multiple watchers and provides methods to create and manage watchers.
1. **Manager**: Manages multiple watchers and provides methods to create, lookup, and stop watchers.
2. **Watcher**: Represents a single event watcher that processes events sequentially.
3. **EventStore**: Responsible for storing and retrieving events, with BoltDB as the default implementation.
4. **EventHandler**: Interface for handling individual events.
Expand All @@ -42,9 +42,9 @@ An `Event` represents a single occurrence in the system. It has the following pr

The `EventStore` is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.

### Registry
### Manager

The `Registry` manages multiple watchers. It's the main entry point for components that want to subscribe to events.
The `Manager` manages multiple watchers and provides methods to create, lookup, and stop watchers.

### Watcher

Expand All @@ -70,9 +70,9 @@ db, _ := bbolt.Open("events.db", 0600, nil)
store, _ := boltdb.NewEventStore(db)
```

2. Create a Registry:
2. Create a manager:
```go
registry := watcher.NewRegistry(store)
manager := watcher.NewManager(store)
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
```

3. Implement an EventHandler:
Expand All @@ -86,9 +86,32 @@ func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error
```


4. Start watching for events:
4. Create a watcher and set handler:

There are two main approaches to create and configure a watcher with a handler:

a. Two-Step Creation (Handler After Creation):
```go
// Create watcher
w, _ := manager.Watch(ctx, "my-watcher",
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{"Job", "Execution"},
Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
}),
)

// Set handler
err = w.SetHandler(&MyHandler{})

// Start watching
err = w.Start(ctx)
```

b. One-Step Creation (With Auto-Start):
```go
watcher, _ := registry.Watch(ctx, "my-watcher", &MyHandler{},
w, _ := manager.Create(ctx, "my-watcher",
watcher.WithHandler(&MyHandler{}),
watcher.WithAutoStart(),
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{"Job", "Execution"},
Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
Expand All @@ -109,6 +132,8 @@ store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData)
When creating a watcher, you can configure it with various options:

- `WithInitialEventIterator(iterator EventIterator)`: Sets the starting position for watching if no checkpoint is found.
- `WithHandler(handler EventHandler)`: Sets the event handler for the watcher.
- `WithAutoStart()`: Enables automatic start of the watcher after creation.
- `WithFilter(filter EventFilter)`: Sets the event filter for watching.
- `WithBufferSize(size int)`: Sets the size of the event buffer.
- `WithBatchSize(size int)`: Sets the number of events to fetch in each batch.
Expand All @@ -120,8 +145,10 @@ When creating a watcher, you can configure it with various options:
Example:

```go
watcher, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
w, err := manager.Create(ctx, "my-watcher",
watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
watcher.WithHandler(&MyHandler{}),
watcher.WithAutoStart(),
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{"Job", "Execution"},
Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
Expand Down
2 changes: 2 additions & 0 deletions pkg/lib/watcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ var (
ErrWatcherAlreadyExists = errors.New("watcher already exists")
ErrWatcherNotFound = errors.New("watcher not found")
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrNoHandler = errors.New("no handler configured")
ErrHandlerExists = errors.New("handler already exists")
)

// WatcherError represents an error related to a specific watcher
Expand Down
124 changes: 124 additions & 0 deletions pkg/lib/watcher/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package watcher

import (
"context"
"sync"
"time"

"github.com/rs/zerolog/log"
)

const (
DefaultShutdownTimeout = 30 * time.Second
)

// manager handles lifecycle of multiple watchers with shared resources
type manager struct {
store EventStore
watchers map[string]Watcher
mu sync.RWMutex
}

// NewManager creates a new Manager with the given EventStore.
//
// Example usage:
//
// store := // initialize your event store
// manager := NewManager(store)
// defer manager.Stop(context.Background())
//
// watcher, err := manager.Create(context.Background(), "myWatcher")
// if err != nil {
// // handle error
// }
func NewManager(store EventStore) Manager {
return &manager{
store: store,
watchers: make(map[string]Watcher),
}
}

// Create creates an unstarted watcher. SetHandler must be called before
// Start can be called successfully.
func (m *manager) Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error) {
m.mu.Lock()
defer m.mu.Unlock()

// Check if a watcher with this ID already exists
if _, exists := m.watchers[watcherID]; exists {
return nil, NewWatcherError(watcherID, ErrWatcherAlreadyExists)
}

w, err := New(ctx, watcherID, m.store, opts...)
if err != nil {
return nil, err
}

m.watchers[w.ID()] = w
return w, nil
}

// Lookup retrieves a specific watcher by ID
func (m *manager) Lookup(watcherID string) (Watcher, error) {
m.mu.RLock()
defer m.mu.RUnlock()

w, exists := m.watchers[watcherID]
if !exists {
return nil, NewWatcherError(watcherID, ErrWatcherNotFound)
}

return w, nil
}

// Stop gracefully shuts down the manager and all its watchers
func (m *manager) Stop(ctx context.Context) error {
log.Ctx(ctx).Debug().Msg("Shutting down manager")

// Create a timeout context if the parent context doesn't have a deadline
timeoutCtx := ctx
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
var cancel context.CancelFunc
timeoutCtx, cancel = context.WithTimeout(ctx, DefaultShutdownTimeout)
defer cancel()
}

var wg sync.WaitGroup

// Take a snapshot of watchers under lock
m.mu.RLock()
watchers := make([]Watcher, 0, len(m.watchers))
for _, w := range m.watchers {
watchers = append(watchers, w)
}
m.mu.RUnlock()

// Stop all watchers concurrently
for i := range watchers {
w := watchers[i]
wg.Add(1)
go func(w Watcher) {
defer wg.Done()
w.Stop(timeoutCtx)
}(w)
}

// Wait for completion or timeout
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
log.Ctx(ctx).Debug().Msg("manager shutdown complete")
return nil
case <-timeoutCtx.Done():
log.Ctx(ctx).Warn().Msg("manager shutdown timed out")
return timeoutCtx.Err()
}
}

// compile time check for interface implementation
var _ Manager = &manager{}
Loading
Loading