Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor beats lockfile to use timeout, retry #34194

Merged
merged 15 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Support for multiline zookeeper logs {issue}2496[2496]
- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792]
- Disable lockfile when running under elastic-agent. {pull}33988[33988]
- Fix lockfile logic, retry locking {pull}34194[34194]
- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346]

*Auditbeat*
Expand Down
235 changes: 28 additions & 207 deletions libbeat/cmd/instance/locks/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,245 +18,66 @@
package locks

import (
"encoding/json"
"errors"
"fmt"
"os"
"runtime"
"time"

"github.com/gofrs/flock"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
metricproc "github.com/elastic/elastic-agent-system-metrics/metric/system/process"
)

// Locker is a retrying file locker
type Locker struct {
fileLock *flock.Flock
logger *logp.Logger
beatName string
filePath string
beatStart time.Time
}

type pidfile struct {
PID int `json:"pid"`
WriteTime time.Time `json:"write_time"`
fileLock *flock.Flock
retryCount int
retrySleep time.Duration
logger *logp.Logger
}

var (
// ErrAlreadyLocked is returned when a lock on the data path is attempted but
// unsuccessful because another Beat instance already has the lock on the same
// data path.
ErrAlreadyLocked = fmt.Errorf("data path already locked by another beat. Please make sure that multiple beats are not sharing the same data path (path.data)")

// ErrLockfileEmpty is returned by readExistingPidfile() when an existing pidfile is found, but the file is empty.
ErrLockfileEmpty = fmt.Errorf("lockfile is empty")
)

// a little wrapper for the gitpid function to make testing easier.
var pidFetch = os.Getpid

// New returns a new pid-aware file locker
// all logic, including checking for existing locks, is performed lazily
// New returns a new file locker
func New(beatInfo beat.Info) *Locker {
return NewWithRetry(beatInfo, 4, time.Millisecond*400)
}

// NewWithRetry returns a new file locker with the given settings
func NewWithRetry(beatInfo beat.Info, retryCount int, retrySleep time.Duration) *Locker {
lockfilePath := paths.Resolve(paths.Data, beatInfo.Beat+".lock")
return &Locker{
fileLock: flock.New(lockfilePath),
logger: logp.L(),
beatName: beatInfo.Beat,
filePath: lockfilePath,
beatStart: beatInfo.StartTime,
fileLock: flock.New(lockfilePath),
retryCount: retryCount,
retrySleep: retrySleep,
logger: logp.L(),
}
}

