Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
Log maxExecs with jitter after worker is started
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Plakhotnikov <embargo2710@gmail.com>
  • Loading branch information
Kaspiman committed Apr 15, 2024
1 parent 7f2b141 commit ae82a46
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pool/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewPoolAllocator(ctx context.Context, timeout time.Duration, maxExecs uint6
}

// wrap sync worker
log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.Uint64("max_execs", w.MaxExecs()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return w, nil
}
}
Expand Down
6 changes: 5 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (w *Process) StreamIterWithContext(ctx context.Context) (*payload.Payload,

// StreamCancel sends stop bit to the worker
func (w *Process) StreamCancel(ctx context.Context) error {
const op = errors.Op("sync_worker_send_frame")
const op = errors.Op("sync_worker_stream_cancel")

Check warning on line 302 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L302

Added line #L302 was not covered by tests
if !w.State().Compare(fsm.StateWorking) {
return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String())
}
Expand Down Expand Up @@ -481,6 +481,10 @@ func (w *Process) MaxExecsReached() bool {
return w.maxExecs > 0 && w.State().NumExecs() >= w.maxExecs
}

func (w *Process) MaxExecs() uint64 {
return w.maxExecs
}

// copyBuffer is the actual implementation of Copy and CopyBuffer.
func copyBuffer(dst io.Writer, src io.Reader, buf []byte) error {
for {
Expand Down

0 comments on commit ae82a46

Please sign in to comment.