Skip to content

Commit

Permalink
Trigger repeating job on due_time if specified (#26)
Browse files Browse the repository at this point in the history
Updates scheduler to execute a repeating job at the due time, rather
than waiting for the next tick from this time when specified. Setting
duetime=0s and schedule=1h will execute the job immediately, then every
hour.

Updates job name validation to now also allow ":" characters.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Jun 7, 2024
1 parent c1fa1f1 commit 5021035
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 130 deletions.
11 changes: 5 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *cron) Add(ctx context.Context, name string, job *api.Job) error {
return ctx.Err()
}

if err := validateName(name); err != nil {
if err := c.validateName(name); err != nil {
return err
}

Expand Down Expand Up @@ -59,7 +59,7 @@ func (c *cron) Get(ctx context.Context, name string) (*api.Job, error) {
return nil, ctx.Err()
}

if err := validateName(name); err != nil {
if err := c.validateName(name); err != nil {
return nil, err
}

Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *cron) Delete(ctx context.Context, name string) error {
return ctx.Err()
}

if err := validateName(name); err != nil {
if err := c.validateName(name); err != nil {
return err
}

Expand All @@ -103,13 +103,12 @@ func (c *cron) Delete(ctx context.Context, name string) error {
}

// validateName validates the name of a job.
func validateName(name string) error {
func (c *cron) validateName(name string) error {
if len(name) == 0 {
return errors.New("job name cannot be empty")
}

trimmed := strings.TrimRight(strings.ReplaceAll(strings.ToLower(name), "_", "-"), "-")
for _, segment := range strings.Split(trimmed, "||") {
for _, segment := range strings.Split(strings.ToLower(c.validateNameReplacer.Replace(name)), "||") {
if errs := validation.IsDNS1123Subdomain(segment); len(errs) > 0 {
return fmt.Errorf("job name is invalid %q: %s", name, strings.Join(errs, ", "))
}
Expand Down
137 changes: 92 additions & 45 deletions api/job.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,25 @@ func Test_validateName(t *testing.T) {
name: "actorreminder||dapr-tests||dapr.internal.dapr-tests.perf-workflowsapp.workflow||24b3fbad-0db5-4e81-a272-71f6018a66a6||start-4NYDFil-",
expErr: false,
},
{
name: "aABVCD||dapr-::123:123||dapr.internal.dapr-tests.perf-workflowsapp.workflow||24b3fbad-0db5-4e81-a272-71f6018a66a6||start-4NYDFil-",
expErr: false,
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := validateName(test.name)
c, err := New(Options{
Log: logr.Discard(),
Namespace: "",
PartitionID: 0,
PartitionTotal: 1,
TriggerFn: func(context.Context, *api.TriggerRequest) bool { return true },
})
require.NoError(t, err)
err = c.(*cron).validateName(test.name)
assert.Equal(t, test.expErr, err != nil, "%v", err)
})
}
Expand Down
49 changes: 26 additions & 23 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -87,15 +88,16 @@ type cron struct {
log logr.Logger
triggerFn TriggerFunction

client client.Interface
part partitioner.Interface
key *key.Key
informer *informer.Informer
yard *grave.Yard
queue *queue.Processor[string, *counter.Counter]
schedBuilder *scheduler.Builder
leadership *leadership.Leadership
collector garbage.Interface
client client.Interface
part partitioner.Interface
key *key.Key
informer *informer.Informer
yard *grave.Yard
queue *queue.Processor[string, *counter.Counter]
schedBuilder *scheduler.Builder
leadership *leadership.Leadership
collector garbage.Interface
validateNameReplacer *strings.Replacer

clock clock.Clock
running atomic.Bool
Expand Down Expand Up @@ -161,20 +163,21 @@ func New(opts Options) (Interface, error) {
})

return &cron{
log: log,
client: client,
triggerFn: opts.TriggerFn,
key: key,
leadership: leadership,
yard: yard,
informer: informer,
collector: collector,
part: part,
schedBuilder: scheduler.NewBuilder(),
clock: clock.RealClock{},
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
errCh: make(chan error),
log: log,
client: client,
triggerFn: opts.TriggerFn,
key: key,
leadership: leadership,
yard: yard,
informer: informer,
collector: collector,
part: part,
schedBuilder: scheduler.NewBuilder(),
validateNameReplacer: strings.NewReplacer("_", "", ":", "", "-", ""),
clock: clock.RealClock{},
readyCh: make(chan struct{}),
closeCh: make(chan struct{}),
errCh: make(chan error),
}, nil
}

Expand Down
Loading

0 comments on commit 5021035

Please sign in to comment.