Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0.0] Extract methods to acquire and release a filesystem lock. #24223

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions src/main/cpp/blaze.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
#include <grpcpp/security/credentials.h>
#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <chrono> // NOLINT (gRPC requires this)
#include <cinttypes>
#include <map>
#include <memory>
#include <mutex> // NOLINT
#include <optional>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -195,8 +195,8 @@ class BlazeServer final {
public:
explicit BlazeServer(const StartupOptions &startup_options);

// Acquire a lock for the server running in this output base. Returns the
// number of milliseconds spent waiting for the lock.
// Acquire a lock for the output base this server is running in.
// Returns the number of milliseconds spent waiting for the lock.
uint64_t AcquireLock();

// Whether there is an active connection to a server.
Expand Down Expand Up @@ -232,7 +232,7 @@ class BlazeServer final {
const ServerProcessInfo &ProcessInfo() const { return process_info_; }

private:
BlazeLock blaze_lock_;
std::optional<LockHandle> output_base_lock_;

enum CancelThreadAction { NOTHING, JOIN, CANCEL, COMMAND_ID_RECEIVED };

Expand All @@ -250,6 +250,7 @@ class BlazeServer final {
// actions from.
std::unique_ptr<blaze_util::IPipe> pipe_;

void ReleaseLock();
bool TryConnect(CommandServer::Stub *client);
void CancelThread();
void SendAction(CancelThreadAction action);
Expand All @@ -274,8 +275,28 @@ static BlazeServer *blaze_server;
// objects before those.

uint64_t BlazeServer::AcquireLock() {
return blaze::AcquireLock(output_base_, batch_, block_for_lock_,
&blaze_lock_);
if (output_base_lock_.has_value()) {
BAZEL_DIE(blaze_exit_code::INTERNAL_ERROR)
<< "AcquireLock() called but the lock is already held.";
}
// Take an exclusive lock on the output base, because two simultaneous
// commands may not run against the same output base. Note that this lock will
// be released by ReleaseLock() once the server is running, as it can handle
// concurrent clients on its own.
uint64_t wait_time;
output_base_lock_ = blaze::AcquireLock(
"output base", output_base_.GetRelative("lock"), LockMode::kExclusive,
batch_, block_for_lock_, &wait_time);
return wait_time;
}

void BlazeServer::ReleaseLock() {
if (!output_base_lock_.has_value()) {
BAZEL_DIE(blaze_exit_code::INTERNAL_ERROR)
<< "ReleaseLock() called without a lock to release.";
}
blaze::ReleaseLock(*output_base_lock_);
output_base_lock_ = std::nullopt;
}

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1857,12 +1878,12 @@ unsigned int BlazeServer::Communicate(
std::unique_ptr<grpc::ClientReader<command_server::RunResponse>> reader(
client_->Run(context.get(), request));

// Release the server lock because the gRPC handles concurrent clients just
// fine. Note that this may result in two "waiting for other client" messages
// (one during server startup and one emitted by the server)
BAZEL_LOG(INFO)
<< "Releasing client lock, let the server manage concurrent requests.";
blaze::ReleaseLock(&blaze_lock_);
// Release the client lock because the gRPC server handles concurrent clients
// just fine. Note that this may result in two "waiting for other client"
// messages (one during server startup and one emitted by the server).
BAZEL_LOG(INFO) << "Releasing the client lock, as the server can manage "
"concurrent clients on its own.";
ReleaseLock();

std::thread cancel_thread(&BlazeServer::CancelThread, this);
bool command_id_set = false;
Expand Down
32 changes: 17 additions & 15 deletions src/main/cpp/blaze_util_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,23 +198,25 @@ extern const char kListSeparator;
bool SymlinkDirectories(const std::string& target,
const blaze_util::Path& link);

struct BlazeLock {
#if defined(_WIN32) || defined(__CYGWIN__)
/* HANDLE */ void* handle;
#else
int lockfd;
#endif
};
typedef uintptr_t LockHandle;

// Acquires a lock on the output base. Exits if the lock cannot be acquired.
// Sets `blaze_lock` to a value that can be later passed to ReleaseLock().
// Returns the number of milliseconds spent with waiting for the lock.
uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock);
enum class LockMode {
kShared,
kExclusive,
};

// Releases the lock on the output base. In case of an error, continues as
// usual.
void ReleaseLock(BlazeLock* blaze_lock);
// Acquires a `mode` lock on `path`, busy-waiting until it becomes available if
// `block` is true, and releasing it on exec if `batch_mode` is false.
// Crashes if the lock cannot be acquired. Returns a handle that can be
// subsequently passed to ReleaseLock. Sets `wait_time` to the number of
// milliseconds spent waiting for the lock. The `name` argument is used to
// distinguish it from other locks in human-readable error messages.
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time);

// Releases a lock previously obtained from AcquireLock.
void ReleaseLock(LockHandle lock_handle);

// Verifies whether the server process still exists. Returns true if it does.
bool VerifyServerProcess(int pid, const blaze_util::Path& output_base);
Expand Down
87 changes: 49 additions & 38 deletions src/main/cpp/blaze_util_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ void SigPrintf(const char *format, ...) {
}
}

