Skip to content

Commit

Permalink
RELNOTES[INC]: Remove high priority workers functionality from blaze.
Browse files Browse the repository at this point in the history
We don't observe any profit in wall time from high priority workers right now in Goole internally. Moreover for fully-worker builds we see a wall time degradation.

PiperOrigin-RevId: 522309316
Change-Id: Ic5ebd3021d33abb043fca6f55edb0dc0468f4bef
  • Loading branch information
Googler authored and copybara-github committed Apr 6, 2023
1 parent 986863a commit 8e359e7
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,7 @@ boolean areResourcesAvailable(ResourceSet resources) throws NoSuchElementExcepti
availableWorkers = this.workerPool.getMaxTotalPerKey(workerKey);
activeWorkers = this.workerPool.getNumActive(workerKey);
}
boolean workerIsAvailable =
workerKey == null
|| (activeWorkers < availableWorkers && workerPool.couldBeBorrowed(workerKey));
boolean workerIsAvailable = workerKey == null || (activeWorkers < availableWorkers);

// We test for tracking of extra resources whenever acquired and throw an
// exception before acquiring any untracked resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.devtools.common.options.OptionMetadataTag;
import com.google.devtools.common.options.OptionsBase;
import java.io.IOException;
import java.util.List;

/** Module implementing the rule set of Bazel. */
public final class BazelRulesModule extends BlazeModule {
Expand Down Expand Up @@ -289,6 +290,15 @@ public static class BuildGraveyardOptions extends OptionsBase {
effectTags = {OptionEffectTag.NO_OP},
help = "No-op.")
public boolean parseHeadersSkippedIfCorrespondingSrcsFound;

@Option(
name = "high_priority_workers",
defaultValue = "null",
documentationCategory = OptionDocumentationCategory.EXECUTION_STRATEGY,
effectTags = {OptionEffectTag.EXECUTION},
help = "No-op, will be removed soon.",
allowMultiple = true)
public List<String> highPriorityWorkers;
}

/** This is where deprecated Bazel-specific options only used by the build command go to die. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ public void buildStarting(BuildStartingEvent event) {

WorkerPoolConfig newConfig =
new WorkerPoolConfig(
workerFactory,
options.workerMaxInstances,
options.workerMaxMultiplexInstances,
options.highPriorityWorkers);
workerFactory, options.workerMaxInstances, options.workerMaxMultiplexInstances);

// If the config changed compared to the last run, we have to create a new pool.
if (workerPool == null || !newConfig.equals(workerPool.getWorkerPoolConfig())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,6 @@ public String getTypeDescription() {
allowMultiple = true)
public List<Map.Entry<String, Integer>> workerMaxMultiplexInstances;

@Option(
name = "high_priority_workers",
defaultValue = "null",
documentationCategory = OptionDocumentationCategory.EXECUTION_STRATEGY,
effectTags = {OptionEffectTag.EXECUTION},
help =
"Mnemonics of workers to run with high priority. When high priority workers are running "
+ "all other workers are throttled.",
allowMultiple = true)
public List<String> highPriorityWorkers;

@Option(
name = "worker_quit_after_build",
defaultValue = "false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public interface WorkerPool {

Worker borrowObject(WorkerKey key) throws IOException, InterruptedException;

boolean couldBeBorrowed(WorkerKey key);

void returnObject(WorkerKey key, Worker obj);

void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.pool2.impl.EvictionPolicy;
Expand All @@ -37,15 +36,6 @@ public class WorkerPoolImpl implements WorkerPool {
/** Unless otherwise specified, the max number of multiplex workers per WorkerKey. */
private static final int DEFAULT_MAX_MULTIPLEX_WORKERS = 8;

private static final int MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS = 1;
/**
* How many high-priority workers are currently borrowed. If greater than one, low-priority
* workers cannot be borrowed until the high-priority ones are done.
*/
private final AtomicInteger highPriorityWorkersInUse = new AtomicInteger(0);
/** Which mnemonics create high-priority workers. */
private final ImmutableSet<String> highPriorityWorkerMnemonics;

