Skip to content

Commit

Permalink
feat(discovery_kit_sdk): allow errors from discovery methods
Browse files Browse the repository at this point in the history
  • Loading branch information
joshiste committed Nov 3, 2023
1 parent ee74cbd commit 04dbfde
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 143 deletions.
115 changes: 63 additions & 52 deletions go/discovery_kit_sdk/caching_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,72 @@ import (
"time"
)

type CachingDiscovery[T any] struct {
type CachedDiscovery[T any] struct {
Discovery

mu sync.RWMutex
lastModified time.Time
supplier func(ctx context.Context) []T
supplier func(ctx context.Context) ([]T, error)
data []T
err error
}

type CachingDiscoveryOpt[T any] func(m *CachingDiscovery[T])
type CachedDiscoveryOpt[T any] func(m *CachedDiscovery[T])

type CachingTargetDiscovery struct {
CachingDiscovery[discovery_kit_api.Target]
type CachedTargetDiscovery struct {
CachedDiscovery[discovery_kit_api.Target]
}

type CachingDataEnrichmentDiscovery struct {
CachingDiscovery[discovery_kit_api.EnrichmentData]
type CachedDataEnrichmentDiscovery struct {
CachedDiscovery[discovery_kit_api.EnrichmentData]
}

// CachedTargetDiscovery returns a caching target discovery.
func CachedTargetDiscovery(d TargetDiscovery, opts ...CachingDiscoveryOpt[discovery_kit_api.Target]) *CachingTargetDiscovery {
c := &CachingTargetDiscovery{
CachingDiscovery: CachingDiscovery[discovery_kit_api.Target]{
var (
_ TargetDiscovery = (*CachedTargetDiscovery)(nil)
_ EnrichmentDataDiscovery = (*CachedDataEnrichmentDiscovery)(nil)
)

// NewCachedTargetDiscovery returns a caching target discovery.
func NewCachedTargetDiscovery(d TargetDiscovery, opts ...CachedDiscoveryOpt[discovery_kit_api.Target]) *CachedTargetDiscovery {
c := &CachedTargetDiscovery{
CachedDiscovery: CachedDiscovery[discovery_kit_api.Target]{
Discovery: d,
supplier: recoverable(d.DiscoverTargets),
data: make([]discovery_kit_api.Target, 0),
},
}
for _, opt := range opts {
opt(&c.CachingDiscovery)
opt(&c.CachedDiscovery)
}
return c
}

// CachedEnrichmentDataDiscovery returns a caching enrichment data discovery.
func CachedEnrichmentDataDiscovery(d EnrichmentDataDiscovery, opts ...CachingDiscoveryOpt[discovery_kit_api.EnrichmentData]) *CachingDataEnrichmentDiscovery {
c := &CachingDataEnrichmentDiscovery{
CachingDiscovery: CachingDiscovery[discovery_kit_api.EnrichmentData]{
func (c *CachedTargetDiscovery) DiscoverTargets(_ context.Context) ([]discovery_kit_api.Target, error) {
return c.CachedDiscovery.Get()
}

// NewCachedEnrichmentDataDiscovery returns a caching enrichment data discovery.
func NewCachedEnrichmentDataDiscovery(d EnrichmentDataDiscovery, opts ...CachedDiscoveryOpt[discovery_kit_api.EnrichmentData]) *CachedDataEnrichmentDiscovery {

c := &CachedDataEnrichmentDiscovery{
CachedDiscovery: CachedDiscovery[discovery_kit_api.EnrichmentData]{
Discovery: d,
supplier: recoverable(d.DiscoverEnrichmentData),
data: make([]discovery_kit_api.EnrichmentData, 0),
},
}
for _, opt := range opts {
opt(&c.CachingDiscovery)
opt(&c.CachedDiscovery)
}
return c
}

func recoverable[T any](fn func(ctx context.Context) T) func(ctx context.Context) T {
return func(ctx context.Context) T {
func (c *CachedDataEnrichmentDiscovery) DiscoverEnrichmentData(_ context.Context) ([]discovery_kit_api.EnrichmentData, error) {
return c.CachedDiscovery.Get()
}

func recoverable[T any](fn func(ctx context.Context) (T, error)) func(ctx context.Context) (T, error) {
return func(ctx context.Context) (d T, e error) {
defer func() {
if err := recover(); err != nil {
log.Error().Msgf("discovery panic: %v\n %s", err, string(debug.Stack()))
Expand All @@ -72,75 +87,71 @@ func recoverable[T any](fn func(ctx context.Context) T) func(ctx context.Context
}
}

func (c *CachingTargetDiscovery) DiscoverTargets(_ context.Context) []discovery_kit_api.Target {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data
}

func (c *CachingDataEnrichmentDiscovery) DiscoverEnrichmentData(_ context.Context) []discovery_kit_api.EnrichmentData {
func (c *CachedDiscovery[T]) Get() ([]T, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data
return c.data, c.err
}

func (c *CachingDiscovery[T]) LastModified() time.Time {
func (c *CachedDiscovery[T]) LastModified() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()
return c.lastModified
}

func (c *CachingDiscovery[T]) Unwrap() interface{} {
func (c *CachedDiscovery[T]) Unwrap() interface{} {
return c.Discovery
}

type mapper[U any] func(U) U
type mapper[U any] func(U) (U, error)

func (c *CachingDiscovery[T]) update(fn mapper[[]T]) {
func (c *CachedDiscovery[T]) update(fn mapper[[]T]) {
c.mu.Lock()
defer c.mu.Unlock()
c.lastModified = time.Now()
c.data = fn(c.data)
data, err := fn(c.data)
c.data = data
c.err = err
}

func (c *CachingDiscovery[T]) refresh(ctx context.Context) {
c.update(func(_ []T) []T {
func (c *CachedDiscovery[T]) refresh(ctx context.Context) {
c.update(func(_ []T) ([]T, error) {
return c.supplier(ctx)
})
}

// WithRefreshTargetsNow triggers a refresh of the cache immediately at creation time.
func WithRefreshTargetsNow() CachingDiscoveryOpt[discovery_kit_api.Target] {
func WithRefreshTargetsNow() CachedDiscoveryOpt[discovery_kit_api.Target] {
return WithRefreshNow[discovery_kit_api.Target]()
}

// WithRefreshEnrichmentDataNow triggers a refresh of the cache immediately at creation time.
func WithRefreshEnrichmentDataNow() CachingDiscoveryOpt[discovery_kit_api.EnrichmentData] {
func WithRefreshEnrichmentDataNow() CachedDiscoveryOpt[discovery_kit_api.EnrichmentData] {
return WithRefreshNow[discovery_kit_api.EnrichmentData]()
}

// WithRefreshNow triggers a refresh of the cache immediately at creation time.
func WithRefreshNow[T any]() CachingDiscoveryOpt[T] {
return func(m *CachingDiscovery[T]) {
func WithRefreshNow[T any]() CachedDiscoveryOpt[T] {
return func(m *CachedDiscovery[T]) {
go func() {
m.refresh(context.Background())
}()
}
}

// WithRefreshTargetsTrigger triggers a refresh of the cache when an item on the channel is received and will stop when the context is canceled.
func WithRefreshTargetsTrigger(ctx context.Context, ch <-chan struct{}) CachingDiscoveryOpt[discovery_kit_api.Target] {
func WithRefreshTargetsTrigger(ctx context.Context, ch <-chan struct{}) CachedDiscoveryOpt[discovery_kit_api.Target] {
return WithRefreshTrigger[discovery_kit_api.Target](ctx, ch)
}

// WithRefreshEnrichmentDataTrigger triggers a refresh of the cache when an item on the channel is received and will stop when the context is canceled.
func WithRefreshEnrichmentDataTrigger(ctx context.Context, ch <-chan struct{}) CachingDiscoveryOpt[discovery_kit_api.EnrichmentData] {
func WithRefreshEnrichmentDataTrigger(ctx context.Context, ch <-chan struct{}) CachedDiscoveryOpt[discovery_kit_api.EnrichmentData] {
return WithRefreshTrigger[discovery_kit_api.EnrichmentData](ctx, ch)
}

// WithRefreshTrigger triggers a refresh of the cache when an item on the channel is received and will stop when the context is canceled.
func WithRefreshTrigger[T any](ctx context.Context, ch <-chan struct{}) CachingDiscoveryOpt[T] {
return func(m *CachingDiscovery[T]) {
func WithRefreshTrigger[T any](ctx context.Context, ch <-chan struct{}) CachedDiscoveryOpt[T] {
return func(m *CachedDiscovery[T]) {
go func() {
for {
select {
Expand All @@ -155,18 +166,18 @@ func WithRefreshTrigger[T any](ctx context.Context, ch <-chan struct{}) CachingD
}

// WithRefreshTargetsInterval triggers a refresh of the cache at the given interval and will stop when the context is canceled.
func WithRefreshTargetsInterval(ctx context.Context, interval time.Duration) CachingDiscoveryOpt[discovery_kit_api.Target] {
func WithRefreshTargetsInterval(ctx context.Context, interval time.Duration) CachedDiscoveryOpt[discovery_kit_api.Target] {
return WithRefreshInterval[discovery_kit_api.Target](ctx, interval)
}

// WithRefreshEnrichmentDataInterval triggers a refresh of the cache at the given interval and will stop when the context is canceled.
func WithRefreshEnrichmentDataInterval(ctx context.Context, interval time.Duration) CachingDiscoveryOpt[discovery_kit_api.EnrichmentData] {
func WithRefreshEnrichmentDataInterval(ctx context.Context, interval time.Duration) CachedDiscoveryOpt[discovery_kit_api.EnrichmentData] {
return WithRefreshInterval[discovery_kit_api.EnrichmentData](ctx, interval)
}

// WithRefreshInterval triggers a refresh of the cache at the given interval and will stop when the context is canceled.
func WithRefreshInterval[T any](ctx context.Context, interval time.Duration) CachingDiscoveryOpt[T] {
return func(m *CachingDiscovery[T]) {
func WithRefreshInterval[T any](ctx context.Context, interval time.Duration) CachedDiscoveryOpt[T] {
return func(m *CachedDiscovery[T]) {
go func() {
for {
select {
Expand All @@ -180,27 +191,27 @@ func WithRefreshInterval[T any](ctx context.Context, interval time.Duration) Cac
}
}

type UpdateFunc[D, U any] func(data D, update U) D
type UpdateFunc[D, U any] func(data D, update U) (D, error)

// WithTargetsUpdate triggers an updates the cache using the given function when an item on the channel is received and will stop when the context is canceled.
func WithTargetsUpdate[U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]discovery_kit_api.Target, U]) CachingDiscoveryOpt[discovery_kit_api.Target] {
func WithTargetsUpdate[U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]discovery_kit_api.Target, U]) CachedDiscoveryOpt[discovery_kit_api.Target] {
return WithUpdate[discovery_kit_api.Target, U](ctx, ch, fn)
}

// WithEnrichmentDataUpdate triggers an updates the cache using the given function when an item on the channel is received and will stop when the context is canceled.
func WithEnrichmentDataUpdate[U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]discovery_kit_api.EnrichmentData, U]) CachingDiscoveryOpt[discovery_kit_api.EnrichmentData] {
func WithEnrichmentDataUpdate[U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]discovery_kit_api.EnrichmentData, U]) CachedDiscoveryOpt[discovery_kit_api.EnrichmentData] {
return WithUpdate[discovery_kit_api.EnrichmentData, U](ctx, ch, fn)
}

func WithUpdate[T, U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]T, U]) CachingDiscoveryOpt[T] {
return func(m *CachingDiscovery[T]) {
func WithUpdate[T, U any](ctx context.Context, ch <-chan U, fn UpdateFunc[[]T, U]) CachedDiscoveryOpt[T] {
return func(m *CachedDiscovery[T]) {
go func() {
for {
select {
case <-ctx.Done():
return
case update := <-ch:
m.update(func(data []T) []T {
m.update(func(data []T) ([]T, error) {
return fn(data, update)
})
}
Expand Down
Loading

0 comments on commit 04dbfde

Please sign in to comment.