From 03dc542993b65fae15c1e63dd2557a7bd8feb26f Mon Sep 17 00:00:00 2001 From: Berger Eugene Date: Sat, 4 May 2024 00:10:16 +0300 Subject: [PATCH] Feature: add ready log line process condition --- process-compose.override.yaml | 13 +++++++ src/app/process.go | 73 +++++++++++++++++++++-------------- src/app/project_runner.go | 6 +++ src/app/system_test.go | 49 +++++++++++++++++++++++ src/loader/loader.go | 1 + src/loader/validators.go | 16 ++++++++ src/types/process.go | 4 ++ www/docs/launcher.md | 24 +++++++++++- 8 files changed, 156 insertions(+), 30 deletions(-) diff --git a/process-compose.override.yaml b/process-compose.override.yaml index 47e5a88..1b8c2ba 100644 --- a/process-compose.override.yaml +++ b/process-compose.override.yaml @@ -41,6 +41,19 @@ processes: signal: 15 timeout_seconds: 4 + log-line-ready: + command: "./test_loop.bash ready-log" + ready_log_line: "test loop 1" + + dep-on-log-line-ready: + command: "echo log is ready" + availability: + restart: "on_failure" + backoff_seconds: 2 + depends_on: + log-line-ready: + condition: process_log_ready + _process2: command: "./test_loop.bash process2" log_location: ./pc.proc2.{PC_REPLICA_NUM}.log diff --git a/src/app/process.go b/src/app/process.go index 6b08d22..3f07ae2 100644 --- a/src/app/process.go +++ b/src/app/process.go @@ -34,35 +34,37 @@ const ( type Process struct { sync.Mutex - globalEnv []string - confMtx sync.Mutex - procConf *types.ProcessConfig - procState *types.ProcessState - stateMtx sync.Mutex - procCond sync.Cond - procStartedCond sync.Cond - procStateChan chan string - procReadyCtx context.Context - readyCancelFn context.CancelFunc - procRunCtx context.Context - runCancelFn context.CancelFunc - procColor func(a ...interface{}) string - noColor func(a ...interface{}) string - redColor func(a ...interface{}) string - logBuffer *pclog.ProcessLogBuffer - logger pclog.PcLogger - command Commander - started bool - done bool - timeMutex sync.Mutex - startTime time.Time - liveProber *health.Prober - readyProber *health.Prober - shellConfig command.ShellConfig - printLogs bool - isMain bool - extraArgs []string - isStopped atomic.Bool + globalEnv []string + confMtx sync.Mutex + procConf *types.ProcessConfig + procState *types.ProcessState + stateMtx sync.Mutex + procCond sync.Cond + procStartedCond sync.Cond + procStateChan chan string + procReadyCtx context.Context + readyCancelFn context.CancelFunc + procLogReadyCtx context.Context + readyLogCancelFn context.CancelCauseFunc + procRunCtx context.Context + runCancelFn context.CancelFunc + procColor func(a ...interface{}) string + noColor func(a ...interface{}) string + redColor func(a ...interface{}) string + logBuffer *pclog.ProcessLogBuffer + logger pclog.PcLogger + command Commander + started bool + done bool + timeMutex sync.Mutex + startTime time.Time + liveProber *health.Prober + readyProber *health.Prober + shellConfig command.ShellConfig + printLogs bool + isMain bool + extraArgs []string + isStopped atomic.Bool } func NewProcess( @@ -97,6 +99,7 @@ func NewProcess( } proc.procReadyCtx, proc.readyCancelFn = context.WithCancel(context.Background()) + proc.procLogReadyCtx, proc.readyLogCancelFn = context.WithCancelCause(context.Background()) proc.procRunCtx, proc.runCancelFn = context.WithCancel(context.Background()) proc.setUpProbes() proc.procCond = *sync.NewCond(proc) @@ -310,6 +313,13 @@ func (p *Process) waitUntilReady() bool { log.Error().Msgf("Process %s was aborted and won't become ready", p.getName()) p.setExitCode(1) return false + case <-p.procLogReadyCtx.Done(): + err := context.Cause(p.procLogReadyCtx) + if errors.Is(err, context.Canceled) { + return true + } + log.Error().Err(err).Msgf("Process %s was aborted and won't become ready", p.getName()) + return false } } } @@ -335,6 +345,7 @@ func (p *Process) shutDown() error { } p.setState(types.ProcessStateTerminating) p.stopProbes() + p.readyLogCancelFn(fmt.Errorf("process %s was shut down", p.getName())) if isStringDefined(p.procConf.ShutDownParams.ShutDownCommand) { return p.doConfiguredStop(p.procConf.ShutDownParams) } @@ -492,6 +503,10 @@ func (p *Process) handleOutput(pipe io.ReadCloser, handler func(message string)) Msg("error reading from stdout") break } + if p.procConf.ReadyLogLine != "" && p.procState.Health == types.ProcessHealthUnknown && strings.Contains(line, p.procConf.ReadyLogLine) { + p.procState.Health = types.ProcessHealthReady + p.readyLogCancelFn(nil) + } handler(strings.TrimSuffix(line, "\n")) } } diff --git a/src/app/project_runner.go b/src/app/project_runner.go index 5a03ecf..919bc1a 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -145,6 +145,12 @@ func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error { if !ready { return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.ReplicaName, k) } + case types.ProcessConditionLogReady: + log.Info().Msgf("%s is waiting for %s log line %s", process.ReplicaName, k, runningProc.procConf.ReadyLogLine) + ready := runningProc.waitUntilReady() + if !ready { + return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.ReplicaName, k) + } case types.ProcessConditionStarted: log.Info().Msgf("%s is waiting for %s to start", process.ReplicaName, k) runningProc.waitForStarted() diff --git a/src/app/system_test.go b/src/app/system_test.go index 9de8090..e868dc8 100644 --- a/src/app/system_test.go +++ b/src/app/system_test.go @@ -590,3 +590,52 @@ func TestSystem_TestProcShutDownNoRestart(t *testing.T) { return } } +func TestSystem_TestReadyLine(t *testing.T) { + proc1 := "proc1" + proc2 := "proc2" + shell := command.DefaultShellConfig() + project := &types.Project{ + Processes: map[string]types.ProcessConfig{ + proc1: { + Name: proc1, + ReplicaName: proc1, + Executable: shell.ShellCommand, + Args: []string{shell.ShellArgument, "sleep 0.3 && echo ready"}, + ReadyLogLine: "ready", + }, + proc2: { + Name: proc2, + ReplicaName: proc2, + Executable: shell.ShellCommand, + Args: []string{shell.ShellArgument, "sleep 2"}, + DependsOn: map[string]types.ProcessDependency{ + proc1: { + Condition: types.ProcessConditionLogReady, + }, + }, + }, + }, + ShellConfig: shell, + } + runner, err := NewProjectRunner(&ProjectOpts{ + project: project, + }) + if err != nil { + t.Errorf(err.Error()) + return + } + go runner.Run() + time.Sleep(100 * time.Millisecond) + state := runner.getRunningProcess(proc2).getStatusName() + + if state != types.ProcessStatePending { + t.Errorf("process %s is %s want %s", proc2, state, types.ProcessStatePending) + return + } + time.Sleep(400 * time.Millisecond) + state = runner.getRunningProcess(proc2).getStatusName() + if state != types.ProcessStateRunning { + t.Errorf("process %s is %s want %s", proc2, state, types.ProcessStateRunning) + return + } +} diff --git a/src/loader/loader.go b/src/loader/loader.go index 4bcda22..1c33c73 100644 --- a/src/loader/loader.go +++ b/src/loader/loader.go @@ -56,6 +56,7 @@ func Load(opts *LoaderOptions) (*types.Project, error) { validatePlatformCompatibility, validateHealthDependencyHasHealthCheck, validateDependencyIsEnabled, + validateNoIncompatibleHealthChecks, ) admitProcesses(opts, mergedProject) return mergedProject, err diff --git a/src/loader/validators.go b/src/loader/validators.go index 7b302d4..3de46d6 100644 --- a/src/loader/validators.go +++ b/src/loader/validators.go @@ -133,6 +133,22 @@ func validateHealthDependencyHasHealthCheck(p *types.Project) error { } log.Error().Msg(errStr) } + if dep.Condition == types.ProcessConditionLogReady && depProc.ReadyLogLine == "" { + errStr := fmt.Sprintf("log ready dependency defined in '%s' but no ready log line exists in '%s'", procName, depName) + log.Error().Msg(errStr) + return fmt.Errorf(errStr) + } + } + } + return nil +} + +func validateNoIncompatibleHealthChecks(p *types.Project) error { + for procName, proc := range p.Processes { + if proc.ReadinessProbe != nil && proc.ReadyLogLine != "" { + errStr := fmt.Sprintf("'ready_log_line' and readiness probe defined in '%s' are incompatible", procName) + log.Error().Msg(errStr) + return fmt.Errorf(errStr) } } return nil diff --git a/src/types/process.go b/src/types/process.go index ea59ab3..a2af2d6 100644 --- a/src/types/process.go +++ b/src/types/process.go @@ -25,6 +25,7 @@ type ProcessConfig struct { DependsOn DependsOnConfig `yaml:"depends_on,omitempty"` LivenessProbe *health.Probe `yaml:"liveness_probe,omitempty"` ReadinessProbe *health.Probe `yaml:"readiness_probe,omitempty"` + ReadyLogLine string `yaml:"ready_log_line,omitempty"` ShutDownParams ShutDownParams `yaml:"shutdown,omitempty"` DisableAnsiColors bool `yaml:"disable_ansi_colors,omitempty"` WorkingDir string `yaml:"working_dir"` @@ -161,6 +162,9 @@ const ( // ProcessConditionStarted is the type for waiting until a process has started (default). ProcessConditionStarted = "process_started" + + // ProcessConditionLogReady is the type for waiting until a process has printed a predefined log line + ProcessConditionLogReady = "process_log_ready" ) type DependsOnConfig map[string]ProcessDependency diff --git a/www/docs/launcher.md b/www/docs/launcher.md index 19ff951..af707dd 100644 --- a/www/docs/launcher.md +++ b/www/docs/launcher.md @@ -75,12 +75,34 @@ processes: condition: process_completed_successfully ``` -There are 4 condition types that can be used in process dependencies: +There are 5 condition types that can be used in process dependencies: * `process_completed` - is the type for waiting until a process has been completed (any exit code) * `process_completed_successfully` - is the type for waiting until a process has been completed successfully (exit code 0) * `process_healthy` - is the type for waiting until a process is healthy * `process_started` - is the type for waiting until a process has started (default) +* `process_log_ready` - is the type for waiting until a process has printed a predefined log line. This requires the definition of `ready_log_line` in the dependent process. + +##### Process Log Ready Example + +In some situations a process's log output is a simple way to determine if it is ready or not. For example, we can wait for a 'ready' message in the process's logs as follows: + +```yaml hl_lines="6 12" +processes: + world: + command: "echo Connected" + depends_on: + hello: + condition: process_log_ready + hello: + command: | + echo 'Preparing...' + sleep 1 + echo 'I am ready to accept connections now' + ready_log_line: "ready to accept connections" # equal to *.ready to accept connections.*\n regex +``` + +> :bulb: `ready_log_line` and readiness probe are incompatible and can't be used at the same time. ## Run only specific processes