Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Handle commands with binary output in transaction #2133

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added handlind commands with binary output in transactions ([#2133](https://github.com/valkey-io/valkey-glide/pull/2133))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Node: Added handlind commands with binary output in transactions ([#2133](https://github.com/valkey-io/valkey-glide/pull/2133))
* Node: Added handling for commands with binary output in transactions ([#2133](https://github.com/valkey-io/valkey-glide/pull/2133))

* Node: Added FUNCTION KILL command ([#2114](https://github.com/valkey-io/valkey-glide/pull/2114))
* Node: Added XAUTOCLAIM command ([#2108](https://github.com/valkey-io/valkey-glide/pull/2108))
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
Expand Down
4 changes: 2 additions & 2 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ export const enum Decoder {
/**
* Decodes the response into a buffer array.
*/
Bytes,
Bytes = "binary",
/**
* Decodes the response into a string.
*/
String,
String = "string",
}

/**
Expand Down
46 changes: 33 additions & 13 deletions node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
createTime,
createUnWatch,
} from "./Commands";
import { RequestError } from "./Errors";
import { connection_request } from "./ProtobufMessage";
import { Transaction } from "./Transaction";

Expand Down Expand Up @@ -169,29 +170,48 @@ export class GlideClient extends BaseClient {
);
}

/** Execute a transaction by processing the queued commands.
* See https://redis.io/topics/Transactions/ for details on Redis Transactions.
/**
* Execute a transaction by processing the queued commands.
*
* @see {@link https://valkey.io/topics/transactions/} for details on Valkey Transactions.
*
* @param transaction - A Transaction object containing a list of commands to be executed.
* @param decoder - An optional parameter to decode all commands in the transaction. If not set, 'Decoder.String' will be used.
* @param transaction - A {@link Transaction} object containing a list of commands to be executed.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response.
* If not set, the {@link BaseClientConfiguration.defaultDecoder|default decoder} will be used
* or {@link Decoder.Bytes} if there are commands which produce binary output.
* @returns A list of results corresponding to the execution of each command in the transaction.
* If a command returns a value, it will be included in the list. If a command doesn't return a value,
* the list entry will be null.
* If the transaction failed due to a WATCH command, `exec` will return `null`.
* If a command returns a value, it will be included in the list. If a command doesn't return a value,
* the list entry will be `null`.
* If the transaction failed due to a `WATCH` command, `exec` will return `null`.
*/
public exec(
public async exec(
transaction: Transaction,
decoder: Decoder = this.defaultDecoder,
decoder?: Decoder,
): Promise<ReturnType[] | null> {
if (
decoder &&
decoder != Decoder.Bytes &&
transaction.requiresBinaryDecorer
) {
throw new RequestError(
"Transaction has a command which requres `Decoder.Bytes`.",
);
}

decoder =
decoder ??
(transaction.requiresBinaryDecorer
? Decoder.Bytes
: this.defaultDecoder);
return this.createWritePromise<ReturnType[] | null>(
transaction.commands,
{ decoder: decoder },
).then((result: ReturnType[] | null) => {
return this.processResultWithSetCommands(
).then((result) =>
this.processResultWithSetCommands(
result,
transaction.setCommandsIndexes,
);
});
),
);
}

/** Executes a single command, without checking inputs. Every part of the command, including subcommands,
Expand Down
50 changes: 35 additions & 15 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,37 +367,57 @@ export class GlideClusterClient extends BaseClient {
});
}

/** Execute a transaction by processing the queued commands.
* See https://redis.io/topics/Transactions/ for details on Redis Transactions.
/**
* Execute a transaction by processing the queued commands.
*
* @see {@link https://valkey.io/topics/transactions/} for details on Valkey Transactions.
*
* @param transaction - A ClusterTransaction object containing a list of commands to be executed.
* @param route - If `route` is not provided, the transaction will be routed to the slot owner of the first key found in the transaction.
* If no key is found, the command will be sent to a random node.
* If `route` is provided, the client will route the command to the nodes defined by `route`.
* @param transaction - A {@link ClusterTransaction} object containing a list of commands to be executed.
* @param route - (Optional) If `route` is not provided, the transaction will be routed to the slot owner of the first key found in the transaction.
* If no key is found, the command will be sent to a random node.
* If `route` is provided, the client will route the command to the nodes defined by `route`.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response.
* If not set, the {@link BaseClientConfiguration.defaultDecoder|default decoder} will be used
* or {@link Decoder.Bytes} if there are commands which produce binary output.
* @returns A list of results corresponding to the execution of each command in the transaction.
* If a command returns a value, it will be included in the list. If a command doesn't return a value,
* the list entry will be null.
* If the transaction failed due to a WATCH command, `exec` will return `null`.
* If a command returns a value, it will be included in the list. If a command doesn't return a value,
* the list entry will be `null`.
* If the transaction failed due to a `WATCH` command, `exec` will return `null`.
*/
public exec(
public async exec(
transaction: ClusterTransaction,
options?: {
route?: SingleNodeRoute;
decoder?: Decoder;
},
): Promise<ReturnType[] | null> {
if (
options &&
options?.decoder != Decoder.Bytes &&
transaction.requiresBinaryDecorer
) {
throw new RequestError(
"Transaction has a command which requres `Decoder.Bytes`.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Transaction has a command which requres `Decoder.Bytes`.",
"Transaction has a command which requires `Decoder.Bytes`.",

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

"Pass decoder=Decoder.Bytes as part of the exec() command to process transactions that return Bytes."

);
}

const decoder =
options?.decoder ??
(transaction.requiresBinaryDecorer
? Decoder.Bytes
: this.defaultDecoder);
return this.createWritePromise<ReturnType[] | null>(
transaction.commands,
{
route: toProtobufRoute(options?.route),
decoder: options?.decoder,
decoder: decoder,
},
).then((result: ReturnType[] | null) => {
return this.processResultWithSetCommands(
).then((result) =>
this.processResultWithSetCommands(
result,
transaction.setCommandsIndexes,
);
});
),
);
}

/** Ping the Redis server.
Expand Down
6 changes: 6 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
*/
readonly setCommandsIndexes: number[] = [];

/**
* Flag to be set to `true` by a command if it requires {@link Decoder.Bytes}.
* @internal
*/
requiresBinaryDecorer: boolean = false;

/**
* Adds a command to the transaction and returns the transaction instance.
* @param command - The command to add.
Expand Down
4 changes: 1 addition & 3 deletions node/tests/AsyncClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ const FreePort = require("find-free-port");

const PORT_NUMBER = 4000;

type EmptyObject = Record<string, never>;

describe("AsyncClient", () => {
let server: RedisServer;
let port: number;
Expand Down Expand Up @@ -41,7 +39,7 @@ describe("AsyncClient", () => {
server.close();
});

runCommonTests<EmptyObject>({
runCommonTests({
init: async () => {
const client = await AsyncClient.CreateConnection(
"redis://localhost:" + port,
Expand Down
34 changes: 20 additions & 14 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { BufferReader, BufferWriter } from "protobufjs";
import { v4 as uuidv4 } from "uuid";
import {
BaseClientConfiguration,
Decoder,
GlideClient,
ListDirection,
Expand Down Expand Up @@ -41,12 +42,6 @@ import {
waitForNotBusy,
} from "./TestUtilities";

/* eslint-disable @typescript-eslint/no-var-requires */

type Context = {
client: GlideClient;
};

const TIMEOUT = 50000;

describe("GlideClient", () => {
Expand Down Expand Up @@ -198,6 +193,7 @@ describe("GlideClient", () => {
expect(await client.dbsize()).toBeGreaterThan(0);
expect(await client.flushdb(FlushMode.SYNC)).toEqual("OK");
expect(await client.dbsize()).toEqual(0);
client.close();
},
);

Expand All @@ -221,6 +217,7 @@ describe("GlideClient", () => {
expect(await client.get(key)).toEqual(valueEncoded);
expect(await client.get(key, Decoder.String)).toEqual(value);
expect(await client.get(key, Decoder.Bytes)).toEqual(valueEncoded);
client.close();
},
);

Expand All @@ -244,6 +241,7 @@ describe("GlideClient", () => {
expect(await client.get(key)).toEqual(value);
expect(await client.get(key, Decoder.String)).toEqual(value);
expect(await client.get(key, Decoder.Bytes)).toEqual(valueEncoded);
client.close();
},
);

Expand All @@ -263,6 +261,7 @@ describe("GlideClient", () => {
expectedRes.push(["select(0)", "OK"]);

validateTransactionResponse(result, expectedRes);
client.close();
},
);

Expand All @@ -279,6 +278,7 @@ describe("GlideClient", () => {
expectedRes.push(["select(0)", "OK"]);

validateTransactionResponse(result, expectedRes);
client.close();
},
);

Expand All @@ -302,6 +302,7 @@ describe("GlideClient", () => {
expectedRes.push(["select(0)", "OK"]);

validateTransactionResponse(result, expectedRes);
client.close();
},
);

Expand All @@ -326,6 +327,7 @@ describe("GlideClient", () => {
expectedRes.push(["select(0)", "OK"]);

validateTransactionResponse(result, expectedRes);
client.close();
},
);

Expand Down Expand Up @@ -1396,19 +1398,23 @@ describe("GlideClient", () => {
TIMEOUT,
);

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
runBaseTests({
init: async (protocol, configOverrides) => {
const config = getClientConfigurationOption(
cluster.getAddresses(),
protocol,
);
options.protocol = protocol;
options.clientName = clientName;

for (const key of Object.keys(configOverrides ?? {})) {
const param = key as keyof BaseClientConfiguration;
config[param] = configOverrides?.[param] as never;
}

testsFailed += 1;
client = await GlideClient.createClient(options);
return { client, context: { client }, cluster };
client = await GlideClient.createClient(config);
return { client, cluster };
},
close: (context: Context, testSucceeded: boolean) => {
close: (testSucceeded: boolean) => {
if (testSucceeded) {
testsFailed -= 1;
}
Expand Down
25 changes: 12 additions & 13 deletions node/tests/GlideClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { gte } from "semver";
import { v4 as uuidv4 } from "uuid";
import {
BaseClientConfiguration,
BitwiseOperation,
ClusterTransaction,
Decoder,
Expand Down Expand Up @@ -50,9 +51,6 @@ import {
validateTransactionResponse,
waitForNotBusy,
} from "./TestUtilities";
type Context = {
client: GlideClusterClient;
};

const TIMEOUT = 50000;

Expand Down Expand Up @@ -81,25 +79,26 @@ describe("GlideClusterClient", () => {
}
});

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
runBaseTests({
init: async (protocol, configOverrides) => {
const config = getClientConfigurationOption(
cluster.getAddresses(),
protocol,
);
options.protocol = protocol;
options.clientName = clientName;

for (const key of Object.keys(configOverrides ?? {})) {
const param = key as keyof BaseClientConfiguration;
config[param] = configOverrides?.[param] as never;
}

testsFailed += 1;
client = await GlideClusterClient.createClient(options);
client = await GlideClusterClient.createClient(config);
return {
context: {
client,
},
client,
cluster,
};
},
close: (context: Context, testSucceeded: boolean) => {
close: (testSucceeded: boolean) => {
if (testSucceeded) {
testsFailed -= 1;
}
Expand Down
Loading
Loading