Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(os/gcron): add graceful shutdown support #3625

Merged
merged 6 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions os/gcron/gcron_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gcron

import (
"context"
"sync"
"time"

"github.com/gogf/gf/v2/container/garray"
Expand All @@ -19,10 +20,11 @@ import (

// Cron stores all the cron job entries.
type Cron struct {
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
jobWaiter sync.WaitGroup // graceful shutdown when cron jobs are stopped.
oldme-git marked this conversation as resolved.
Show resolved Hide resolved
}

// New returns a new Cron object with default settings.
Expand Down Expand Up @@ -184,6 +186,7 @@ func (c *Cron) Stop(name ...string) {
}
} else {
c.status.Set(StatusStopped)
c.jobWaiter.Wait()
gqcn marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
3 changes: 3 additions & 0 deletions os/gcron/gcron_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (e *Entry) checkAndRun(ctx context.Context) {
e.Close()

case StatusReady, StatusRunning:
e.cron.jobWaiter.Add(1)
defer e.cron.jobWaiter.Done()
oldme-git marked this conversation as resolved.
Show resolved Hide resolved
oldme-git marked this conversation as resolved.
Show resolved Hide resolved

defer func() {
if exception := recover(); exception != nil {
// Exception caught, it logs the error content to logger in default behavior.
Expand Down
25 changes: 25 additions & 0 deletions os/gcron/gcron_z_example_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ package gcron_test

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
)
Expand All @@ -21,3 +25,24 @@ func ExampleCronAddSingleton() {
})
select {}
}

func ExampleCronGracefulShutdown() {
_, err := gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(5 * time.Second)
g.Log().Debug(ctx, "Every 2s job after 5 second end")
}, "MyCronJob")
if err != nil {
panic(err)
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

sig := <-quit
glog.Printf(ctx, "signal: %s,stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.Stop()
glog.Print(ctx, "all cron jobs completed")
oldme-git marked this conversation as resolved.
Show resolved Hide resolved
}
42 changes: 42 additions & 0 deletions os/gcron/gcron_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ package gcron_test
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/test/gtest"
)

Expand Down Expand Up @@ -277,3 +281,41 @@ func TestCron_DelayAddTimes(t *testing.T) {
t.Assert(cron.Size(), 0)
})
}

func TestCron_JobWaiter(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var err error
s1 := garray.New(true)
s2 := garray.New(true)
_, err = gcron.Add(ctx, "* * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every second")
s1.Append(struct{}{})
}, "MyFirstCronJob")
t.Assert(err, nil)
_, err = gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(3 * time.Second)
s2.Append(struct{}{})
g.Log().Debug(ctx, "Every 2s job after 3 second end")
}, "MySecondCronJob")
t.Assert(err, nil)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

go func() {
time.Sleep(4 * time.Second) // 确保任务启动
glog.Print(ctx, "Sending SIGINT")
quit <- syscall.SIGINT // 模拟 SIGINT 信号
gqcn marked this conversation as resolved.
Show resolved Hide resolved
}()

sig := <-quit
glog.Printf(ctx, "signal: %s,stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.Stop()
glog.Print(ctx, "all cron jobs completed")
oldme-git marked this conversation as resolved.
Show resolved Hide resolved
t.Assert(s1.Len(), 4)
t.Assert(s2.Len(), 2)
})
}