From ae82a46f1ba3403c0fc1515e7aed4dc7d0e04109 Mon Sep 17 00:00:00 2001 From: Vladimir Plakhotnikov Date: Mon, 15 Apr 2024 12:34:24 +0500 Subject: [PATCH] Log maxExecs with jitter after worker is started Signed-off-by: Vladimir Plakhotnikov --- pool/allocator.go | 2 +- worker/worker.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pool/allocator.go b/pool/allocator.go index 5137eea..53d8e79 100644 --- a/pool/allocator.go +++ b/pool/allocator.go @@ -37,7 +37,7 @@ func NewPoolAllocator(ctx context.Context, timeout time.Duration, maxExecs uint6 } // wrap sync worker - log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String())) + log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.Uint64("max_execs", w.MaxExecs()), zap.String("internal_event_name", events.EventWorkerConstruct.String())) return w, nil } } diff --git a/worker/worker.go b/worker/worker.go index 70a506c..c1b28c9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -299,7 +299,7 @@ func (w *Process) StreamIterWithContext(ctx context.Context) (*payload.Payload, // StreamCancel sends stop bit to the worker func (w *Process) StreamCancel(ctx context.Context) error { - const op = errors.Op("sync_worker_send_frame") + const op = errors.Op("sync_worker_stream_cancel") if !w.State().Compare(fsm.StateWorking) { return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) } @@ -481,6 +481,10 @@ func (w *Process) MaxExecsReached() bool { return w.maxExecs > 0 && w.State().NumExecs() >= w.maxExecs } +func (w *Process) MaxExecs() uint64 { + return w.maxExecs +} + // copyBuffer is the actual implementation of Copy and CopyBuffer. func copyBuffer(dst io.Writer, src io.Reader, buf []byte) error { for {