private final WorkerPoolConfig workerPoolConfig;
/** Map of singleplex worker pools, one per mnemonic. */
private final ImmutableMap<String, SimpleWorkerPool> workerPools;
Expand All @@ -58,9 +48,6 @@ public class WorkerPoolImpl implements WorkerPool {
public WorkerPoolImpl(WorkerPoolConfig workerPoolConfig) {
this.workerPoolConfig = workerPoolConfig;

highPriorityWorkerMnemonics =
ImmutableSet.copyOf((Iterable<String>) workerPoolConfig.getHighPriorityWorkers());

ImmutableMap<String, Integer> config =
createConfigFromOptions(workerPoolConfig.getWorkerMaxInstances(), DEFAULT_MAX_WORKERS);
ImmutableMap<String, Integer> multiplexConfig =
Expand Down Expand Up @@ -143,8 +130,7 @@ public void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy) throws Interr
}

/**
* Gets a worker. May block indefinitely if too many high-priority workers are in use and the
* requested worker is not high priority.
* Gets a worker from worker pool. Could wait if no idle workers are available.
*
* @param key worker key
* @return a worker
Expand All @@ -158,44 +144,11 @@ public Worker borrowObject(WorkerKey key) throws IOException, InterruptedExcepti
Throwables.propagateIfPossible(t, IOException.class, InterruptedException.class);
throw new RuntimeException("unexpected", t);
}

// TODO(b/244297036): move highPriorityWorkerMnemonics logic to the ResourceManager.
if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
highPriorityWorkersInUse.incrementAndGet();
} else {
try {
waitForHighPriorityWorkersToFinish();
} catch (InterruptedException e) {
returnObject(key, result);
throw e;
}
}

return result;
}

/**
* Checks if there is no blockers from high priority workers to take new worker with this worker
* key. Doesn't check occupancy of worker pool for this mnemonic.
*/
@Override
public boolean couldBeBorrowed(WorkerKey key) {
if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
return true;
}

if (highPriorityWorkersInUse.get() <= MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS) {
return true;
}

return false;
}

@Override
public void returnObject(WorkerKey key, Worker obj) {
if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
decrementHighPriorityWorkerCount();
}
if (doomedWorkers.contains(obj.getWorkerId())) {
obj.setDoomed(true);
}
Expand All @@ -204,9 +157,6 @@ public void returnObject(WorkerKey key, Worker obj) {

@Override
public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException {
if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
decrementHighPriorityWorkerCount();
}
if (doomedWorkers.contains(obj.getWorkerId())) {
obj.setDoomed(true);
}
Expand All @@ -218,29 +168,6 @@ public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedExcept
}
}

// Decrements the high-priority workers counts and pings waiting threads if appropriate.
private void decrementHighPriorityWorkerCount() {
if (highPriorityWorkersInUse.decrementAndGet() <= MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS) {
synchronized (highPriorityWorkersInUse) {
highPriorityWorkersInUse.notifyAll();
}
}
}

// Returns once less than two high-priority workers are running.
private void waitForHighPriorityWorkersToFinish() throws InterruptedException {
// Fast path for the case where the high-priority workers feature is not in use.
if (highPriorityWorkerMnemonics.isEmpty()) {
return;
}

while (highPriorityWorkersInUse.get() > MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS) {
synchronized (highPriorityWorkersInUse) {
highPriorityWorkersInUse.wait();
}
}
}

@Override
public synchronized void setDoomedWorkers(ImmutableSet<Integer> workerIds) {
this.doomedWorkers = workerIds;
Expand Down Expand Up @@ -280,17 +207,14 @@ public static class WorkerPoolConfig {
private final WorkerFactory workerFactory;
private final List<Entry<String, Integer>> workerMaxInstances;
private final List<Entry<String, Integer>> workerMaxMultiplexInstances;
private final List<String> highPriorityWorkers;

public WorkerPoolConfig(
WorkerFactory workerFactory,
List<Entry<String, Integer>> workerMaxInstances,
List<Entry<String, Integer>> workerMaxMultiplexInstances,
List<String> highPriorityWorkers) {
List<Entry<String, Integer>> workerMaxMultiplexInstances) {
this.workerFactory = workerFactory;
this.workerMaxInstances = workerMaxInstances;
this.workerMaxMultiplexInstances = workerMaxMultiplexInstances;
this.highPriorityWorkers = highPriorityWorkers;
}

public WorkerFactory getWorkerFactory() {
Expand All @@ -305,10 +229,6 @@ public List<Entry<String, Integer>> getWorkerMaxMultiplexInstances() {
return workerMaxMultiplexInstances;
}

public List<String> getHighPriorityWorkers() {
return highPriorityWorkers;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -320,14 +240,12 @@ public boolean equals(Object o) {
WorkerPoolConfig that = (WorkerPoolConfig) o;
return workerFactory.equals(that.workerFactory)
&& workerMaxInstances.equals(that.workerMaxInstances)
&& workerMaxMultiplexInstances.equals(that.workerMaxMultiplexInstances)
&& highPriorityWorkers.equals(that.highPriorityWorkers);
&& workerMaxMultiplexInstances.equals(that.workerMaxMultiplexInstances);
}

@Override
public int hashCode() {
return Objects.hash(
workerFactory, workerMaxInstances, workerMaxMultiplexInstances, highPriorityWorkers);
return Objects.hash(workerFactory, workerMaxInstances, workerMaxMultiplexInstances);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import com.google.devtools.build.lib.worker.WorkerPoolImpl;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -101,7 +99,6 @@ public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
}
},
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of()));
}

