Skip to content

Commit

Permalink
add proper teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed Feb 20, 2024
1 parent 3c25598 commit 132f530
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 132 deletions.
182 changes: 100 additions & 82 deletions cmd/hatchet-engine/engine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ import (
"github.com/hatchet-dev/hatchet/internal/telemetry"
)

func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
type Teardown struct {
name string
fn func() error
}

func StartEngine(cf *loader.ConfigLoader, ctx context.Context) error {

Check warning on line 28 in cmd/hatchet-engine/engine/run.go

View workflow job for this annotation

GitHub Actions / lint

context-as-argument: context.Context should be the first parameter of a function (revive)
sc, err := cf.LoadServerConfig()

if err != nil {
panic(err)
return err
}

errCh := make(chan error)
Expand All @@ -36,14 +41,19 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
})

if err != nil {
panic(fmt.Sprintf("could not initialize tracer: %s", err))
return fmt.Errorf("could not initialize tracer: %w", err)
}

defer shutdown(ctx) // nolint: errcheck
var teardown []Teardown

if sc.HasService("grpc") {
wg.Add(2)
teardown = append(teardown, Teardown{
name: "telemetry",
fn: func() error {
return shutdown(ctx)
},
})

if sc.HasService("grpc") {
// create the dispatcher
d, err := dispatcher.New(
dispatcher.WithTaskQueue(sc.TaskQueue),
Expand All @@ -52,19 +62,19 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
)

if err != nil {
errCh <- err
return
return fmt.Errorf("could not create dispatcher: %w", err)
}

go func() {
defer wg.Done()
err := d.Start(ctx)

cleanup, err := d.Start()
if err != nil {
panic(err)
}

log.Printf("dispatcher has shutdown") // ✅
teardown = append(teardown, Teardown{
name: "grpc dispatcher",
fn: cleanup,
})
}()

// create the event ingestor
Expand All @@ -76,8 +86,7 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
)

if err != nil {
errCh <- err
return
return fmt.Errorf("could not create ingestor: %w", err)
}

adminSvc, err := admin.NewAdminService(
Expand All @@ -86,8 +95,7 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
)

if err != nil {
errCh <- err
return
return fmt.Errorf("could not create admin service: %w", err)
}

grpcOpts := []grpc.ServerOpt{
Expand All @@ -111,40 +119,40 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
)

if err != nil {
errCh <- err
return
return fmt.Errorf("could not create grpc server: %w", err)
}

go func() {
defer wg.Done()
err = s.Start(ctx)
cleanup, err := s.Start()

if err != nil {
errCh <- err
return
panic(err)
}

log.Printf("grpc server has shutdown")
teardown = append(teardown, Teardown{
name: "grpc server",
fn: cleanup,
})
}()
}

