Skip to content

Commit

Permalink
add refreshTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
AmrSaber committed Dec 7, 2024
1 parent 9ca7787 commit cb6c190
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
9 changes: 7 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,17 @@ A dedicated subscriber is created and managed in the background to manage subscr

Only one subscriber is created at a time. If the client stops and reconnects for whatever reason, then subscriber will stop with it and will reconnect on next lock use.

### Refresh lock timeout
At any point when using the lock if you need more time before the lock expires you can call `await release.refreshTimeout()` to reset the lock's timeout. e.g. if lock timeout was 5 seconds, and after 4 seconds you realize that you need more time to finish the task and you call `refreshTimeout` then lock timeout is reset to 5 seconds again.

If a process holds a lock and it is released or expired then that process calling `refreshTimeout` has no effect. Same thing if lock was not acquired in the first place (with `tryLock`) then `refreshTimeout` will have no effect.

### Fencing Token
A fencing token is an increasing number that is used to identify the order at which locks are acquired, and is used for further safety with writes in distributed systems. See "Making the lock safe with fencing" section from [this article](https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html) for more info about fencing tokens.

If the lock is successfully acquired then a fencing token is sure to be assigned, otherwise no fencing token will be issued if the lock is not acquired.
If the lock is successfully acquired then a fencing token is issued, otherwise no fencing token will be issued or assigned if the lock is not acquired.

Fencing tokens can be access from `release` function like `release.fencingToken`, it is undefined only if lock was not acquired.
Fencing tokens can be access from `release` function like `release.fencingToken`, it is -1 only if lock was not acquired.

Fencing tokens are global across all locks issued and not scoped with lock name. Application logic should only depend on the fencing token increasing and not care about the exact value of the token.

Expand Down
18 changes: 16 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type RedisClient = RedisClientType<any, any, any> | RedisClusterType<any, any, a
type ReleaseCallbackFn = () => void;
type ReleaseCallback = { lockKey: string; callback: ReleaseCallbackFn };

export type ReleaseFunc = (() => Promise<void>) & { fencingToken?: number };
export type ReleaseFunc = (() => Promise<void>) & { fencingToken: number; refreshTimeout: () => Promise<void> };
export type TryLockOptions = { timeout?: number };
export type LockOptions = TryLockOptions & {
pollingInterval?: number;
Expand Down Expand Up @@ -129,7 +129,13 @@ export async function tryLock(
PX: timeout,
});

if (result != REDIS_OK) return [false, async () => {}];
if (result != REDIS_OK) {
const dummyRelease: ReleaseFunc = () => Promise.resolve();
dummyRelease.refreshTimeout = () => Promise.resolve();
dummyRelease.fencingToken = -1;

return [false, dummyRelease];
}

let released = false;
const release: ReleaseFunc = async function () {
Expand Down Expand Up @@ -163,6 +169,14 @@ export async function tryLock(
};

release.fencingToken = await redis.incr(REDIS_FENCING_TOKENS_COUNTER);
release.refreshTimeout = async () => {
if (released || (await redis.get(lockKey)) != lockValue) {
released = true;
return; // Check if lock is released
}

await redis.pExpire(lockKey, timeout);
};

return [true, release];
}
Expand Down
41 changes: 33 additions & 8 deletions unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ describe('Lock tests', () => {
[hasLock, release] = await tryLock(redis, lockName);
expect(hasLock).toEqual(false);

expect(release.fencingToken).not.toBeDefined();
expect(release.fencingToken).toEqual(-1);
});

test('it issues monotonic fencing tokens', async () => {
let lastToken: number | null = null;

for (let i = 0; i < 10; i++) {
for (let i = 0; i < 25; i++) {
let [hasLock, release] = await tryLock(redis, lockName);
expect(hasLock).toEqual(true);
await release();

if (lastToken != null) expect(lastToken).toBeLessThan(release.fencingToken!);
lastToken = release.fencingToken!;
if (lastToken != null) expect(lastToken).toBeLessThan(release.fencingToken);
lastToken = release.fencingToken;
}
});

Expand Down Expand Up @@ -88,15 +88,24 @@ describe('Lock tests', () => {
});

test('lock expiration', async () => {
let [hasLock] = await tryLock(redis, lockName, { timeout: 50 });
const options: TryLockOptions = { timeout: 25 };

let [hasLock, release] = await tryLock(redis, lockName, options);
expect(hasLock).toEqual(true);

[hasLock] = await tryLock(redis, lockName);
[hasLock] = await tryLock(redis, lockName, options);
expect(hasLock).toEqual(false);

await sleep(55);
await sleep(30);

[hasLock] = await tryLock(redis, lockName);
[hasLock] = await tryLock(redis, lockName, options);
expect(hasLock).toEqual(true);

await sleep(10);
await release.refreshTimeout(); // should has no effect
await sleep(20);

[hasLock] = await tryLock(redis, lockName, options);
expect(hasLock).toEqual(true);
});

Expand Down Expand Up @@ -126,6 +135,22 @@ describe('Lock tests', () => {
expect(hasLock).toEqual(true);
await release();
});

test('refresh expire', async () => {
const [, release] = await tryLock(redis, lockName, { timeout: 50 });

await sleep(35);
await release.refreshTimeout();

let [hasLock] = await tryLock(redis, lockName);
expect(hasLock).toEqual(false);

await sleep(35);
await release.refreshTimeout();

[hasLock] = await tryLock(redis, lockName);
expect(hasLock).toEqual(false);
});
});

describe('lock', () => {
Expand Down

0 comments on commit cb6c190

Please sign in to comment.