Expand Down Expand Up @@ -759,101 +756,6 @@ public void testAcquireWithWorker_acquireAndRelease() throws Exception {
assertThat(rm.inUse()).isFalse();
}

@Test
public void testReleaseWorker_highPriorityWorker() throws Exception {

String slowMenmonic = "SLOW";
String fastMenmonic = "FAST";

Worker slowWorker1 = mock(Worker.class);
Worker slowWorker2 = mock(Worker.class);
Worker fastWorker = mock(Worker.class);

WorkerKey slowWorkerKey = createWorkerKey(slowMenmonic);
WorkerKey fastWorkerKey = createWorkerKey(fastMenmonic);

when(slowWorker1.getWorkerKey()).thenReturn(slowWorkerKey);
when(slowWorker2.getWorkerKey()).thenReturn(slowWorkerKey);
when(fastWorker.getWorkerKey()).thenReturn(fastWorkerKey);

CountDownLatch slowLatch = new CountDownLatch(2);
CountDownLatch fastLatch = new CountDownLatch(1);

WorkerPoolImpl workerPool =
new WorkerPoolImpl(
new WorkerPoolImpl.WorkerPoolConfig(
new WorkerFactory(fs.getPath("/workerBase")) {
int numOfSlowWorkers = 0;

@Override
public Worker create(WorkerKey key) {
assertThat(key.getMnemonic()).isAnyOf(slowMenmonic, fastMenmonic);

if (key.getMnemonic().equals(fastMenmonic)) {
return fastWorker;
}

assertThat(numOfSlowWorkers).isLessThan(2);

if (numOfSlowWorkers == 0) {
numOfSlowWorkers++;
return slowWorker1;
}

numOfSlowWorkers++;
return slowWorker2;
}

@Override
public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
return true;
}
},
ImmutableList.of(),
ImmutableList.of(),
/* highPriorityWorkers= */ ImmutableList.of(slowMenmonic)));
rm.setWorkerPool(workerPool);

TestThread slowThread1 =
new TestThread(
() -> {
ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic);
slowLatch.countDown();
fastLatch.await();
// release resources
handle.close();
});

TestThread slowThread2 =
new TestThread(
() -> {
ResourceHandle handle = acquire(100, 0.1, 0, slowMenmonic);
slowLatch.countDown();
fastLatch.await();
// release resources
handle.close();
});

TestThread fastThread =
new TestThread(
() -> {
slowLatch.await();
assertThat(isAvailable(rm, 100, 0.1, 0, createWorkerKey(fastMenmonic))).isFalse();
fastLatch.countDown();
ResourceHandle handle = acquire(100, 0.1, 0, fastMenmonic);
// release resources
handle.close();
});

slowThread1.start();
slowThread2.start();
fastThread.start();

slowThread1.joinAndAssertState(Duration.ofSeconds(10).toMillis());
slowThread2.joinAndAssertState(Duration.ofSeconds(10).toMillis());
fastThread.joinAndAssertState(Duration.ofSeconds(10).toMillis());
}

synchronized boolean isAvailable(ResourceManager rm, double ram, double cpu, int localTestCount) {
return rm.areResourcesAvailable(ResourceSet.create(ram, cpu, localTestCount));
}
Expand Down
Loading

0 comments on commit 8e359e7

Please sign in to comment.