static int setlk(int fd, struct flock *lock) {
static int setlk(int fd, struct flock* lock) {
#ifdef __linux__
// If we're building with glibc <2.20, or another libc which predates
// OFD locks, define the constant ourselves. This assumes that the libc
Expand Down Expand Up @@ -634,34 +634,41 @@ static int setlk(int fd, struct flock *lock) {
return -1;
}

uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock) {
blaze_util::Path lockfile = output_base.GetRelative("lock");

int flags = O_CREAT | O_RDWR;
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time) {
int flags = O_CREAT;
switch (mode) {
case LockMode::kShared:
flags |= O_RDONLY;
break;
case LockMode::kExclusive:
flags |= O_RDWR;
break;
}
// Keep server from inheriting a useless fd if we are not in batch mode.
if (!batch_mode) {
flags |= O_CLOEXEC;
}

int lockfd = open(lockfile.AsNativePath().c_str(), flags, 0644);
if (lockfd < 0) {
int fd = open(path.AsNativePath().c_str(), flags, 0644);
if (fd < 0) {
string err = GetLastErrorString();
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "cannot open lockfile '" << lockfile.AsPrintablePath()
<< "' for writing: " << err;
<< "open failed for " << name << " lock: " << err;
}

struct flock lock = {};
lock.l_type = F_WRLCK;
lock.l_type = static_cast<short>( // NOLINT (short is the right type)
mode == LockMode::kShared ? F_RDLCK : F_WRLCK);
lock.l_whence = SEEK_SET;
lock.l_start = 0;
// This doesn't really matter now, but allows us to subdivide the lock
// later if that becomes meaningful. (Ranges beyond EOF can be locked.)
lock.l_len = 4096;

// Take the exclusive server lock. If we fail, we busy-wait until the lock
// becomes available.
// Take the lock. If it fails, busy-wait until it becomes available unless
// --noblock_for_lock was set.
//
// We used to rely on fcntl(F_SETLKW) to lazy-wait for the lock to become
// available, which is theoretically fine, but doing so prevents us from
Expand All @@ -673,18 +680,19 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool multiple_attempts = false;
string owner;
const uint64_t start_time = GetMillisecondsMonotonic();
while (setlk(lockfd, &lock) == -1) {
while (setlk(fd, &lock) == -1) {
string buffer(4096, 0);
ssize_t r = pread(lockfd, &buffer[0], buffer.size(), 0);
ssize_t r = pread(fd, &buffer[0], buffer.size(), 0);
if (r < 0) {
BAZEL_LOG(WARNING) << "pread() lock file: " << strerror(errno);
BAZEL_LOG(WARNING) << "pread() " << name << " lock: " << strerror(errno);
r = 0;
}
buffer.resize(r);
if (owner != buffer) {
// Each time we learn a new lock owner, print it out.
owner = buffer;
BAZEL_LOG(USER) << "Another command holds the client lock: \n" << owner;
BAZEL_LOG(USER) << "Another command holds the " << name << " lock: \n"
<< owner;
if (block) {
BAZEL_LOG(USER) << "Waiting for it to complete...";
fflush(stderr);
Expand All @@ -693,8 +701,8 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,

if (!block) {
BAZEL_DIE(blaze_exit_code::LOCK_HELD_NOBLOCK_FOR_LOCK)
<< "Exiting because the lock is held and --noblock_for_lock was "
"given.";
<< "Exiting because the " << name
<< " lock is held and --noblock_for_lock was given.";
}

TrySleep(500);
Expand All @@ -706,28 +714,31 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// avoid unnecessary noise in the logs. In this metric, we are only
// interested in knowing how long it took for other commands to complete, not
// how fast acquiring a lock is.
const uint64_t wait_time = !multiple_attempts ? 0 : end_time - start_time;
const uint64_t elapsed_time = !multiple_attempts ? 0 : end_time - start_time;

// Identify ourselves in the lockfile.
// If taking an exclusive lock, identify ourselves in the lockfile.
// The contents are printed for human consumption when another client
// fails to take the lock, but not parsed otherwise.
(void) ftruncate(lockfd, 0);
lseek(lockfd, 0, SEEK_SET);
// Arguably we should ensure this fits in the 4KB we lock. In practice no one
// will have a cwd long enough to overflow that, and nothing currently uses
// the rest of the lock file anyway.
dprintf(lockfd, "pid=%d\nowner=client\n", getpid());
string cwd = blaze_util::GetCwd();
dprintf(lockfd, "cwd=%s\n", cwd.c_str());
if (const char *tty = ttyname(STDIN_FILENO)) { // NOLINT (single-threaded)
dprintf(lockfd, "tty=%s\n", tty);
}
blaze_lock->lockfd = lockfd;
return wait_time;
}

void ReleaseLock(BlazeLock* blaze_lock) {
close(blaze_lock->lockfd);
if (mode == LockMode::kExclusive) {
(void)ftruncate(fd, 0);
lseek(fd, 0, SEEK_SET);
// Arguably we should ensure this fits in the 4KB we lock. In practice no
// one will have a cwd long enough to overflow that, and nothing currently
// uses the rest of the lock file anyway.
dprintf(fd, "pid=%d\nowner=client\n", getpid());
string cwd = blaze_util::GetCwd();
dprintf(fd, "cwd=%s\n", cwd.c_str());
if (const char* tty = ttyname(STDIN_FILENO)) { // NOLINT (single-threaded)
dprintf(fd, "tty=%s\n", tty);
}
}

*wait_time = elapsed_time;
return static_cast<LockHandle>(fd);
}

void ReleaseLock(LockHandle lock_handle) {
close(static_cast<int>(lock_handle));
}

bool KillServerProcess(int pid, const blaze_util::Path& output_base) {
Expand Down
58 changes: 35 additions & 23 deletions src/main/cpp/blaze_util_windows.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,36 +1085,44 @@ uint64_t WindowsClock::GetMilliseconds() const {
return GetMillisecondsAsLargeInt(kFrequency).QuadPart;
}

uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock) {
blaze_util::Path lockfile = output_base.GetRelative("lock");
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time) {
DWORD desired_access = GENERIC_READ;
if (mode == LockMode::kExclusive) {
desired_access |= GENERIC_WRITE;
}

// CreateFile defaults to opening the file exclusively. We intentionally open
// it in shared mode and instead rely on LockFileEx to obtain an exclusive
// lock, mimicking the behavior of FileChannel in the JVM, to make locks
// obtained on the client side compatible with the server side.
// it in shared mode and instead use LockFileEx to obtain a lock. This mimicks
// the FileChannel implementation in the JVM, making locks obtained on the
// client side compatible with the server side.
HANDLE handle = ::CreateFileW(
/* lpFileName */ lockfile.AsNativePath().c_str(),
/* dwDesiredAccess */ GENERIC_READ | GENERIC_WRITE,
/* lpFileName */ path.AsNativePath().c_str(),
/* dwDesiredAccess */ desired_access,
/* dwShareMode */ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
/* lpSecurityAttributes */ nullptr,
/* dwCreationDisposition */ CREATE_ALWAYS,
/* dwFlagsAndAttributes */ FILE_ATTRIBUTE_NORMAL,
/* hTemplateFile */ nullptr);
if (handle == INVALID_HANDLE_VALUE) {
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "Failed to CreateFile(" << lockfile.AsPrintablePath()
<< "): " << GetLastErrorString();
<< "CreateFile failed for " << name
<< " lock: " << GetLastErrorString();
}

bool first_lock_attempt = true;
uint64_t start_time = GetMillisecondsMonotonic();

while (true) {
OVERLAPPED overlapped = {};
DWORD flags = LOCKFILE_FAIL_IMMEDIATELY;
if (mode == LockMode::kExclusive) {
flags |= LOCKFILE_EXCLUSIVE_LOCK;
}
BOOL success = LockFileEx(
/* hFile */ handle,
/* dwFlags */ LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
/* dwFlags */ flags,
/* dwReserved */ 0,
/* nNumberOfBytesToLockLow */ 1,
/* nNumberOfBytesToLockHigh */ 0,
Expand All @@ -1129,22 +1137,22 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// See https://devblogs.microsoft.com/oldnewthing/20140905-00/?p=63.
if (GetLastError() != ERROR_LOCK_VIOLATION) {
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "Unexpected result from LockFileEx(" << lockfile.AsPrintablePath()
<< "):" << GetLastErrorString();
<< "LockFileEx failed for " << name
<< " lock: " << GetLastErrorString();
}
// Someone else has the lock.
if (first_lock_attempt) {
first_lock_attempt = false;
BAZEL_LOG(USER) << "Another command holds the client lock.";
BAZEL_LOG(USER) << "Another command holds the " << name << " lock.";
if (block) {
BAZEL_LOG(USER) << "Waiting for it to complete...";
}
fflush(stderr);
}
if (!block) {
BAZEL_DIE(blaze_exit_code::LOCK_HELD_NOBLOCK_FOR_LOCK)
<< "Exiting because the lock is held and --noblock_for_lock was "
"given.";
<< "Exiting because the " << name
<< " lock is held and --noblock_for_lock was given.";
}
Sleep(/* dwMilliseconds */ 500);
}
Expand All @@ -1154,17 +1162,21 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// a concurrent process can read and display it. On Windows we can't do so
// because locks are mandatory, thus we cannot read the file concurrently.

uint64_t wait_time = GetMillisecondsMonotonic() - start_time;
blaze_lock->handle = handle;
return wait_time;
*wait_time = GetMillisecondsMonotonic() - start_time;
return reinterpret_cast<LockHandle>(handle);
}

void ReleaseLock(BlazeLock* blaze_lock) {
void ReleaseLock(LockHandle lock_handle) {
HANDLE handle = reinterpret_cast<HANDLE>(lock_handle);
OVERLAPPED overlapped = {0};
UnlockFileEx(blaze_lock->handle, 0, 1, 0, &overlapped);
CloseHandle(blaze_lock->handle);
UnlockFileEx(
/* hFile */ handle,
/* dwReserved */ 0,
/* nNumberOfBytesToUnlockLow */ 1,
/* nNumberOfBytesToUnlockHigh */ 0,
/* lpOverlapped */ &overlapped);
CloseHandle(handle);
}

#ifdef GetUserName
// By including <windows.h>, we have GetUserName defined either as
// GetUserNameA or GetUserNameW.
Expand Down
Loading