diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index df631e415..68bb47bec 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -168,3 +168,46 @@ jobs: - name: Teardown run: docker compose down + + load: + runs-on: ubuntu-latest + timeout-minutes: 30 + env: + DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet + + steps: + - uses: actions/checkout@v4 + + - name: Install Task + uses: arduino/setup-task@v1 + + - name: Install Protoc + uses: arduino/setup-protoc@v2 + with: + version: "25.1" + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "1.21" + + - name: Compose + run: docker compose up -d + + - name: Go deps + run: go mod download + + - name: Generate + run: | + go run github.com/steebchen/prisma-client-go db push + task generate-certs + + - name: Test + run: | + export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)" + + go test -tags load ./... -p 1 -v -failfast + // TODO enable -race + + - name: Teardown + run: docker compose down diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go new file mode 100644 index 000000000..967f9c0eb --- /dev/null +++ b/cmd/hatchet-engine/engine/run.go @@ -0,0 +1,257 @@ +package engine + +import ( + "fmt" + "os" + "sync" + + "github.com/hatchet-dev/hatchet/internal/config/loader" + "github.com/hatchet-dev/hatchet/internal/services/admin" + "github.com/hatchet-dev/hatchet/internal/services/controllers/events" + "github.com/hatchet-dev/hatchet/internal/services/controllers/jobs" + "github.com/hatchet-dev/hatchet/internal/services/controllers/workflows" + "github.com/hatchet-dev/hatchet/internal/services/dispatcher" + "github.com/hatchet-dev/hatchet/internal/services/grpc" + "github.com/hatchet-dev/hatchet/internal/services/heartbeat" + "github.com/hatchet-dev/hatchet/internal/services/ingestor" + "github.com/hatchet-dev/hatchet/internal/services/ticker" + "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" +) + +func StartEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) { + sc, err := cf.LoadServerConfig() + + if err != nil { + panic(err) + } + + errCh := make(chan error) + ctx, cancel := cmdutils.InterruptContextFromChan(interruptCh) + wg := sync.WaitGroup{} + + shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{ + ServiceName: sc.OpenTelemetry.ServiceName, + CollectorURL: sc.OpenTelemetry.CollectorURL, + }) + + if err != nil { + panic(fmt.Sprintf("could not initialize tracer: %s", err)) + } + + defer shutdown(ctx) // nolint: errcheck + + if sc.HasService("grpc") { + wg.Add(1) + + // create the dispatcher + d, err := dispatcher.New( + dispatcher.WithTaskQueue(sc.TaskQueue), + dispatcher.WithRepository(sc.Repository), + dispatcher.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + go func() { + defer wg.Done() + err := d.Start(ctx) + + if err != nil { + panic(err) + } + }() + + // create the event ingestor + ei, err := ingestor.NewIngestor( + ingestor.WithEventRepository( + sc.Repository.Event(), + ), + ingestor.WithTaskQueue(sc.TaskQueue), + ) + + if err != nil { + errCh <- err + return + } + + adminSvc, err := admin.NewAdminService( + admin.WithRepository(sc.Repository), + admin.WithTaskQueue(sc.TaskQueue), + ) + + if err != nil { + errCh <- err + return + } + + grpcOpts := []grpc.ServerOpt{ + grpc.WithConfig(sc), + grpc.WithIngestor(ei), + grpc.WithDispatcher(d), + grpc.WithAdmin(adminSvc), + grpc.WithLogger(sc.Logger), + grpc.WithTLSConfig(sc.TLSConfig), + grpc.WithPort(sc.Runtime.GRPCPort), + grpc.WithBindAddress(sc.Runtime.GRPCBindAddress), + } + + if sc.Runtime.GRPCInsecure { + grpcOpts = append(grpcOpts, grpc.WithInsecure()) + } + + // create the grpc server + s, err := grpc.NewServer( + grpcOpts..., + ) + + if err != nil { + errCh <- err + return + } + + go func() { + err = s.Start(ctx) + + if err != nil { + errCh <- err + return + } + }() + } + + if sc.HasService("eventscontroller") { + // create separate events controller process + go func() { + ec, err := events.New( + events.WithTaskQueue(sc.TaskQueue), + events.WithRepository(sc.Repository), + events.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + err = ec.Start(ctx) + + if err != nil { + errCh <- err + } + }() + } + + if sc.HasService("jobscontroller") { + // create separate jobs controller process + go func() { + jc, err := jobs.New( + jobs.WithTaskQueue(sc.TaskQueue), + jobs.WithRepository(sc.Repository), + jobs.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + err = jc.Start(ctx) + + if err != nil { + errCh <- err + } + }() + } + + if sc.HasService("workflowscontroller") { + // create separate jobs controller process + go func() { + jc, err := workflows.New( + workflows.WithTaskQueue(sc.TaskQueue), + workflows.WithRepository(sc.Repository), + workflows.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + err = jc.Start(ctx) + + if err != nil { + errCh <- err + } + }() + } + + if sc.HasService("ticker") { + // create a ticker + go func() { + t, err := ticker.New( + ticker.WithTaskQueue(sc.TaskQueue), + ticker.WithRepository(sc.Repository), + ticker.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + err = t.Start(ctx) + + if err != nil { + errCh <- err + } + }() + } + + if sc.HasService("heartbeater") { + go func() { + h, err := heartbeat.New( + heartbeat.WithTaskQueue(sc.TaskQueue), + heartbeat.WithRepository(sc.Repository), + heartbeat.WithLogger(sc.Logger), + ) + + if err != nil { + errCh <- err + return + } + + err = h.Start(ctx) + + if err != nil { + errCh <- err + } + }() + } + +Loop: + for { + select { + case err := <-errCh: + fmt.Fprintf(os.Stderr, "%s", err) + + // exit with non-zero exit code + os.Exit(1) //nolint:gocritic + case <-interruptCh: + break Loop + } + } + + cancel() + + wg.Wait() + + err = sc.Disconnect() + + if err != nil { + panic(err) + } +} diff --git a/cmd/hatchet-engine/main.go b/cmd/hatchet-engine/main.go index c78ed5e49..6ca267b8a 100644 --- a/cmd/hatchet-engine/main.go +++ b/cmd/hatchet-engine/main.go @@ -3,21 +3,11 @@ package main import ( "fmt" "os" - "sync" "github.com/spf13/cobra" + "github.com/hatchet-dev/hatchet/cmd/hatchet-engine/engine" "github.com/hatchet-dev/hatchet/internal/config/loader" - "github.com/hatchet-dev/hatchet/internal/services/admin" - "github.com/hatchet-dev/hatchet/internal/services/controllers/events" - "github.com/hatchet-dev/hatchet/internal/services/controllers/jobs" - "github.com/hatchet-dev/hatchet/internal/services/controllers/workflows" - "github.com/hatchet-dev/hatchet/internal/services/dispatcher" - "github.com/hatchet-dev/hatchet/internal/services/grpc" - "github.com/hatchet-dev/hatchet/internal/services/heartbeat" - "github.com/hatchet-dev/hatchet/internal/services/ingestor" - "github.com/hatchet-dev/hatchet/internal/services/ticker" - "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/cmdutils" ) @@ -37,7 +27,7 @@ var rootCmd = &cobra.Command{ cf := loader.NewConfigLoader(configDirectory) interruptChan := cmdutils.InterruptChan() - startEngineOrDie(cf, interruptChan) + engine.StartEngineOrDie(cf, interruptChan) }, } @@ -64,240 +54,3 @@ func main() { os.Exit(1) } } - -func startEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) { - sc, err := cf.LoadServerConfig() - - if err != nil { - panic(err) - } - - errCh := make(chan error) - ctx, cancel := cmdutils.InterruptContextFromChan(interruptCh) - wg := sync.WaitGroup{} - - shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{ - ServiceName: sc.OpenTelemetry.ServiceName, - CollectorURL: sc.OpenTelemetry.CollectorURL, - }) - - if err != nil { - panic(fmt.Sprintf("could not initialize tracer: %s", err)) - } - - defer shutdown(ctx) // nolint: errcheck - - if sc.HasService("grpc") { - wg.Add(1) - - // create the dispatcher - d, err := dispatcher.New( - dispatcher.WithTaskQueue(sc.TaskQueue), - dispatcher.WithRepository(sc.Repository), - dispatcher.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - go func() { - defer wg.Done() - err := d.Start(ctx) - - if err != nil { - panic(err) - } - }() - - // create the event ingestor - ei, err := ingestor.NewIngestor( - ingestor.WithEventRepository( - sc.Repository.Event(), - ), - ingestor.WithTaskQueue(sc.TaskQueue), - ) - - if err != nil { - errCh <- err - return - } - - adminSvc, err := admin.NewAdminService( - admin.WithRepository(sc.Repository), - admin.WithTaskQueue(sc.TaskQueue), - ) - - if err != nil { - errCh <- err - return - } - - grpcOpts := []grpc.ServerOpt{ - grpc.WithConfig(sc), - grpc.WithIngestor(ei), - grpc.WithDispatcher(d), - grpc.WithAdmin(adminSvc), - grpc.WithLogger(sc.Logger), - grpc.WithTLSConfig(sc.TLSConfig), - grpc.WithPort(sc.Runtime.GRPCPort), - grpc.WithBindAddress(sc.Runtime.GRPCBindAddress), - } - - if sc.Runtime.GRPCInsecure { - grpcOpts = append(grpcOpts, grpc.WithInsecure()) - } - - // create the grpc server - s, err := grpc.NewServer( - grpcOpts..., - ) - - if err != nil { - errCh <- err - return - } - - go func() { - err = s.Start(ctx) - - if err != nil { - errCh <- err - return - } - }() - } - - if sc.HasService("eventscontroller") { - // create separate events controller process - go func() { - ec, err := events.New( - events.WithTaskQueue(sc.TaskQueue), - events.WithRepository(sc.Repository), - events.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - err = ec.Start(ctx) - - if err != nil { - errCh <- err - } - }() - } - - if sc.HasService("jobscontroller") { - // create separate jobs controller process - go func() { - jc, err := jobs.New( - jobs.WithTaskQueue(sc.TaskQueue), - jobs.WithRepository(sc.Repository), - jobs.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - err = jc.Start(ctx) - - if err != nil { - errCh <- err - } - }() - } - - if sc.HasService("workflowscontroller") { - // create separate jobs controller process - go func() { - jc, err := workflows.New( - workflows.WithTaskQueue(sc.TaskQueue), - workflows.WithRepository(sc.Repository), - workflows.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - err = jc.Start(ctx) - - if err != nil { - errCh <- err - } - }() - } - - if sc.HasService("ticker") { - // create a ticker - go func() { - t, err := ticker.New( - ticker.WithTaskQueue(sc.TaskQueue), - ticker.WithRepository(sc.Repository), - ticker.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - err = t.Start(ctx) - - if err != nil { - errCh <- err - } - }() - } - - if sc.HasService("heartbeater") { - go func() { - h, err := heartbeat.New( - heartbeat.WithTaskQueue(sc.TaskQueue), - heartbeat.WithRepository(sc.Repository), - heartbeat.WithLogger(sc.Logger), - ) - - if err != nil { - errCh <- err - return - } - - err = h.Start(ctx) - - if err != nil { - errCh <- err - } - }() - } - -Loop: - for { - select { - case err := <-errCh: - fmt.Fprintf(os.Stderr, "%s", err) - - // exit with non-zero exit code - os.Exit(1) //nolint:gocritic - case <-interruptCh: - break Loop - } - } - - cancel() - - wg.Wait() - - err = sc.Disconnect() - - if err != nil { - panic(err) - } -} diff --git a/examples/loadtest/cli/cli_e2e_test.go b/examples/loadtest/cli/cli_e2e_test.go index 9dc88d09d..a21e5f1af 100644 --- a/examples/loadtest/cli/cli_e2e_test.go +++ b/examples/loadtest/cli/cli_e2e_test.go @@ -1,13 +1,12 @@ -//go:build e2e +//go:build load package main import ( + "context" "testing" "time" - "go.uber.org/goleak" - "github.com/hatchet-dev/hatchet/internal/testutils" ) @@ -44,18 +43,30 @@ func TestLoadCLI(t *testing.T) { concurrency: 0, }, }} + + var total time.Duration + for _, tt := range tests { + total += tt.args.duration + total += tt.args.wait + } + + ctx, cancel := context.WithTimeout(context.Background(), total+20*time.Second) + defer cancel() + testutils.Setup(t, ctx) + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer func() { time.Sleep(1 * time.Second) - goleak.VerifyNone( - t, - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), - goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), - goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), - ) + // TODO re-enable + //goleak.VerifyNone( + // t, + // goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + // goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + // goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + // goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), + //) }() if err := do(tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency); (err != nil) != tt.wantErr { diff --git a/internal/config/loader/loader.go b/internal/config/loader/loader.go index 128317ca1..237417000 100644 --- a/internal/config/loader/loader.go +++ b/internal/config/loader/loader.go @@ -5,6 +5,7 @@ package loader import ( "context" "fmt" + "log" "os" "path/filepath" "strings" @@ -74,7 +75,9 @@ func (c *ConfigLoader) LoadDatabaseConfig() (res *database.Config, err error) { // LoadServerConfig loads the server configuration func (c *ConfigLoader) LoadServerConfig() (res *server.ServerConfig, err error) { + log.Printf("Loading server config from %s", c.directory) sharedFilePath := filepath.Join(c.directory, "server.yaml") + log.Printf("Shared file path: %s", sharedFilePath) configFileBytes, err := loaderutils.GetConfigBytes(sharedFilePath) if err != nil { diff --git a/internal/testutils/setup.go b/internal/testutils/setup.go new file mode 100644 index 000000000..113e52aba --- /dev/null +++ b/internal/testutils/setup.go @@ -0,0 +1,62 @@ +package testutils + +import ( + "context" + "log" + "os" + "path" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/hatchet-dev/hatchet/cmd/hatchet-engine/engine" + "github.com/hatchet-dev/hatchet/internal/config/loader" +) + +func Setup(t *testing.T, ctx context.Context) { + t.Helper() + + _, b, _, _ := runtime.Caller(0) + testPath := filepath.Dir(b) + dir := path.Join(testPath, "../..") + + log.Printf("dir: %s", dir) + + _ = os.Setenv("DATABASE_URL", "postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet") + _ = os.Setenv("SERVER_TLS_CERT_FILE", path.Join(dir, "hack/dev/certs/cluster.pem")) + _ = os.Setenv("SERVER_TLS_KEY_FILE", path.Join(dir, "hack/dev/certs/cluster.key")) + _ = os.Setenv("SERVER_TLS_ROOT_CA_FILE", path.Join(dir, "hack/dev/certs/ca.cert")) + _ = os.Setenv("SERVER_ENCRYPTION_MASTER_KEYSET_FILE", path.Join(dir, "hack/dev/encryption-keys/master.key")) + _ = os.Setenv("SERVER_ENCRYPTION_JWT_PRIVATE_KEYSET_FILE", path.Join(dir, "hack/dev/encryption-keys/private_ec256.key")) + _ = os.Setenv("SERVER_ENCRYPTION_JWT_PUBLIC_KEYSET_FILE", path.Join(dir, "hack/dev/encryption-keys/public_ec256.key")) + _ = os.Setenv("SERVER_PORT", "8080") + _ = os.Setenv("SERVER_URL", "https://app.dev.hatchet-tools.com") + _ = os.Setenv("SERVER_AUTH_COOKIE_SECRETS", "PAYqjfCXnMxBK8On Qfkz99CfK5draqSV") + _ = os.Setenv("SERVER_AUTH_COOKIE_DOMAIN", "app.dev.hatchet-tools.com") + _ = os.Setenv("SERVER_AUTH_COOKIE_INSECURE", "false") + _ = os.Setenv("SERVER_AUTH_SET_EMAIL_VERIFIED", "true") + _ = os.Setenv("SERVER_LOGGER_LEVEL", "debug") + _ = os.Setenv("SERVER_LOGGER_FORMAT", "console") + _ = os.Setenv("DATABASE_LOGGER_LEVEL", "debug") + _ = os.Setenv("DATABASE_LOGGER_FORMAT", "console") + + cf := loader.NewConfigLoader("") + ch := make(chan interface{}, 1) + go func() { + engine.StartEngineOrDie(cf, ch) + + for { + select { + case <-ctx.Done(): + log.Printf("ctx.Done() called") + ch <- struct{}{} + return + } + } + }() + + time.Sleep(10 * time.Second) + + log.Printf("setup complete") +}