// Lock attempts to acquire a lock on the data path for the currently-running
// Beat instance. If another Beats instance already has a lock on the same data path
// an ErrAlreadyLocked error is returned.
func (lock *Locker) Lock() error {
new := pidfile{PID: pidFetch(), WriteTime: time.Now()}
encoded, err := json.Marshal(&new)
if err != nil {
return fmt.Errorf("error encoding json for pidfile: %w", err)
}

// The combination of O_CREATE and O_EXCL will ensure we return an error if we don't
// manage to create the file
fh, openErr := os.OpenFile(lock.filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
// Don't trust different OSes to report the errors we expect, just try to recover regardless
if openErr != nil {
err = lock.handleFailedCreate()
if err != nil {
return fmt.Errorf("cannot obtain lockfile: %w", err)
}
// If something fails here, it's probably unrecoverable
fh, err = os.OpenFile(lock.filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
for i := 0; i < lock.retryCount; i++ {
// note that TryLock doesn't set an os.O_EXCL flag,
// which means that we could be locking a file we didn't create.
// This makes it easy to recover from a failed shutdown or panic,
// as the OS will clean up the lock and we'll re-lock the same file.
// However, can create odd races if you're not careful, since you don't know if you're locking "your" file.
gotLock, err := lock.fileLock.TryLock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think TryLock uses the os.O_EXCL flag. That means the file could exist already, and I think that would lead to a race condition in the Unlock function with Unlock & Remove.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate? I assume you mean another beat swooping in while one beat is trying to lock or remove the file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the unlock code, it first unlocks, then does a remove. In between those lines of code another beat could create a new lock, but the file would be removed. This results in the new beat having an error if it goes to unlock because the lock file doesn't exist.

panic: unable to unlock data path file testing.lock: remove testing.lock: no such file or directory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, that's an interesting edge case. Gonna see if I can think of a non-awkward way to protect against that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, made a change to remove the file first before we remove the lock. Going to see how the Windows CI reacts to that, but I imagine we'll want some manual testing, since I don't understand the Windows lockfile logic too well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put some or all of the detail from the PR description directly in the description of the Lock function? For example adding this would help the next developer to understand how this works.

In a case where a beat has shutdown improperly and the lockfile remains, instead of looking up a PID, we rely on the OS to release the underlying lock for the dead process, which most OSes will generally do, after a set amount of time.

It may also be worth noting that putting the PID into the lock file failed. To some degree we have had several iterations on this code because the original code did not explain itself at all, so let's try to avoid creating that problem again.

if err != nil {
return fmt.Errorf("cannot re-obtain lockfile %s: %w", lock.filePath, err)
}
}

// a Process can't write to its own locked file on all platforms, write first
_, err = fh.Write(encoded)
if err != nil {
return fmt.Errorf("error writing pidfile to %s: %w", lock.filePath, err)
}

// Exclusive lock
isLocked, err := lock.fileLock.TryLock()
if err != nil {
return fmt.Errorf("unable to lock data path: %w", err)
}
// case: lock could not be obtained.
if !isLocked {
// if we're here, things are probably unrecoverable, as we've previously checked for a lockfile. Exit.
return fmt.Errorf("%s: %w", lock.filePath, ErrAlreadyLocked)
}

return nil
}

// Unlock attempts to release the lock on a data path previously acquired via Lock().
func (lock *Locker) Unlock() error {
err := lock.fileLock.Unlock()
if err != nil {
return fmt.Errorf("unable to unlock data path: %w", err)
}

err = os.Remove(lock.fileLock.Path())
if err != nil {
return fmt.Errorf("unable to unlock data path file %s: %w", lock.fileLock.Path(), err)
}
return nil
}

// ******* private helpers

// handleFailedCreate will attempt to recover from a failed lock operation in a pid-aware way.
// The point of this is to deal with instances where an improper beat shutdown left us with
// a pre-existing pidfile for a beat process that no longer exists.
func (lock *Locker) handleFailedCreate() error {
// First, try to lock the file as a check to see what state we're in.
// If there's a pre-existing lock that's in effect, we probably can't recover
// Not all OSes will fail on this.
_, err := lock.fileLock.TryLock()
// Case: the file already locked, and in use by another process.
if err != nil {
if runtime.GOOS == "windows" {
// on windows, locks from dead PIDs will be auto-released, but it might take the OS a while.
// However, the time it takes for the operating system to unlock these locks depends upon available system resources.
time.Sleep(time.Second)
_, err := lock.fileLock.TryLock()
if err != nil {
return fmt.Errorf("The lockfile %s is locked after a retry, another beat is probably running", lock.fileLock)
}
} else {
return fmt.Errorf("The lockfile %s is already locked by another beat", lock.fileLock)
}
}

// if we're here, we've locked the file
// unlock so we can continue
err = lock.fileLock.Unlock()
if err != nil {
return fmt.Errorf("error unlocking a previously found file %s after a temporary lock", lock.filePath)
}

// read in whatever existing lockfile caused us to fail
pf, err := lock.readExistingPidfile()
// Case: two beats start up simultaneously, there's a chance we could "see" the pidfile before the other process writes to it
// or, the other beat died before it could write the pidfile.
// Sleep, read again. If we still don't have anything, assume the other PID is dead, continue.
if errors.Is(err, ErrLockfileEmpty) {
lock.logger.Debugf("Found other pidfile, but no data. Retrying.")
time.Sleep(time.Millisecond * 500)
pf, err = lock.readExistingPidfile()
if errors.Is(err, ErrLockfileEmpty) {
lock.logger.Debugf("No PID found in other lockfile, continuing")
return lock.recoverLockfile()
} else if err != nil {
return fmt.Errorf("error re-reading existing lockfile: %w", err)
return fmt.Errorf("unable to try a lock of the data path: %w", err)
}
} else if err != nil {
return fmt.Errorf("error reading existing lockfile: %w", err)
}

// Case: the lockfile is locked, but by us. Probably a coding error,
// and probably hard to do
if pf.PID == os.Getpid() {
// the lockfile was written before the beat started, meaning we restarted and somehow got the same pid
// in which case, continue
if lock.beatStart.Before(pf.WriteTime) {
return fmt.Errorf("lockfile for beat has been locked twice by the same PID, potential bug.")
if gotLock {
return nil
}
lock.logger.Debugf("Beat has started with the same PID, continuing")
return lock.recoverLockfile()
}

// Check to see if the PID found in the pidfile exists.
existsState, err := findMatchingPID(pf.PID)
// Case: we have a lockfile, but the pid from the pidfile no longer exists
// this was presumably due to the dirty shutdown.
// Try to reset the lockfile and continue.
if errors.Is(err, metricproc.ProcNotExist) {
lock.logger.Debugf("%s shut down without removing previous lockfile, continuing", lock.beatName)
return lock.recoverLockfile()
} else if err != nil {
return fmt.Errorf("error looking up status for pid %d: %w", pf.PID, err)
} else {
// Case: the PID exists, but it's attached to a zombie process
// In this case...we should be okay to restart?
if existsState == metricproc.Zombie {
lock.logger.Debugf("%s shut down without removing previous lockfile and is currently in a zombie state, continuing", lock.beatName)
return lock.recoverLockfile()
}
// Case: we've gotten a lock file for another process that's already running
// This is the "base" lockfile case, which is two beats running from the same directory
// This is where we'll catch this particular case on Linux, due to Linux's advisory-style locks.
return fmt.Errorf("cannot start, data directory belongs to process with pid %d", pf.PID)
}
}

// recoverLockfile attempts to remove the lockfile and continue running
// This should only be called after we're sure it's safe to ignore a pre-existing lockfile
// This will reset the internal lockfile handler when it's successful.
func (lock *Locker) recoverLockfile() error {
// File remove may or not work, depending on os-specific details with lockfiles
err := os.Remove(lock.fileLock.Path())
if err != nil {
if runtime.GOOS == "windows" {
// retry on windows, the OS can take time to clean up
time.Sleep(time.Second)
err = os.Remove(lock.fileLock.Path())
if err != nil {
return fmt.Errorf("tried twice to remove lockfile %s on windows: %w",
lock.fileLock.Path(), err)
}
} else {
return fmt.Errorf("lockfile %s cannot be removed: %w", lock.fileLock.Path(), err)
}

}
lock.fileLock = flock.New(lock.filePath)
return nil
}

// readExistingPidfile will read the contents of an existing pidfile
// Will return ErrLockfileEmpty if no data is found in the lockfile
func (lock *Locker) readExistingPidfile() (pidfile, error) {
rawPidfile, err := os.ReadFile(lock.filePath)
if err != nil {
return pidfile{}, fmt.Errorf("error reading pidfile from path %s", lock.filePath)
}
if len(rawPidfile) == 0 {
return pidfile{}, ErrLockfileEmpty
}
foundPidFile := pidfile{}
err = json.Unmarshal(rawPidfile, &foundPidFile)
if err != nil {
return pidfile{}, fmt.Errorf("error reading JSON from pid file %s: %w", lock.filePath, err)
lock.logger.Debugf("Could not obtain lock for file %s, retrying %d times", lock.fileLock.Path(), (lock.retryCount - i))
time.Sleep(lock.retrySleep)
}
return foundPidFile, nil
return fmt.Errorf("%s: %w", lock.fileLock.Path(), ErrAlreadyLocked)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,28 @@
// specific language governing permissions and limitations
// under the License.

//go:build (!darwin || !cgo) && !freebsd && !linux && !windows && !aix
//go:build !windows
// +build !windows

package locks

import (
"fmt"
"runtime"

"github.com/elastic/elastic-agent-system-metrics/metric/system/process"
"os"
)

func findMatchingPID(pid int) (process.PidState, error) {
return process.Dead, fmt.Errorf("findMatchingPID not supported on platform: %s", runtime.GOOS)
// Unlock attempts to release the lock on a data path previously acquired via Lock(). This will first remove the file, then unlock the file handle.
func (lock *Locker) Unlock() error {
// Unlock will remove the file while we still have the lock, so we reduce the odds of another beat swooping in to start between the Unlock() and Remove() operation.
err := os.Remove(lock.fileLock.Path())
if err != nil {
lock.logger.Warnf("could not remove lockfile at %s: %s", lock.fileLock.Path(), err)
}

err = lock.fileLock.Unlock()
if err != nil {
return fmt.Errorf("unable to unlock data path: %w", err)
}

return nil
}
Loading