-
Notifications
You must be signed in to change notification settings - Fork 92
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add example demonstrating graceful shutdown (#2)
As I was writing the docs for graceful shutdown, I realized that it wasn't a half bad idea to include a full code sample for a realistic program that when exiting, would (1) try a soft stop to start, and (2) do a hard stop if the soft stop didn't work in time. This is somewhat non-trivial code though, and I'd be afraid to just include it it docs without testing it. I started writing the code to be actually runnable, and realized that as long as it was going to be runnable anyway, we may as well also include an example for it and commit to the repository. So that's what we do here: add an example test that shows a realistic shutdown loop demonstrating both a soft and hard stop.
- Loading branch information
Showing
1 changed file
with
149 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
package river_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"log/slog" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/jackc/pgx/v5/pgxpool" | ||
|
||
"github.com/riverqueue/river" | ||
"github.com/riverqueue/river/internal/riverinternaltest" | ||
"github.com/riverqueue/river/internal/util/slogutil" | ||
"github.com/riverqueue/river/riverdriver/riverpgxv5" | ||
) | ||
|
||
type WaitsForCancelOnlyArgs struct{} | ||
|
||
func (WaitsForCancelOnlyArgs) Kind() string { return "waits_for_cancel_only" } | ||
|
||
// WaitsForCancelOnlyWorker is a worker that will never finish jobs until its | ||
// context is cancelled. | ||
type WaitsForCancelOnlyWorker struct { | ||
river.WorkerDefaults[WaitsForCancelOnlyArgs] | ||
jobStarted chan struct{} | ||
} | ||
|
||
func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error { | ||
fmt.Printf("Working job that doesn't finish until cancelled\n") | ||
close(w.jobStarted) | ||
<-ctx.Done() | ||
fmt.Printf("Job cancelled\n") | ||
return nil | ||
} | ||
|
||
// Example_gracefulShutdown demonstrates a realistic-looking stop loop for | ||
// River. It listens for a SIGTERM (like might be received on a platform like | ||
// Heroku to stop a process) and when receives, tries a soft stop that waits for | ||
// work to finish. If it doesn't finish in time, a second SIGTERM will initiate | ||
// a hard stop that cancels all jobs using context cancellation. | ||
func Example_gracefulShutdown() { | ||
ctx := context.Background() | ||
|
||
dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer dbPool.Close() | ||
|
||
// Required for the purpose of this test, but not necessary in real usage. | ||
if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { | ||
panic(err) | ||
} | ||
|
||
jobStarted := make(chan struct{}) | ||
|
||
workers := river.NewWorkers() | ||
river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted}) | ||
|
||
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ | ||
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), | ||
Queues: map[string]river.QueueConfig{ | ||
river.DefaultQueue: {MaxWorkers: 100}, | ||
}, | ||
Workers: workers, | ||
}) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
_, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
if err := riverClient.Start(ctx); err != nil { | ||
panic(err) | ||
} | ||
|
||
riverClientStopped := make(chan struct{}) | ||
|
||
sigterm := make(chan os.Signal, 1) | ||
signal.Notify(sigterm, syscall.SIGTERM) | ||
|
||
// This is meant to be a realistic-looking stop goroutine that might go in a | ||
// real program. It waits for SIGTERM and when received, tries to stop | ||
// gracefully by allowing a chance for jobs to finish. But if that isn't | ||
// working, a second SIGTERM will tell it to terminate with prejudice and | ||
// it'll issue a hard stop that cancels the context of all active jobs. | ||
go func() { | ||
defer close(riverClientStopped) | ||
|
||
<-sigterm | ||
fmt.Printf("Received SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") | ||
|
||
softStopSucceeded := make(chan struct{}) | ||
go func() { | ||
if err := riverClient.Stop(ctx); err != nil { | ||
if !errors.Is(err, context.Canceled) { | ||
panic(err) | ||
} | ||
} | ||
close(softStopSucceeded) | ||
}() | ||
|
||
select { | ||
case <-sigterm: | ||
fmt.Printf("Received SIGTERM again; initiating hard stop (cancel everything)\n") | ||
if err := riverClient.StopAndCancel(ctx); err != nil { | ||
panic(err) | ||
} | ||
case <-softStopSucceeded: | ||
// Will never be reached in this example. | ||
} | ||
}() | ||
|
||
// Make sure our job starts being worked before doing anything else. | ||
<-jobStarted | ||
|
||
// Cheat a little by sending a SIGTERM manually for the purpose of this | ||
// example (normally this will be sent by user or supervisory process). The | ||
// first SIGTERM tries a soft stop in which jobs are givcen a chance to | ||
// finish up. | ||
sigterm <- syscall.SIGTERM | ||
|
||
// The soft stop will never work in this example because our job only | ||
// respects context cancellation, but wait a short amount of time to give it | ||
// a chance. After it elapses, send another SIGTERM to initiate a hard stop. | ||
select { | ||
case <-riverClientStopped: | ||
// Will never be reached in this example because our job will only ever | ||
// finish on context cancellation. | ||
fmt.Printf("Soft stop succeeded\n") | ||
|
||
case <-time.After(100 * time.Millisecond): | ||
sigterm <- syscall.SIGTERM | ||
<-riverClientStopped | ||
} | ||
|
||
// Output: | ||
// Working job that doesn't finish until cancelled | ||
// Received SIGTERM; initiating soft stop (try to wait for jobs to finish) | ||
// Received SIGTERM again; initiating hard stop (cancel everything) | ||
// Job cancelled | ||
} |