Skip to content

Commit

Permalink
[core-amqp] Update async methods to support cancellation (Azure#13835)
Browse files Browse the repository at this point in the history
* [core-amqp] make CbsClient.negotiateClaim() cancellable

* [core-amqp] make RequestResponseLink.create cancellable

* [core-amqp] make CbsClient.init cancellable

* [core-amqp] fix lint error

* [core-amqp] update API review

* [core-amqp] update 2.2.0 changelog with notes for issue 9988

* [core-amqp] update package.json version to 2.2.0

* [core-amqp] pass abortSignal to connection.open()

* [core-amqp] rename RequestResponseLink.create(..., options) to RequestResponseLink.create(..., createOptions)

* [core-amqp] rename RequestResponseLink.create(..., options) to RequestResponseLink.create(..., createOptions) - API review file
  • Loading branch information
chradek authored Feb 18, 2021
1 parent 829ac83 commit de864dd
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 17 deletions.
7 changes: 6 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Release History

## 2.1.1 (Unreleased)
## 2.2.0 (Unreleased)

- Addresses issue [9988](https://github.com/Azure/azure-sdk-for-js/issues/9988)
by updating the following operations to accept an `abortSignal` to allow cancellation:
- CbsClient.init()
- CbsClient.negotiateClaim()
- RequestResponseLink.create()

## 2.1.0 (2021-02-08)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "2.1.1",
"version": "2.2.0",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
12 changes: 9 additions & 3 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ export class CbsClient {
connection: Connection;
readonly connectionLock: string;
readonly endpoint: string;
init(): Promise<void>;
negotiateClaim(audience: string, token: string, tokenType: TokenType): Promise<CbsResponse>;
init(options?: {
abortSignal?: AbortSignalLike;
}): Promise<void>;
negotiateClaim(audience: string, token: string, tokenType: TokenType, options?: {
abortSignal?: AbortSignalLike;
}): Promise<CbsResponse>;
remove(): void;
readonly replyTo: string;
}
Expand Down Expand Up @@ -446,7 +450,9 @@ export class RequestResponseLink implements ReqResLink {
constructor(session: Session, sender: Sender, receiver: Receiver);
close(): Promise<void>;
get connection(): Connection;
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions): Promise<RequestResponseLink>;
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions, createOptions?: {
abortSignal?: AbortSignalLike;
}): Promise<RequestResponseLink>;
isOpen(): boolean;
// (undocumented)
receiver: Receiver;
Expand Down
33 changes: 28 additions & 5 deletions sdk/core/core-amqp/src/cbs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
SenderOptions,
generate_uuid
} from "rhea-promise";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { Constants } from "./util/constants";
import { logErrorStackTrace, logger } from "./log";
import { translate } from "./errors";
Expand Down Expand Up @@ -71,15 +72,24 @@ export class CbsClient {
/**
* Creates a singleton instance of the CBS session if it hasn't been initialized previously on
* the given connection.
* @param options - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `init` invocation.
* @returns Promise<void>.
*/
async init(): Promise<void> {
async init(options: { abortSignal?: AbortSignalLike } = {}): Promise<void> {
const { abortSignal } = options;
const initAbortMessage = "The init operation has been cancelled by the user.";

try {
if (abortSignal?.aborted) {
throw new AbortError(initAbortMessage);
}

// Acquire the lock and establish an amqp connection if it does not exist.
if (!this.connection.isOpen()) {
logger.verbose("The CBS client is trying to establish an AMQP connection.");
await defaultLock.acquire(this.connectionLock, () => {
return this.connection.open();
return this.connection.open({ abortSignal });
});
}

Expand Down Expand Up @@ -107,7 +117,8 @@ export class CbsClient {
this._cbsSenderReceiverLink = await RequestResponseLink.create(
this.connection,
srOpt,
rxOpt
rxOpt,
{ abortSignal }
);
this._cbsSenderReceiverLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
Expand Down Expand Up @@ -179,15 +190,24 @@ export class CbsClient {
* - **ManagementClient**
* - `"sb://<your-namespace>.servicebus.windows.net/<event-hub-name>/$management"`.
* @param token - The token that needs to be sent in the put-token request.
* @param tokenType - The type of token being used. For example, 'jwt' or 'servicebus.windows.net:sastoken'.
* @param options - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `negotiateClaim` invocation.
* @returns A Promise that resolves when $cbs authentication is successful
* and rejects when an error occurs during $cbs authentication.
*/
async negotiateClaim(
audience: string,
token: string,
tokenType: TokenType
tokenType: TokenType,
options: { abortSignal?: AbortSignalLike } = {}
): Promise<CbsResponse> {
const { abortSignal } = options;
try {
if (abortSignal?.aborted) {
throw new AbortError("The negotiateClaim operation has been cancelled by the user.");
}

if (!this._cbsSenderReceiverLink) {
throw new Error("Attempted to negotiate a claim but the CBS link does not exist.");
}
Expand All @@ -203,7 +223,10 @@ export class CbsClient {
type: tokenType
}
};
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request);
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request, {
abortSignal,
requestName: "negotiateClaim"
});
logger.verbose("[%s] The CBS response is: %O", this.connection.id, responseMessage);
return this._fromRheaMessageResponse(responseMessage);
} catch (err) {
Expand Down
12 changes: 8 additions & 4 deletions sdk/core/core-amqp/src/requestResponseLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,20 @@ export class RequestResponseLink implements ReqResLink {
* @param connection - The amqp connection.
* @param senderOptions - Options that must be provided to create the sender link.
* @param receiverOptions - Options that must be provided to create the receiver link.
* @param createOptions - Optional parameters that can be used to affect this method's behavior.
* For example, `abortSignal` can be passed to allow cancelling an in-progress `create` invocation.
* @returns Promise<RequestResponseLink>
*/
static async create(
connection: Connection,
senderOptions: SenderOptions,
receiverOptions: ReceiverOptions
receiverOptions: ReceiverOptions,
createOptions: { abortSignal?: AbortSignalLike } = {}
): Promise<RequestResponseLink> {
const session = await connection.createSession();
const sender = await session.createSender(senderOptions);
const receiver = await session.createReceiver(receiverOptions);
const { abortSignal } = createOptions;
const session = await connection.createSession({ abortSignal });
const sender = await session.createSender({ ...senderOptions, abortSignal });
const receiver = await session.createReceiver({ ...receiverOptions, abortSignal });
logger.verbose(
"[%s] Successfully created the sender and receiver links on the same session.",
connection.id
Expand Down
116 changes: 113 additions & 3 deletions sdk/core/core-amqp/test/cbs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,83 @@
// Licensed under the MIT license.

import { assert } from "chai";
import { AbortController } from "@azure/abort-controller";
import { CbsClient, defaultLock, TokenType } from "../src";
import { createConnectionStub } from "./utils/createConnectionStub";
import { Connection } from "rhea-promise";
import { stub } from "sinon";
import { CbsClient, TokenType } from "../src";

describe("CbsClient", function() {
const TEST_FAILURE = "Test failure";

describe("init", function() {
it("honors already aborted abortSignal", async function() {
const cbsClient = new CbsClient(new Connection(), "lock");

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal inside locking code", async function() {
const lock = "lock";
const cbsClient = new CbsClient(new Connection(), "lock");

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;

// Make the existing `init` invocation wait until the abortSignal
// is aborted before acquiring it's lock.
await defaultLock.acquire(lock, (done) => {
setTimeout(() => {
controller.abort();
done();
}, 0);
});

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connectionStub = new Connection();
// Stub 'open' because creating a real connection will fail.
stub(connectionStub, "open").resolves({} as any);

const cbsClient = new CbsClient(connectionStub, "lock");

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await cbsClient.init({ abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});

describe("negotiateClaim", function() {
it("throws an error if the cbs link doesn't exist.", async function() {
const connectionStub = stub(new Connection()) as any;

const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

try {
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas);
throw new Error(TEST_FAILURE);
Expand All @@ -23,5 +89,49 @@ describe("CbsClient", function() {
);
}
});

describe("cancellation", function() {
it("honors already aborted abortSignal", async function() {
const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
// Pass the already aborted abortSignal to make sure negotiateClaim will exit quickly.
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
abortSignal: signal
});
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connectionStub = createConnectionStub();
const cbsClient = new CbsClient(connectionStub, "lock");

// Call `init()` to ensure the CbsClient has a RequestResponseLink.
await cbsClient.init();

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
abortSignal: signal
});
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});
});
});
43 changes: 43 additions & 0 deletions sdk/core/core-amqp/test/requestResponse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
getCodeDescriptionAndError,
onMessageReceived
} from "../src/requestResponseLink";
import { createConnectionStub } from "./utils/createConnectionStub";
interface Window {}
declare let self: Window & typeof globalThis;

Expand All @@ -42,6 +43,48 @@ const assertItemsLengthInResponsesMap = (
};

describe("RequestResponseLink", function() {
const TEST_FAILURE = "Test failure";

describe("#create", function() {
it("should create a RequestResponseLink", async function() {
const connectionStub = createConnectionStub();
const link = await RequestResponseLink.create(connectionStub, {}, {});
assert.isTrue(link instanceof RequestResponseLink);
});

it("honors already aborted abortSignal", async function() {
const connection = new Connection();

// Create an abort signal that will be aborted on a future tick of the event loop.
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 0);

try {
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});

it("honors abortSignal", async function() {
const connection = new Connection();

// Create an abort signal that is already aborted.
const controller = new AbortController();
controller.abort();
const signal = controller.signal;

try {
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
throw new Error(TEST_FAILURE);
} catch (err) {
assert.equal(err.name, "AbortError");
}
});
});

it("should send a request and receive a response correctly", async function() {
const connectionStub = stub(new Connection());
const rcvr = new EventEmitter();
Expand Down
31 changes: 31 additions & 0 deletions sdk/core/core-amqp/test/utils/createConnectionStub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import EventEmitter from "events";
import { Connection } from "rhea-promise";
import { stub } from "sinon";

/**
* Creates a stubbed rhea-promise Connection object.
*/
export function createConnectionStub(): Connection {
const connectionStub = new Connection();
stub(connectionStub, "open").resolves({} as any);
stub(connectionStub, "createSession").resolves({
connection: {
id: "connection-1"
},
createSender: () => {
const sender = new EventEmitter() as any;
sender.send = () => {
/* no-op */
};
return Promise.resolve(sender);
},
createReceiver: () => {
return Promise.resolve(new EventEmitter());
}
} as any);
stub(connectionStub, "id").get(() => "connection-1");
return connectionStub;
}

0 comments on commit de864dd

Please sign in to comment.