Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(os/gcron): add graceful shutdown support #3625

Merged
merged 6 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions os/gcron/gcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,8 @@ func Start(name ...string) {
func Stop(name ...string) {
defaultCron.Stop(name...)
}

// StopGracefully Blocks and waits all current running jobs done.
func StopGracefully() {
defaultCron.StopGracefully()
}
16 changes: 12 additions & 4 deletions os/gcron/gcron_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gcron

import (
"context"
"sync"
"time"

"github.com/gogf/gf/v2/container/garray"
Expand All @@ -19,10 +20,11 @@ import (

// Cron stores all the cron job entries.
type Cron struct {
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
jobWaiter sync.WaitGroup // Graceful shutdown when cron jobs are stopped.
}

// New returns a new Cron object with default settings.
Expand Down Expand Up @@ -187,6 +189,12 @@ func (c *Cron) Stop(name ...string) {
}
}

// StopGracefully Blocks and waits all current running jobs done.
func (c *Cron) StopGracefully() {
c.status.Set(StatusStopped)
c.jobWaiter.Wait()
}

// Remove deletes scheduled task which named `name`.
func (c *Cron) Remove(name string) {
if v := c.entries.Get(name); v != nil {
Expand Down
2 changes: 2 additions & 0 deletions os/gcron/gcron_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (e *Entry) checkAndRun(ctx context.Context) {
e.Close()

case StatusReady, StatusRunning:
e.cron.jobWaiter.Add(1)
defer func() {
e.cron.jobWaiter.Done()
if exception := recover(); exception != nil {
// Exception caught, it logs the error content to logger in default behavior.
e.logErrorf(ctx,
Expand Down
25 changes: 25 additions & 0 deletions os/gcron/gcron_z_example_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ package gcron_test

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
)
Expand All @@ -21,3 +25,24 @@ func ExampleCronAddSingleton() {
})
select {}
}

func ExampleCronGracefulShutdown() {
_, err := gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(5 * time.Second)
g.Log().Debug(ctx, "Every 2s job after 5 second end")
}, "MyCronJob")
if err != nil {
panic(err)
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

sig := <-quit
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.StopGracefully()
glog.Print(ctx, "All cron jobs completed")
}
42 changes: 42 additions & 0 deletions os/gcron/gcron_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ package gcron_test
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/test/gtest"
)

Expand Down Expand Up @@ -277,3 +281,41 @@ func TestCron_DelayAddTimes(t *testing.T) {
t.Assert(cron.Size(), 0)
})
}

func TestCron_JobWaiter(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var err error
s1 := garray.New(true)
s2 := garray.New(true)
_, err = gcron.Add(ctx, "* * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every second")
s1.Append(struct{}{})
}, "MyFirstCronJob")
t.Assert(err, nil)
_, err = gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(3 * time.Second)
s2.Append(struct{}{})
g.Log().Debug(ctx, "Every 2s job after 3 second end")
}, "MySecondCronJob")
t.Assert(err, nil)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

go func() {
time.Sleep(4 * time.Second) // Ensure that the job is triggered twice
glog.Print(ctx, "Sending SIGINT")
quit <- syscall.SIGINT // Send SIGINT
}()

sig := <-quit
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.StopGracefully()
glog.Print(ctx, "All cron jobs completed")
t.Assert(s1.Len(), 4)
t.Assert(s2.Len(), 2)
})
}