Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-amqp] Update async methods to support cancellation #13835

Merged
merged 10 commits into from
Feb 18, 2021
7 changes: 6 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Release History

## 2.1.1 (Unreleased)
## 2.2.0 (Unreleased)

- Addresses issue [9988](https://github.com/Azure/azure-sdk-for-js/issues/9988)
by updating the following operations to accept an `abortSignal` to allow cancellation:
- CbsClient.init()
- CbsClient.negotiateClaim()
- RequestResponseLink.create()

## 2.1.0 (2021-02-08)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "2.1.1",
"version": "2.2.0",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
12 changes: 9 additions & 3 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ export class CbsClient {
connection: Connection;
readonly connectionLock: string;
readonly endpoint: string;
init(): Promise<void>;
negotiateClaim(audience: string, token: string, tokenType: TokenType): Promise<CbsResponse>;
init(options?: {
abortSignal?: AbortSignalLike;
}): Promise<void>;
negotiateClaim(audience: string, token: string, tokenType: TokenType, options?: {
abortSignal?: AbortSignalLike;
}): Promise<CbsResponse>;
remove(): void;
readonly replyTo: string;
}
Expand Down Expand Up @@ -446,7 +450,9 @@ export class RequestResponseLink implements ReqResLink {
constructor(session: Session, sender: Sender, receiver: Receiver);
close(): Promise<void>;
get connection(): Connection;
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions): Promise<RequestResponseLink>;
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions, createOptions?: {
abortSignal?: AbortSignalLike;
}): Promise<RequestResponseLink>;
isOpen(): boolean;
// (undocumented)
receiver: Receiver;
Expand Down
33 changes: 28 additions & 5 deletions sdk/core/core-amqp/src/cbs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
SenderOptions,
generate_uuid
} from "rhea-promise";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { Constants } from "./util/constants";
import { logErrorStackTrace, logger } from "./log";
import { translate } from "./errors";
Expand Down Expand Up @@ -71,15 +72,24 @@ export class CbsClient {
/**
* Creates a singleton instance of the CBS session if it hasn't been initialized previously on
* the given connection.
* @param options - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `init` invocation.
* @returns Promise<void>.
*/
async init(): Promise<void> {
async init(options: { abortSignal?: AbortSignalLike } = {}): Promise<void> {
const { abortSignal } = options;
const initAbortMessage = "The init operation has been cancelled by the user.";
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved

try {
if (abortSignal?.aborted) {
throw new AbortError(initAbortMessage);
}

// Acquire the lock and establish an amqp connection if it does not exist.
if (!this.connection.isOpen()) {
logger.verbose("The CBS client is trying to establish an AMQP connection.");
await defaultLock.acquire(this.connectionLock, () => {
return this.connection.open();
return this.connection.open({ abortSignal });
});
}

Expand Down Expand Up @@ -107,7 +117,8 @@ export class CbsClient {
this._cbsSenderReceiverLink = await RequestResponseLink.create(
this.connection,
srOpt,
rxOpt
rxOpt,
{ abortSignal }
);
this._cbsSenderReceiverLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
Expand Down Expand Up @@ -179,15 +190,24 @@ export class CbsClient {
* - **ManagementClient**
* - `"sb://<your-namespace>.servicebus.windows.net/<event-hub-name>/$management"`.
* @param token - The token that needs to be sent in the put-token request.
* @param tokenType - The type of token being used. For example, 'jwt' or 'servicebus.windows.net:sastoken'.
* @param options - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `negotiateClaim` invocation.
* @returns A Promise that resolves when $cbs authentication is successful
* and rejects when an error occurs during $cbs authentication.
*/
async negotiateClaim(
audience: string,
token: string,
tokenType: TokenType
tokenType: TokenType,
options: { abortSignal?: AbortSignalLike } = {}
): Promise<CbsResponse> {
const { abortSignal } = options;
try {
if (abortSignal?.aborted) {
throw new AbortError("The negotiateClaim operation has been cancelled by the user.");
}

if (!this._cbsSenderReceiverLink) {
throw new Error("Attempted to negotiate a claim but the CBS link does not exist.");
}
Expand All @@ -203,7 +223,10 @@ export class CbsClient {
type: tokenType
}
};
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request);
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request, {
abortSignal,
requestName: "negotiateClaim"
});
logger.verbose("[%s] The CBS response is: %O", this.connection.id, responseMessage);
return this._fromRheaMessageResponse(responseMessage);
} catch (err) {
Expand Down
12 changes: 8 additions & 4 deletions sdk/core/core-amqp/src/requestResponseLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,20 @@ export class RequestResponseLink implements ReqResLink {
* @param connection - The amqp connection.
* @param senderOptions - Options that must be provided to create the sender link.
* @param receiverOptions - Options that must be provided to create the receiver link.
* @param createOptions - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `create` invocation.
* @returns Promise<RequestResponseLink>
*/
static async create(
connection: Connection,
senderOptions: SenderOptions,
receiverOptions: ReceiverOptions
receiverOptions: ReceiverOptions,
createOptions: { abortSignal?: AbortSignalLike } = {}
): Promise<RequestResponseLink> {
const session = await connection.createSession();
const sender = await session.createSender(senderOptions);
const receiver = await session.createReceiver(receiverOptions);
const { abortSignal } = createOptions;
const session = await connection.createSession({ abortSignal });
const sender = await session.createSender({ ...senderOptions, abortSignal });
const receiver = await session.createReceiver({ ...receiverOptions, abortSignal });
logger.verbose(
"[%s] Successfully created the sender and receiver links on the same session.",
connection.id
Expand Down
116 changes: 113 additions & 3 deletions sdk/core/core-amqp/test/cbs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,83 @@
// Licensed under the MIT license.

import { assert } from "chai";
import { AbortController } from "@azure/abort-controller";
import { CbsClient, defaultLock, TokenType } from "../src";
import { createConnectionStub } from "./utils/createConnectionStub";
import { Connection } from "rhea-promise";
import { stub } from "sinon";
import { CbsClient, TokenType } from "../src";

describe("CbsClient", function() {
const TEST_FAILURE = "Test failure";

describe("init", function() {
it("honors already aborted abortSignal", async function() {
const cbsClient = new CbsClient(new Connection(), "lock");

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal inside locking code", async function() {
const lock = "lock";
const cbsClient = new CbsClient(new Connection(), "lock");

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;

// Make the existing `init` invocation wait until the abortSignal
// is aborted before acquiring it's lock.
await defaultLock.acquire(lock, (done) => {
setTimeout(() => {
controller.abort();
done();
}, 0);
});

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connectionStub = new Connection();
// Stub 'open' because creating a real connection will fail.
stub(connectionStub, "open").resolves({} as any);

const cbsClient = new CbsClient(connectionStub, "lock");

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});

describe("negotiateClaim", function() {
it("throws an error if the cbs link doesn't exist.", async function() {
const connectionStub = stub(new Connection()) as any;

const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

try {
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas);
throw new Error(TEST_FAILURE);
Expand All @@ -23,5 +89,49 @@ describe("CbsClient", function() {
);
}
});

describe("cancellation", function() {
it("honors already aborted abortSignal", async function() {
const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
// Pass the already aborted abortSignal to make sure negotiateClaim will exit quickly.
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
abortSignal: signal
});
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

// Call `init()` to ensure the CbsClient has a RequestResponseLink.
await cbsClient.init();

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
abortSignal: signal
});
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});
});
});
43 changes: 43 additions & 0 deletions sdk/core/core-amqp/test/requestResponse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
getCodeDescriptionAndError,
onMessageReceived
} from "../src/requestResponseLink";
import { createConnectionStub } from "./utils/createConnectionStub";
interface Window {}
declare let self: Window & typeof globalThis;

Expand All @@ -42,6 +43,48 @@ const assertItemsLengthInResponsesMap = (
};

describe("RequestResponseLink", function() {
const TEST_FAILURE = "Test failure";

describe("#create", function() {
it("should create a RequestResponseLink", async function() {
const connectionStub = createConnectionStub();
const link = await RequestResponseLink.create(connectionStub, {}, {});
assert.isTrue(link instanceof RequestResponseLink);
});

it("honors already aborted abortSignal", async function() {
const connection = new Connection();

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connection = new Connection();

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});

it("should send a request and receive a response correctly", async function() {
const connectionStub = stub(new Connection());
const rcvr = new EventEmitter();
Expand Down
31 changes: 31 additions & 0 deletions sdk/core/core-amqp/test/utils/createConnectionStub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import EventEmitter from "events";
import { Connection } from "rhea-promise";
import { stub } from "sinon";

/**
* Creates a stubbed rhea-promise Connection object.
*/
export function createConnectionStub(): Connection {
const connectionStub = new Connection();
stub(connectionStub, "open").resolves({} as any);
stub(connectionStub, "createSession").resolves({
connection: {
id: "connection-1"
},
createSender: () => {
const sender = new EventEmitter() as any;
sender.send = () => {
/* no-op */
};
return Promise.resolve(sender);
},
createReceiver: () => {
return Promise.resolve(new EventEmitter());
}
} as any);
stub(connectionStub, "id").get(() => "connection-1");
return connectionStub;
}