Skip to content

Commit

Permalink
refactor!(daemon): Rename mutex to serial-jobs (merge #2131)
Browse files Browse the repository at this point in the history
Renames the daemon's `Mutex` to `SerialJobs`, which better reflects its purpose. Also removes `lock()` and `unlock()` from its interface.
  • Loading branch information
rekmarks authored Mar 12, 2024
2 parents ad92aab + d7951b0 commit 5186a50
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 40 deletions.
10 changes: 5 additions & 5 deletions packages/daemon/src/daemon-node-powers.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { makeReaderRef } from './reader-ref.js';
import { makePetStoreMaker } from './pet-store.js';
import { servePrivatePortHttp } from './serve-private-port-http.js';
import { servePrivatePath } from './serve-private-path.js';
import { makeMutex } from './mutex.js';
import { makeSerialJobs } from './serial-jobs.js';

const { quote: q } = assert;

Expand Down Expand Up @@ -355,7 +355,7 @@ export const makeNetworkPowers = ({ http, ws, net }) => {
};

export const makeFilePowers = ({ fs, path: fspath }) => {
const writeLock = makeMutex();
const writeJobs = makeSerialJobs();

/**
* @param {string} path
Expand All @@ -379,7 +379,7 @@ export const makeFilePowers = ({ fs, path: fspath }) => {
* @param {string} text
*/
const writeFileText = async (path, text) => {
await writeLock.enqueue(async () => {
await writeJobs.enqueue(async () => {
await fs.promises.writeFile(path, text);
});
};
Expand Down Expand Up @@ -423,13 +423,13 @@ export const makeFilePowers = ({ fs, path: fspath }) => {
* @param {string} path
*/
const removePath = async path => {
await writeLock.enqueue(async () => {
await writeJobs.enqueue(async () => {
return fs.promises.rm(path);
});
};

const renamePath = async (source, target) => {
await writeLock.enqueue(async () => {
await writeJobs.enqueue(async () => {
return fs.promises.rename(source, target);
});
};
Expand Down
24 changes: 16 additions & 8 deletions packages/daemon/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
parseFormulaIdentifier,
serializeFormulaIdentifier,
} from './formula-identifier.js';
import { makeMutex } from './mutex.js';
import { makeSerialJobs } from './serial-jobs.js';
import { makeWeakMultimap } from './weak-multimap.js';
import { makeLoopbackNetwork } from './networks/loopback.js';

Expand Down Expand Up @@ -101,7 +101,15 @@ const makeDaemonCore = async (
} = powers;
const { randomHex512 } = cryptoPowers;
const contentStore = persistencePowers.makeContentSha512Store();
const formulaGraphMutex = makeMutex();
/**
* Mutations of the formula graph must be serialized through this queue.
* "Mutations" include:
* - Creation
* - Removal
* - Incarnation
* - Cancellation
*/
const formulaGraphJobs = makeSerialJobs();
// This is the id of the node that is hosting the values.
// This will likely get replaced with a public key in the future.
const ownNodeIdentifier = getDerivedId(
Expand Down Expand Up @@ -835,7 +843,7 @@ const makeDaemonCore = async (

/** @type {import('./types.js').DaemonCore['cancelValue']} */
const cancelValue = async (formulaIdentifier, reason) => {
await formulaGraphMutex.enqueue();
await formulaGraphJobs.enqueue();
const controller = provideControllerForFormulaIdentifier(formulaIdentifier);
console.log('Cancelled:');
return controller.context.cancel(reason);
Expand Down Expand Up @@ -983,7 +991,7 @@ const makeDaemonCore = async (
* @type {import('./types.js').DaemonCore['incarnateWorker']}
*/
const incarnateWorker = async () => {
const formulaNumber = await formulaGraphMutex.enqueue(randomHex512);
const formulaNumber = await formulaGraphJobs.enqueue(randomHex512);
return incarnateNumberedWorker(formulaNumber);
};

Expand Down Expand Up @@ -1068,7 +1076,7 @@ const makeDaemonCore = async (
/** @type {import('./types.js').DaemonCore['incarnateGuest']} */
const incarnateGuest = async (hostFormulaIdentifier, deferredTasks) => {
return incarnateNumberedGuest(
await formulaGraphMutex.enqueue(async () => {
await formulaGraphJobs.enqueue(async () => {
const identifiers = await incarnateGuestDependencies(
hostFormulaIdentifier,
);
Expand Down Expand Up @@ -1116,7 +1124,7 @@ const makeDaemonCore = async (
workerFormulaIdentifier,
endowmentFormulaIdentifiers,
evalFormulaNumber,
} = await formulaGraphMutex.enqueue(async () => {
} = await formulaGraphJobs.enqueue(async () => {
const ownFormulaNumber = await randomHex512();
const ownFormulaIdentifier = serializeFormulaIdentifier({
type: 'eval',
Expand Down Expand Up @@ -1261,7 +1269,7 @@ const makeDaemonCore = async (
powersFormulaIdentifier,
capletFormulaNumber,
workerFormulaIdentifier,
} = await formulaGraphMutex.enqueue(() =>
} = await formulaGraphJobs.enqueue(() =>
incarnateCapletDependencies(
'make-unconfined',
hostFormulaIdentifier,
Expand Down Expand Up @@ -1297,7 +1305,7 @@ const makeDaemonCore = async (
powersFormulaIdentifier,
capletFormulaNumber,
workerFormulaIdentifier,
} = await formulaGraphMutex.enqueue(() =>
} = await formulaGraphJobs.enqueue(() =>
incarnateCapletDependencies(
'make-bundle',
hostFormulaIdentifier,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { makeQueue } from '@endo/stream';

/**
* @returns {import('./types.js').Mutex}
* @returns {import('./types.js').SerialJobs}
*/
export const makeMutex = () => {
export const makeSerialJobs = () => {
/** @type {import('@endo/stream').AsyncQueue<void>} */
const queue = makeQueue();
const lock = () => {
Expand All @@ -15,8 +15,6 @@ export const makeMutex = () => {
unlock();

return {
lock,
unlock,
enqueue: async (asyncFn = /** @type {any} */ (async () => {})) => {
await lock();
try {
Expand Down
4 changes: 1 addition & 3 deletions packages/daemon/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -813,9 +813,7 @@ export interface DaemonCore {
makeDirectoryNode: MakeDirectoryNode;
}

export type Mutex = {
lock: () => Promise<void>;
unlock: () => void;
export type SerialJobs = {
enqueue: <T>(asyncFn?: () => Promise<T>) => Promise<T>;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,64 @@ import '@endo/init/debug.js';
import rawTest from 'ava';
import { wrapTest } from '@endo/ses-ava';

import { makeMutex } from '../src/mutex.js';
import { makeSerialJobs } from '../src/serial-jobs.js';

const test = wrapTest(rawTest);

const delay = () => new Promise(resolve => setTimeout(resolve, 1));

test('releases lock in expected order (sync functions)', async t => {
const mutex = makeMutex();
test('performs operations in expected order (sync functions)', async t => {
const serialJobs = makeSerialJobs();
const results = [];

await Promise.all([
mutex.enqueue(() => {
serialJobs.enqueue(() => {
results.push(1);
}),
mutex.enqueue(() => {
serialJobs.enqueue(() => {
results.push(2);
}),
mutex.enqueue(() => {
serialJobs.enqueue(() => {
results.push(3);
}),
]);

t.deepEqual(results, [1, 2, 3]);
});

test('releases lock in expected order (async functions)', async t => {
const mutex = makeMutex();
test('performs operations in expected order (async functions)', async t => {
const serialJobs = makeSerialJobs();
const results = [];

await Promise.all([
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
results.push(1);
}),
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
results.push(2);
}),
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
results.push(3);
}),
]);

t.deepEqual(results, [1, 2, 3]);
});

test('releases lock in expected order (async functions with await)', async t => {
const mutex = makeMutex();
test('performs operations in expected order (async functions with await)', async t => {
const serialJobs = makeSerialJobs();
const results = [];

await Promise.all([
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
await delay();
results.push(1);
}),
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
await delay();
results.push(2);
}),
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
await delay();
results.push(3);
}),
Expand All @@ -71,20 +71,20 @@ test('releases lock in expected order (async functions with await)', async t =>
});

test('immediately releases the lock to the awaiter', async t => {
const mutex = makeMutex();
const serialJobs = makeSerialJobs();
const results = [];

await Promise.all([
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
await delay();
results.push(2);
}),
(async () => {
results.push(1);
await mutex.enqueue(() => delay());
await serialJobs.enqueue(() => delay());
results.push(3);
})(),
mutex.enqueue(async () => {
serialJobs.enqueue(async () => {
results.push(4);
}),
]);
Expand Down

0 comments on commit 5186a50

Please sign in to comment.