Skip to content

Commit

Permalink
ETCD Client: Retry all too many requests errors
Browse files Browse the repository at this point in the history
Update internal/client to be a generic wrapper around a real etcd
client. This internal client will retry `too many requests` errors until
successful, the given context is canceled, or an error occurs.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed May 9, 2024
1 parent 58528af commit fd1fdaa
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 196 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ linters:
- gci
- funlen
- maintidx
linters-settings:
goimports:
local-prefixes: github.com/diagridio
11 changes: 6 additions & 5 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
"fmt"
"strings"

"github.com/diagridio/go-etcd-cron/api"
"google.golang.org/protobuf/proto"

"github.com/diagridio/go-etcd-cron/api"
)

// Add adds a new cron job to the cron instance.
Expand Down Expand Up @@ -44,7 +45,6 @@ func (c *cron) Add(ctx context.Context, name string, job *api.Job) error {
}

_, err = c.client.Put(ctx, c.key.JobKey(name), string(b))

return err
}

Expand Down Expand Up @@ -94,10 +94,11 @@ func (c *cron) Delete(ctx context.Context, name string) error {
return err
}

qerr := c.queue.Dequeue(name)
_, err := c.client.Delete(ctx, c.key.JobKey(name))
if _, err := c.client.Delete(ctx, c.key.JobKey(name)); err != nil {
return err
}

return errors.Join(err, qerr)
return c.queue.Dequeue(name)
}

// validateName validates the name of a job.
Expand Down
22 changes: 11 additions & 11 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
func Test_CRUD(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_Add(t *testing.T) {
t.Run("returns context error if cron not ready in time", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -118,7 +118,7 @@ func Test_Add(t *testing.T) {
t.Run("returns closed error if cron is closed", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -141,7 +141,7 @@ func Test_Add(t *testing.T) {
t.Run("invalid name should error", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_Add(t *testing.T) {
t.Run("empty job should error", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand Down Expand Up @@ -211,7 +211,7 @@ func Test_Get(t *testing.T) {
t.Run("returns context error if cron not ready in time", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -232,7 +232,7 @@ func Test_Get(t *testing.T) {
t.Run("returns closed error if cron is closed", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -255,7 +255,7 @@ func Test_Get(t *testing.T) {
t.Run("invalid name should error", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand Down Expand Up @@ -293,7 +293,7 @@ func Test_Delete(t *testing.T) {
t.Run("returns context error if cron not ready in time", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -312,7 +312,7 @@ func Test_Delete(t *testing.T) {
t.Run("returns closed error if cron is closed", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand All @@ -333,7 +333,7 @@ func Test_Delete(t *testing.T) {
t.Run("invalid name should error", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
cron, err := New(Options{
Log: logr.Discard(),
Client: client,
Expand Down
15 changes: 9 additions & 6 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/utils/clock"

"github.com/diagridio/go-etcd-cron/api"
"github.com/diagridio/go-etcd-cron/internal/client"
"github.com/diagridio/go-etcd-cron/internal/counter"
"github.com/diagridio/go-etcd-cron/internal/garbage"
"github.com/diagridio/go-etcd-cron/internal/grave"
Expand Down Expand Up @@ -84,9 +85,9 @@ type Options struct {
// cron is the implementation of the cron interface.
type cron struct {
log logr.Logger
client clientv3.KV
triggerFn TriggerFunction

client client.Interface
part partitioner.Interface
key *key.Key
informer *informer.Informer
Expand Down Expand Up @@ -131,35 +132,37 @@ func New(opts Options) (Interface, error) {
log = log.WithName("diagrid-cron")
}

yard := grave.New()
client := client.New(opts.Client)

collector := garbage.New(garbage.Options{
Log: log,
Client: opts.Client,
Client: client,
})

key := key.New(key.Options{
Namespace: opts.Namespace,
PartitionID: opts.PartitionID,
})

yard := grave.New()
informer := informer.New(informer.Options{
Key: key,
Client: opts.Client,
Client: client,
Collector: collector,
Partitioner: part,
Yard: yard,
})

leadership := leadership.New(leadership.Options{
Log: log,
Client: opts.Client,
Client: client,
PartitionTotal: opts.PartitionTotal,
Key: key,
})

return &cron{
log: log,
client: opts.Client,
client: client,
triggerFn: opts.TriggerFn,
key: key,
leadership: leadership,
Expand Down
22 changes: 11 additions & 11 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func Test_retry(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggerd atomic.Int64
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -71,7 +71,7 @@ func Test_retry(t *testing.T) {
func Test_payload(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
gotCh := make(chan *api.TriggerRequest, 1)
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -129,7 +129,7 @@ func Test_payload(t *testing.T) {
func Test_remove(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -173,7 +173,7 @@ func Test_remove(t *testing.T) {
func Test_upsert(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -225,7 +225,7 @@ func Test_upsert(t *testing.T) {
func Test_patition(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64

crons := make([]Interface, 100)
Expand Down Expand Up @@ -283,7 +283,7 @@ func Test_patition(t *testing.T) {
func Test_oneshot(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -331,7 +331,7 @@ func Test_oneshot(t *testing.T) {
func Test_repeat(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64
cron, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -383,7 +383,7 @@ func Test_Run(t *testing.T) {
t.Run("Running multiple times should error", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)
var triggered atomic.Int64
cronI, err := New(Options{
Log: logr.Discard(),
Expand Down Expand Up @@ -440,7 +440,7 @@ func Test_schedule(t *testing.T) {
t.Run("if no counter, job should not be deleted", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)

now := time.Now().UTC().Add(time.Hour)
job := &api.JobStored{
Expand Down Expand Up @@ -509,7 +509,7 @@ func Test_schedule(t *testing.T) {
t.Run("if schedule is not done, job and counter should not be deleted", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)

now := time.Now().UTC().Add(time.Hour)
job := &api.JobStored{
Expand Down Expand Up @@ -588,7 +588,7 @@ func Test_schedule(t *testing.T) {
t.Run("if schedule is done, expect job and counter to be deleted", func(t *testing.T) {
t.Parallel()

client := tests.EmbeddedETCD(t)
client := tests.EmbeddedETCDBareClient(t)

now := time.Now().UTC()
job := &api.JobStored{
Expand Down
Loading

0 comments on commit fd1fdaa

Please sign in to comment.