Skip to content

Commit

Permalink
Merge pull request #189 from iuioiua/refactor-impl
Browse files Browse the repository at this point in the history
refactor: minor implementation simplifications
  • Loading branch information
iuioiua authored Dec 19, 2024
2 parents 8b19699 + 4f1a297 commit f83ddc4
Showing 1 changed file with 27 additions and 67 deletions.
94 changes: 27 additions & 67 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,47 +90,24 @@ async function writeCommand(
await writeAll(writer, createRequest(command));
}

const DELIM_LPS = new Uint8Array([0, 0]);

async function* readLines(reader: Reader): AsyncIterableIterator<Uint8Array> {
const buffer = new Uint8Array(1024);
let chunks = new Uint8Array();

// Modified KMP
let inspectIndex = 0;
let matchIndex = 0;
while (true) {
const inspectArr = new Uint8Array(1024);
const result = await reader.read(inspectArr);
if (result === null) {
// Yield last chunk.
yield chunks;
break;
}
chunks = concat([chunks, inspectArr.slice(0, result)]);
let localIndex = 0;
while (inspectIndex < chunks.length) {
if (inspectArr[localIndex] === CRLF[matchIndex]) {
inspectIndex++;
localIndex++;
matchIndex++;
if (matchIndex === DELIM_LPS.length) {
// Full match
const matchEnd = inspectIndex - DELIM_LPS.length;
const readyBytes = chunks.slice(0, matchEnd);
yield readyBytes;
// Reset match, different from KMP.
chunks = chunks.slice(inspectIndex);
inspectIndex = 0;
matchIndex = 0;
}
const result = await reader.read(buffer);
if (result === null) break;
chunks = concat([chunks, buffer.subarray(0, result)]);
let index;
while ((index = chunks.indexOf(CRLF[0])) !== -1) {
if (chunks[index + 1] === CRLF[1]) {
yield chunks.subarray(0, index);
chunks = chunks.subarray(index + 2);
} else {
if (matchIndex === 0) {
inspectIndex++;
localIndex++;
}
break;
}
}
}
yield chunks;
}

function readNReplies(
Expand Down Expand Up @@ -177,7 +154,7 @@ async function readReply(
return line === "t";
case BULK_STRING_PREFIX:
case VERBATIM_STRING_PREFIX: {
if (Number(line) === -1) {
if (line === "-1") {
return null;
}
const { value } = await iterator.next();
Expand Down Expand Up @@ -213,41 +190,15 @@ async function readReply(
}
}

async function sendCommand(
redisConn: Reader & Writer,
command: Command,
raw = false,
): Promise<Reply> {
await writeCommand(redisConn, command);
return readReply(readLines(redisConn), raw);
}

async function pipelineCommands(
redisConn: Reader & Writer,
commands: Command[],
): Promise<Reply[]> {
const bytes = commands.map(createRequest);
await writeAll(redisConn, concat(bytes));
return readNReplies(commands.length, readLines(redisConn));
}

async function* readReplies(
redisConn: Reader & Writer,
raw = false,
): AsyncIterableIterator<Reply> {
const iterator = readLines(redisConn);
while (true) {
yield await readReply(iterator, raw);
}
}

export class RedisClient {
#conn: Reader & Writer;
#lines: AsyncIterableIterator<Uint8Array>;
#queue: Promise<any> = Promise.resolve();

/** Constructs a new instance. */
constructor(conn: Reader & Writer) {
this.#conn = conn;
this.#lines = readLines(this.#conn);
}

#enqueue<T>(task: () => Promise<T>): Promise<T> {
Expand All @@ -273,7 +224,10 @@ export class RedisClient {
* ```
*/
sendCommand(command: Command, raw = false): Promise<Reply> {
return this.#enqueue(() => sendCommand(this.#conn, command, raw));
return this.#enqueue(async () => {
await writeCommand(this.#conn, command);
return readReply(this.#lines, raw);
});
}

/**
Expand Down Expand Up @@ -311,8 +265,10 @@ export class RedisClient {
* }
* ```
*/
readReplies(raw = false): AsyncIterableIterator<Reply> {
return readReplies(this.#conn, raw);
async *readReplies(raw = false): AsyncIterableIterator<Reply> {
while (true) {
yield await readReply(this.#lines, raw);
}
}

/**
Expand All @@ -335,6 +291,10 @@ export class RedisClient {
* ```
*/
pipelineCommands(commands: Command[]): Promise<Reply[]> {
return this.#enqueue(() => pipelineCommands(this.#conn, commands));
return this.#enqueue(async () => {
const bytes = concat(commands.map(createRequest));
await writeAll(this.#conn, bytes);
return readNReplies(commands.length, this.#lines);
});
}
}

0 comments on commit f83ddc4

Please sign in to comment.