if sc.HasService("eventscontroller") {
wg.Add(1)

ec, err := events.New(
events.WithTaskQueue(sc.TaskQueue),
events.WithRepository(sc.Repository),
events.WithLogger(sc.Logger),
)

if err != nil {
return fmt.Errorf("could not create events controller: %w", err)
}

// create separate events controller process
go func() {
defer wg.Done()

ec, err := events.New(
events.WithTaskQueue(sc.TaskQueue),
events.WithRepository(sc.Repository),
events.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = ec.Start(ctx)

if err != nil {
Expand All @@ -158,21 +166,20 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
if sc.HasService("jobscontroller") {
wg.Add(1)

jc, err := jobs.New(
jobs.WithTaskQueue(sc.TaskQueue),
jobs.WithRepository(sc.Repository),
jobs.WithLogger(sc.Logger),
)

if err != nil {
return fmt.Errorf("could not create jobs controller: %w", err)
}

// create separate jobs controller process
go func() {
defer wg.Done()

jc, err := jobs.New(
jobs.WithTaskQueue(sc.TaskQueue),
jobs.WithRepository(sc.Repository),
jobs.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = jc.Start(ctx)

if err != nil {
Expand All @@ -186,22 +193,21 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
if sc.HasService("workflowscontroller") {
wg.Add(1)

wc, err := workflows.New(
workflows.WithTaskQueue(sc.TaskQueue),
workflows.WithRepository(sc.Repository),
workflows.WithLogger(sc.Logger),
)

if err != nil {
return fmt.Errorf("could not create workflows controller: %w", err)
}

// create separate jobs controller process
go func() {
defer wg.Done()

jc, err := workflows.New(
workflows.WithTaskQueue(sc.TaskQueue),
workflows.WithRepository(sc.Repository),
workflows.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = jc.Start(ctx)
err = wc.Start(ctx)

if err != nil {
errCh <- err
Expand All @@ -214,21 +220,20 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
if sc.HasService("ticker") {
wg.Add(1)

t, err := ticker.New(
ticker.WithTaskQueue(sc.TaskQueue),
ticker.WithRepository(sc.Repository),
ticker.WithLogger(sc.Logger),
)

if err != nil {
return fmt.Errorf("could not create ticker: %w", err)
}

// create a ticker
go func() {
defer wg.Done()

t, err := ticker.New(
ticker.WithTaskQueue(sc.TaskQueue),
ticker.WithRepository(sc.Repository),
ticker.WithLogger(sc.Logger),
)

if err != nil {
errCh <- err
return
}

err = t.Start(ctx)

if err != nil {
Expand All @@ -242,27 +247,26 @@ func StartEngineOrDie(cf *loader.ConfigLoader, ctx context.Context) {
if sc.HasService("heartbeater") {
wg.Add(1)

go func() {
defer wg.Done()
h, err := heartbeat.New(
heartbeat.WithTaskQueue(sc.TaskQueue),
heartbeat.WithRepository(sc.Repository),
heartbeat.WithLogger(sc.Logger),
)

h, err := heartbeat.New(
heartbeat.WithTaskQueue(sc.TaskQueue),
heartbeat.WithRepository(sc.Repository),
heartbeat.WithLogger(sc.Logger),
)
if err != nil {
return fmt.Errorf("could not create heartbeater: %w", err)
}

if err != nil {
errCh <- err
return
}
go func() {
defer wg.Done()

err = h.Start(ctx)

if err != nil {
errCh <- err
}

log.Printf("heartbeater has shutdown")
log.Printf("heartbeater has shutdown") // ✅
}()
}

Expand All @@ -286,11 +290,25 @@ Loop:
wg.Wait()
log.Printf("all services have shutdown")

err = sc.Disconnect()
log.Printf("waiting for all other services to gracefully exit...")
i := 0
for _, t := range teardown {
log.Printf("shutting down %s (%d/%d)", t.name, i+1, len(teardown))
err := t.fn()

if err != nil {
return fmt.Errorf("could not teardown %s: %w", t.name, err)
}
log.Printf("successfully shutdown %s (%d/%d)", t.name, i+1, len(teardown))
}
log.Printf("all services have successfully gracefully exited")

err = sc.Disconnect()
if err != nil {
panic(err)
return fmt.Errorf("could not disconnect from repository: %w", err)
}

log.Printf("successfully shutdown")

return nil
}
2 changes: 1 addition & 1 deletion cmd/hatchet-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var rootCmd = &cobra.Command{
context, cancel := cmdutils.NewInterruptContext()
defer cancel()

engine.StartEngineOrDie(cf, context)
engine.StartEngine(cf, context)

Check failure on line 31 in cmd/hatchet-engine/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `engine.StartEngine` is not checked (errcheck)
},
}

Expand Down
18 changes: 16 additions & 2 deletions examples/loadtest/cli/cli_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"context"
"log"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -46,7 +47,20 @@ func TestLoadCLI(t *testing.T) {
}}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
testutils.Setup(t, ctx)

wg := sync.WaitGroup{}

go func() {
defer wg.Done()
defer log.Printf("setup end")

wg.Add(1)
log.Printf("setup start")

testutils.SetupEngine(t, ctx)
}()

time.Sleep(10 * time.Second)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -72,6 +86,6 @@ func TestLoadCLI(t *testing.T) {
cancel()

log.Printf("test complete")
time.Sleep(30 * time.Second)
wg.Wait()
log.Printf("cleanup complete")
}
Loading

0 comments on commit 132f530

Please sign in to comment.