diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md
index 9a2d13787a1d..7d819318fd62 100644
--- a/sdk/eventhub/event-hubs/review/event-hubs.api.md
+++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md
@@ -21,6 +21,15 @@ import { TokenCredential } from '@azure/core-auth';
 import { WebSocketImpl } from 'rhea-promise';
 import { WebSocketOptions } from '@azure/core-amqp';

+// @public
+export interface BufferedCloseOptions extends OperationOptions {
+    flush?: boolean;
+}
+
+// @public
+export interface BufferedFlushOptions extends OperationOptions {
+}
+
 // @public
 export interface Checkpoint {
     consumerGroup: string;
@@ -55,6 +64,10 @@ export interface CreateBatchOptions extends OperationOptions {
 // @public
 export const earliestEventPosition: EventPosition;

+// @public
+export interface EnqueueEventOptions extends SendBatchOptions {
+}
+
 // @public
 export interface EventData {
     body: any;
@@ -82,6 +95,30 @@ export interface EventDataBatch {
     tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
 }

+// @public
+export class EventHubBufferedProducerClient {
+    constructor(connectionString: string, options: EventHubBufferedProducerClientOptions);
+    constructor(connectionString: string, eventHubName: string, options: EventHubBufferedProducerClientOptions);
+    constructor(fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential | NamedKeyCredential | SASCredential, options: EventHubBufferedProducerClientOptions);
+    close(options?: BufferedCloseOptions): Promise<void>;
+    enqueueEvent(event: EventData | AmqpAnnotatedMessage, options?: EnqueueEventOptions): Promise<number>;
+    enqueueEvents(events: EventData[] | AmqpAnnotatedMessage[], options?: EnqueueEventOptions): Promise<number>;
+    get eventHubName(): string;
+    flush(options?: BufferedFlushOptions): Promise<void>;
+    get fullyQualifiedNamespace(): string;
+    getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
+    getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
+    getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
+}
+
+// @public
+export interface EventHubBufferedProducerClientOptions extends EventHubClientOptions {
+    maxEventBufferLengthPerPartition?: number;
+    maxWaitTimeInMs?: number;
+    onSendEventsErrorHandler: (ctx: OnSendEventsErrorContext) => Promise<void>;
+    onSendEventsSuccessHandler?: (ctx: OnSendEventsSuccessContext) => Promise<void>;
+}
+
 // @public
 export interface EventHubClientOptions {
     customEndpointAddress?: string;
@@ -190,6 +227,19 @@ export const logger: AzureLogger;

 export { MessagingError }

+// @public
+export interface OnSendEventsErrorContext {
+    error: Error;
+    events: Array<EventData | AmqpAnnotatedMessage>;
+    partitionId: string;
+}
+
+// @public
+export interface OnSendEventsSuccessContext {
+    events: Array<EventData | AmqpAnnotatedMessage>;
+    partitionId: string;
+}
+
 // @public
 export interface OperationOptions {
     abortSignal?: AbortSignalLike;
diff --git a/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts b/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts
new file mode 100644
index 000000000000..60b002d1a8ac
--- /dev/null
+++ b/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts
@@ -0,0 +1,328 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
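The API review above is the full public surface this change adds. A minimal usage sketch of that surface, assuming the `@azure/event-hubs` package entry point and placeholder connection details (option and handler names come from the listing above):

```ts
import { EventHubBufferedProducerClient } from "@azure/event-hubs";

async function main(): Promise<void> {
  const client = new EventHubBufferedProducerClient(
    "<EVENTHUB_CONNECTION_STRING>", // placeholder
    "<EVENTHUB_NAME>", // placeholder
    {
      // Required: invoked whenever a buffered batch fails to publish.
      onSendEventsErrorHandler: async (ctx) => {
        console.error(`Failed to send ${ctx.events.length} events to partition ${ctx.partitionId}`, ctx.error);
      },
      // Optional: invoked whenever a buffered batch publishes successfully.
      onSendEventsSuccessHandler: async (ctx) => {
        console.log(`Sent ${ctx.events.length} events to partition ${ctx.partitionId}`);
      },
      maxWaitTimeInMs: 1000,
      maxEventBufferLengthPerPartition: 1500
    }
  );

  // Events are buffered and published in the background; both calls resolve with
  // the number of events currently buffered across all partitions.
  await client.enqueueEvent({ body: "first event" });
  await client.enqueueEvents([{ body: "second event" }, { body: "third event" }], {
    partitionKey: "orders"
  });

  // Push out anything still buffered, then close (close flushes by default).
  await client.flush();
  await client.close();
}

main().catch(console.error);
```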
+ +import { AmqpAnnotatedMessage, delay } from "@azure/core-amqp"; +import { + EventData, + EventDataBatch, + EventHubBufferedProducerClientOptions, + EventHubProducerClient, + OperationOptions +} from "./index"; +import { AwaitableQueue } from "./impl/awaitableQueue"; +import { isDefined, isObjectWithProperties } from "./util/typeGuards"; +import { AbortSignalLike } from "@azure/abort-controller"; +import { getPromiseParts } from "./util/getPromiseParts"; +import { logger } from "./log"; + +export interface BatchingPartitionChannelProps { + loopAbortSignal: AbortSignalLike; + maxBufferSize: number; + maxWaitTimeInMs: number; + partitionId: string; + producer: EventHubProducerClient; + /** + * The handler to call once a batch has successfully published. + */ + onSendEventsSuccessHandler?: EventHubBufferedProducerClientOptions["onSendEventsSuccessHandler"]; + /** + * The handler to call when a batch fails to publish. + */ + onSendEventsErrorHandler: EventHubBufferedProducerClientOptions["onSendEventsErrorHandler"]; +} + +/** + * The `BatchingPartitionChannel` is responsible for accepting enqueued events + * and optimally batching and sending them to an Event Hub. + * @internal + */ +export class BatchingPartitionChannel { + private _eventQueue = new AwaitableQueue(); + private _batchedEvents: Array = []; + private _bufferCount: number = 0; + private _readyQueue: Array<{ + resolve: (value: void) => void; + reject: (reason?: any) => void; + }> = []; + private _flushState: + | { isFlushing: false } + | { isFlushing: true; currentPromise: Promise; resolve: () => void } = { + isFlushing: false + }; + private _isRunning: boolean = false; + private _lastBatchCreationTime: number = 0; + private _loopAbortSignal: AbortSignalLike; + private _maxBufferSize: number; + private _maxWaitTimeInMs: number; + private _onSendEventsErrorHandler: EventHubBufferedProducerClientOptions["onSendEventsErrorHandler"]; + private _onSendEventsSuccessHandler?: EventHubBufferedProducerClientOptions["onSendEventsSuccessHandler"]; + + private _partitionId: string; + private _producer: EventHubProducerClient; + + constructor({ + loopAbortSignal, + maxBufferSize, + maxWaitTimeInMs, + onSendEventsErrorHandler, + onSendEventsSuccessHandler, + partitionId, + producer + }: BatchingPartitionChannelProps) { + this._loopAbortSignal = loopAbortSignal; + this._maxBufferSize = maxBufferSize; + this._maxWaitTimeInMs = maxWaitTimeInMs; + this._onSendEventsErrorHandler = onSendEventsErrorHandler; + this._onSendEventsSuccessHandler = onSendEventsSuccessHandler; + this._partitionId = partitionId; + this._producer = producer; + } + + getCurrentBufferedCount(): number { + return this._bufferCount; + } + + async enqueueEvent(event: EventData | AmqpAnnotatedMessage): Promise { + await this._ready(); + this._eventQueue.push(event); + this._bufferCount++; + + if (!this._isRunning) { + this._isRunning = true; + this._startPublishLoop().catch((e) => { + logger.error( + `The following error occured during batch creation or sending: ${JSON.stringify( + e, + undefined, + " " + )}` + ); + }); + } + } + + /** + * Sets the flush state so that no new events can be enqueued until + * all the currently buffered events are sent to the Event Hub. + * + * Returns a promise that resolves once flushing is complete. 
+ */ + async flush(_options: OperationOptions = {}): Promise { + const state = this._flushState; + if (state.isFlushing) { + return state.currentPromise; + } + + if (this.getCurrentBufferedCount() === 0) { + return Promise.resolve(); + } + + const { promise, resolve } = getPromiseParts(); + this._flushState = { isFlushing: true, currentPromise: promise, resolve }; + + return promise; + } + + /** + * Returns a promise that resolves once there is room for events to be added + * to the buffer. + */ + private _ready(): Promise { + const currentBufferedCount = this.getCurrentBufferedCount(); + + // If the buffer isn't full and we don't have any pending `ready()` calls, + // then it's safe to return right away. + if ( + currentBufferedCount < this._maxBufferSize && + !this._readyQueue.length && + !this._flushState.isFlushing + ) { + return Promise.resolve(); + } + + const { promise: readyPromise, reject, resolve } = getPromiseParts(); + this._readyQueue.push({ resolve, reject }); + + return readyPromise; + } + + /** + * Starts the loop that creates batches and sends them to the Event Hub. + * + * The loop will run until the `_loopAbortSignal` is aborted. + */ + private async _startPublishLoop() { + let batch: EventDataBatch | undefined; + let futureEvent = this._eventQueue.shift(); + // `eventToAddToBatch` is used to keep track of an event that has been removed + // from the queue, but has not yet been added to a batch. + // This prevents losing an event if a `sendBatch` or `createBatch` call fails + // before the event is added to a batch. + let eventToAddToBatch: EventData | AmqpAnnotatedMessage | undefined; + while (!this._loopAbortSignal.aborted) { + try { + if (!isDefined(batch)) { + batch = await this._createBatch(); + } + const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime; + const maximumTimeToWaitForEvent = batch.count + ? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0) + : this._maxWaitTimeInMs; + + const event = + eventToAddToBatch ?? + (await Promise.race([futureEvent, delay(maximumTimeToWaitForEvent)])); + + if (!event) { + // We didn't receive an event within the allotted time. + // Send the existing batch if it has events in it. + if (batch.count) { + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } + continue; + } else if (!eventToAddToBatch) { + eventToAddToBatch = event; + // We received an event, so get a promise for the next one. + futureEvent = this._eventQueue.shift(); + } + + const didAdd = batch.tryAdd(event); + if (didAdd) { + // This event will definitely make it to one of the user-provided handlers + // since it was added to a batch. + // Store it so we can return it in a handler later. + this._batchedEvents.push(event); + // Clear reference to existing event since it has been added to the batch. + eventToAddToBatch = undefined; + } + + if (didAdd && batch.count >= this._maxBufferSize) { + // Whenever batch.count exceeds the max count of buffered events, send the batch. + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } else if (!didAdd && batch.count) { + // If the event wasn't able to be added and the current batch isn't empty, + // attempt to send the current batch and add the event to a new batch. + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } + + if (!didAdd && !batch.tryAdd(event)) { + // TODO: Report MaxMesageSizeExceeded error. Mimic service's error. 
+          this._reportFailure(new Error("Placeholder for max message size exceeded"), event);
+        } else if (!didAdd) {
+          // Handles the case where the event _was_ successfully added to the new batch.
+          this._batchedEvents.push(event);
+        }
+        // Clear reference to existing event since it has been added to the batch.
+        eventToAddToBatch = undefined;
+      } catch (err) {
+        if (!isObjectWithProperties(err, ["name"]) || err.name !== "AbortError") {
+          this._reportFailure(err);
+          batch = undefined;
+          this._batchedEvents = [];
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method that returns an `EventDataBatch`.
+   * This also has the side effects of
+   * - keeping track of batch creation time: needed for maxWaitTime calculations.
+   * - clearing reference to batched events.
+   * - incrementing the readiness: creating a new batch indicates the buffer
+   *   should have room, so we can resolve some pending `ready()` calls.
+   */
+  private async _createBatch(): Promise<EventDataBatch> {
+    this._lastBatchCreationTime = Date.now();
+    this._batchedEvents = [];
+    const batch = await this._producer.createBatch({
+      partitionId: this._partitionId
+    });
+    this._incrementReadiness();
+    return batch;
+  }
+
+  /**
+   * This method will resolve as many pending `ready()` calls as it can
+   * based on how much space remains in the buffer.
+   *
+   * If the channel is currently flushing, this is a no-op. This prevents
+   * `enqueueEvent` calls from adding the event to the buffer until flushing
+   * completes.
+   */
+  private _incrementReadiness() {
+    if (this._flushState.isFlushing) {
+      return;
+    }
+    const currentBufferedCount = this.getCurrentBufferedCount();
+    const num = Math.min(this._maxBufferSize - currentBufferedCount, this._readyQueue.length);
+    for (let i = 0; i < num; i++) {
+      this._readyQueue.shift()?.resolve();
+    }
+  }
+
+  /**
+   * Calls the user-provided `onSendEventsSuccessHandler` with the events
+   * that were successfully sent.
+   */
+  private _reportSuccess() {
+    this._bufferCount = this._bufferCount - this._batchedEvents.length;
+    this._updateFlushState();
+    this._onSendEventsSuccessHandler?.({
+      events: this._batchedEvents,
+      partitionId: this._partitionId
+    }).catch((e) => {
+      logger.error(
+        `The following error occurred in the onSendEventsSuccessHandler: ${JSON.stringify(
+          e,
+          undefined,
+          " "
+        )}`
+      );
+    });
+  }
+
+  /**
+   * Calls the user-provided `onSendEventsErrorHandler` with an error and the events
+   * that were not successfully sent.
+   */
+  private _reportFailure(err: any, event?: EventData | AmqpAnnotatedMessage) {
+    this._bufferCount = this._bufferCount - (event ? 1 : this._batchedEvents.length);
+    this._updateFlushState();
+    this._onSendEventsErrorHandler({
+      error: err,
+      events: event ? [event] : this._batchedEvents,
+      partitionId: this._partitionId
+    }).catch((e) => {
+      logger.error(
+        `The following error occurred in the onSendEventsErrorHandler: ${JSON.stringify(
+          e,
+          undefined,
+          " "
+        )}`
+      );
+    });
+  }
+
+  /**
+   * Updates the channel's flush state once the size of the
+   * event buffer has decreased to 0.
+   */
+  private _updateFlushState() {
+    const state = this._flushState;
+    if (!state.isFlushing || this.getCurrentBufferedCount() !== 0) {
+      return;
+    }
+
+    state.resolve();
+
+    this._flushState = { isFlushing: false };
+    this._incrementReadiness();
+  }
+}
diff --git a/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts
new file mode 100644
index 000000000000..44b38e3ce06b
--- /dev/null
+++ b/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts
@@ -0,0 +1,456 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+import { AbortController } from "@azure/abort-controller";
+import { AmqpAnnotatedMessage } from "@azure/core-amqp";
+import { NamedKeyCredential, SASCredential, TokenCredential } from "@azure/core-auth";
+import { BatchingPartitionChannel } from "./batchingPartitionChannel";
+import { PartitionAssigner } from "./impl/partitionAssigner";
+import { EventData, EventHubProducerClient, OperationOptions } from "./index";
+import { EventHubProperties, PartitionProperties } from "./managementClient";
+import {
+  EventHubClientOptions,
+  GetEventHubPropertiesOptions,
+  GetPartitionIdsOptions,
+  GetPartitionPropertiesOptions,
+  SendBatchOptions
+} from "./models/public";
+import { isCredential, isDefined } from "./util/typeGuards";
+
+/**
+ * Contains the events that were successfully sent to the Event Hub,
+ * and the partition they were assigned to.
+ */
+export interface OnSendEventsSuccessContext {
+  /**
+   * The partition each event was assigned.
+   */
+  partitionId: string;
+  /**
+   * The array of {@link EventData} and/or `AmqpAnnotatedMessage` that were successfully sent to the Event Hub.
+   */
+  events: Array<EventData | AmqpAnnotatedMessage>;
+}
+
+/**
+ * Contains the events that were not successfully sent to the Event Hub,
+ * the partition they were assigned to, and the error that was encountered while sending.
+ */
+export interface OnSendEventsErrorContext {
+  /**
+   * The partition each event was assigned.
+   */
+  partitionId: string;
+  /**
+   * The array of {@link EventData} and/or `AmqpAnnotatedMessage` that were not successfully sent to the Event Hub.
+   */
+  events: Array<EventData | AmqpAnnotatedMessage>;
+  /**
+   * The error that occurred when sending the associated events to the Event Hub.
+   */
+  error: Error;
+}
+
+/**
+ * Describes the options that can be provided while creating the `EventHubBufferedProducerClient`.
+ */
+export interface EventHubBufferedProducerClientOptions extends EventHubClientOptions {
+  /**
+   * The total number of events that can be buffered for publishing at a given time for a given partition.
+   *
+   * Default: 1500
+   */
+  maxEventBufferLengthPerPartition?: number;
+  /**
+   * The amount of time to wait for a new event to be enqueued in the buffer before publishing a partially full batch.
+   *
+   * Default: 1 second.
+   */
+  maxWaitTimeInMs?: number;
+  /**
+   * The handler to call once a batch has successfully published.
+   */
+  onSendEventsSuccessHandler?: (ctx: OnSendEventsSuccessContext) => Promise<void>;
+  /**
+   * The handler to call when a batch fails to publish.
+   */
+  onSendEventsErrorHandler: (ctx: OnSendEventsErrorContext) => Promise<void>;
+}
+
+/**
+ * Options to configure the `flush` method on the `EventHubBufferedProducerClient`.
+ */
+export interface BufferedFlushOptions extends OperationOptions {}
+
+/**
+ * Options to configure the `close` method on the `EventHubBufferedProducerClient`.
+ */
+export interface BufferedCloseOptions extends OperationOptions {
+  /**
+   * When `true`, all buffered events that are pending should be sent before closing.
+   * When `false`, abandon all buffered events and close immediately.
+   * Defaults to `true`.
+   */
+  flush?: boolean;
+}
+
+/**
+ * Options to configure the `enqueueEvents` method on the `EventHubBufferedProducerClient`.
+ */
+export interface EnqueueEventOptions extends SendBatchOptions {}
+
+/**
+ * The `EventHubBufferedProducerClient` is used to publish events to a specific Event Hub.
+ *
+ * The `EventHubBufferedProducerClient` does not publish events immediately.
+ * Instead, events are buffered so they can be efficiently batched and published
+ * when the batch is full or the `maxWaitTimeInMs` has elapsed with no new events
+ * enqueued.
+ *
+ * Depending on the options specified when events are enqueued, they may be
+ * automatically assigned to a partition, grouped according to the specified partition key,
+ * or assigned a specifically requested partition.
+ *
+ * This model is intended to shift the burden of batch management from callers, at the cost of
+ * non-deterministic timing for when events will be published. There are additional trade-offs
+ * to consider as well:
+ * - If the application crashes, events in the buffer will not have been published. To prevent
+ *   data loss, callers are encouraged to track publishing progress using the
+ *   `onSendEventsSuccessHandler` and `onSendEventsErrorHandler` handlers.
+ * - Events specifying a partition key may be assigned a different partition than those using
+ *   the same key with other producers.
+ * - In the unlikely event that a partition becomes temporarily unavailable, the
+ *   `EventHubBufferedProducerClient` may take longer to recover than other producers.
+ *
+ * In scenarios where it is important to have events published immediately with a deterministic
+ * outcome, where partition keys must be assigned to a partition consistently with other
+ * publishers, or where maximizing availability is a requirement, using the
+ * `EventHubProducerClient` is recommended.
+ */
+export class EventHubBufferedProducerClient {
+  /**
+   * Controls the `abortSignal` passed to each `BatchingPartitionChannel`.
+   * Used to signal when a channel should stop waiting for events.
+   */
+  private _abortController = new AbortController();
+
+  /**
+   * Indicates whether the client has been explicitly closed.
+   */
+  private _isClosed: boolean = false;
+
+  /**
+   * Handles assigning partitions.
+   */
+  private _partitionAssigner = new PartitionAssigner();
+
+  /**
+   * The known partitionIds that will be used when assigning events to partitions.
+   */
+  private _partitionIds: string[] = [];
+
+  /**
+   * The EventHubProducerClient to use when creating and sending batches to the Event Hub.
+   */
+  private _producer: EventHubProducerClient;
+
+  /**
+   * Mapping of partitionIds to `BatchingPartitionChannels`.
+   * Each `BatchingPartitionChannel` handles buffering events and backpressure independently.
+   */
+  private _partitionChannels = new Map<string, BatchingPartitionChannel>();
+
+  /**
+   * The options passed by the user when creating the EventHubBufferedProducerClient instance.
+   */
+  private _clientOptions: EventHubBufferedProducerClientOptions;
+
+  /**
+   * @readonly
+   * The name of the Event Hub instance for which this client is created.
+   */
+  get eventHubName(): string {
+    return this._producer.eventHubName;
+  }
+
+  /**
+   * @readonly
+   * The fully qualified namespace of the Event Hub instance for which this client is created.
+   * This is likely to be similar to <yournamespace>.servicebus.windows.net.
+   */
+  get fullyQualifiedNamespace(): string {
+    return this._producer.fullyQualifiedNamespace;
+  }
+
+  /**
+   * The `EventHubBufferedProducerClient` class is used to send events to an Event Hub.
+   * Use the `options` parameter to configure retry policy or proxy settings.
+   * @param connectionString - The connection string to use for connecting to the Event Hub instance.
+   * It is expected that the shared key properties and the Event Hub path are contained in this connection string.
+   * e.g. 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name'.
+   * @param options - A set of options to apply when configuring the client.
+   * - `retryOptions`   : Configures the retry policy for all the operations on the client.
+   * For example, `{ "maxRetries": 4 }` or `{ "maxRetries": 4, "retryDelayInMs": 30000 }`.
+   * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
+   * - `userAgent`      : A string to append to the built in user agent string that is passed to the service.
+   */
+  constructor(connectionString: string, options: EventHubBufferedProducerClientOptions);
+  /**
+   * The `EventHubBufferedProducerClient` class is used to send events to an Event Hub.
+   * Use the `options` parameter to configure retry policy or proxy settings.
+   * @param connectionString - The connection string to use for connecting to the Event Hubs namespace.
+   * It is expected that the shared key properties are contained in this connection string, but not the Event Hub path,
+   * e.g. 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;'.
+   * @param eventHubName - The name of the specific Event Hub to connect the client to.
+   * @param options - A set of options to apply when configuring the client.
+   * - `retryOptions`   : Configures the retry policy for all the operations on the client.
+   * For example, `{ "maxRetries": 4 }` or `{ "maxRetries": 4, "retryDelayInMs": 30000 }`.
+   * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
+   * - `userAgent`      : A string to append to the built in user agent string that is passed to the service.
+   */
+  constructor(
+    connectionString: string,
+    eventHubName: string,
+    options: EventHubBufferedProducerClientOptions
+  );
+  /**
+   * The `EventHubBufferedProducerClient` class is used to send events to an Event Hub.
+   * Use the `options` parameter to configure retry policy or proxy settings.
+   * @param fullyQualifiedNamespace - The full namespace which is likely to be similar to
+   * <yournamespace>.servicebus.windows.net
+   * @param eventHubName - The name of the specific Event Hub to connect the client to.
+   * @param credential - A credential object used by the client to get the token to authenticate the connection
+   * with the Azure Event Hubs service.
+   * See @azure/identity for creating credentials that support AAD auth.
+   * Use the `AzureNamedKeyCredential` from @azure/core-auth if you want to pass in a `SharedAccessKeyName`
+   * and `SharedAccessKey` without using a connection string. These fields map to the `name` and `key` field respectively
+   * in `AzureNamedKeyCredential`.
+   * Use the `AzureSASCredential` from @azure/core-auth if you want to pass in a `SharedAccessSignature`
+   * without using a connection string. This field maps to `signature` in `AzureSASCredential`.
+ * @param options - A set of options to apply when configuring the client. + * - `retryOptions` : Configures the retry policy for all the operations on the client. + * For example, `{ "maxRetries": 4 }` or `{ "maxRetries": 4, "retryDelayInMs": 30000 }`. + * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. + * - `userAgent` : A string to append to the built in user agent string that is passed to the service. + */ + constructor( + fullyQualifiedNamespace: string, + eventHubName: string, + credential: TokenCredential | NamedKeyCredential | SASCredential, + options: EventHubBufferedProducerClientOptions + ); + constructor( + fullyQualifiedNamespaceOrConnectionString1: string, + eventHubNameOrOptions2: string | EventHubBufferedProducerClientOptions, + credentialOrOptions3?: + | TokenCredential + | NamedKeyCredential + | SASCredential + | EventHubBufferedProducerClientOptions, + options4?: EventHubBufferedProducerClientOptions + ) { + if (typeof eventHubNameOrOptions2 !== "string") { + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2 + ); + this._clientOptions = { ...eventHubNameOrOptions2 }; + } else if (!isCredential(credentialOrOptions3)) { + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2, + credentialOrOptions3 + ); + this._clientOptions = { ...credentialOrOptions3! }; + } else { + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2, + credentialOrOptions3, + options4 + ); + this._clientOptions = { ...options4! }; + } + } + + /** + * Closes the AMQP connection to the Event Hub instance, + * returning a promise that will be resolved when disconnection is completed. + * + * This will wait for enqueued events to be flushed to the service before closing + * the connection. + * To close without flushing, set the `flush` option to `false`. + * + * @param options - The set of options to apply to the operation call. + * @returns Promise + * @throws Error if the underlying connection encounters an error while closing. + */ + async close(options: BufferedCloseOptions = {}): Promise { + if (!isDefined(options.flush) || options.flush === true) { + await this.flush(options); + } + // Calling abort signals to the BatchingPartitionChannels that they + // should stop reading/sending events. + this._abortController.abort(); + return this._producer.close(); + } + + /** + * Enqueues an event into the buffer to be published to the Event Hub. + * If there is no capacity in the buffer when this method is invoked, + * it will wait for space to become available and ensure that the event + * has been enqueued. + * + * When this call returns, the event has been accepted into the buffer, + * but it may not have been published yet. + * Publishing will take place at a nondeterministic point in the future as the buffer is processed. + * + * @param events - An {@link EventData} or `AmqpAnnotatedMessage`. + * @param options - A set of options that can be specified to influence the way in which + * the event is sent to the associated Event Hub. + * - `abortSignal` : A signal used to cancel the enqueueEvent operation. + * - `partitionId` : The partition this set of events will be sent to. If set, `partitionKey` can not be set. + * - `partitionKey` : A value that is hashed to produce a partition assignment. If set, `partitionId` can not be set. 
+ * @returns The total number of events that are currently buffered and waiting to be published, across all partitions. + */ + async enqueueEvent( + event: EventData | AmqpAnnotatedMessage, + options: EnqueueEventOptions = {} + ): Promise { + if (this._isClosed) { + throw new Error( + `This EventHubBufferedProducerClient has already been closed. Create a new client to enqueue events.` + ); + } + + // TODO: Start a loop that queries partition Ids. + // partition ids can be added to an Event Hub after it's been created. + if (!this._partitionIds.length) { + this._partitionIds = await this.getPartitionIds(); + this._partitionAssigner.setPartitionIds(this._partitionIds); + } + + const partitionId = this._partitionAssigner.assignPartition({ + partitionId: options.partitionId, + partitionKey: options.partitionKey + }); + + const partitionChannel = this._getPartitionChannel(partitionId); + await partitionChannel.enqueueEvent(event); + return this._getTotalBufferedEventsCount(); + } + + /** + * Enqueues events into the buffer to be published to the Event Hub. + * If there is no capacity in the buffer when this method is invoked, + * it will wait for space to become available and ensure that the events + * have been enqueued. + * + * When this call returns, the events have been accepted into the buffer, + * but it may not have been published yet. + * Publishing will take place at a nondeterministic point in the future as the buffer is processed. + * + * @param events - An array of {@link EventData} or `AmqpAnnotatedMessage`. + * @param options - A set of options that can be specified to influence the way in which + * events are sent to the associated Event Hub. + * - `abortSignal` : A signal used to cancel the enqueueEvents operation. + * - `partitionId` : The partition this set of events will be sent to. If set, `partitionKey` can not be set. + * - `partitionKey` : A value that is hashed to produce a partition assignment. If set, `partitionId` can not be set. + * @returns The total number of events that are currently buffered and waiting to be published, across all partitions. + */ + async enqueueEvents( + events: EventData[] | AmqpAnnotatedMessage[], + options: EnqueueEventOptions = {} + ): Promise { + for (const event of events) { + await this.enqueueEvent(event, options); + } + + return this._getTotalBufferedEventsCount(); + } + + /** + * Attempts to publish all events in the buffer immediately. + * This may result in multiple batches being published, + * the outcome of each of which will be individually reported by + * the `onSendEventsSuccessHandler` and `onSendEventsErrorHandler` handlers. + * + * @param options - The set of options to apply to the operation call. + */ + async flush(options: BufferedFlushOptions = {}): Promise { + await Promise.all( + Array.from(this._partitionChannels.values()).map((channel) => channel.flush(options)) + ); + } + + /** + * Provides the Event Hub runtime information. + * @param options - The set of options to apply to the operation call. + * @returns A promise that resolves with information about the Event Hub instance. + * @throws Error if the underlying connection has been closed, create a new EventHubBufferedProducerClient. + * @throws AbortError if the operation is cancelled via the abortSignal. + */ + getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise { + return this._producer.getEventHubProperties(options); + } + + /** + * Provides the id for each partition associated with the Event Hub. 
+ * @param options - The set of options to apply to the operation call. + * @returns A promise that resolves with an Array of strings representing the id for + * each partition associated with the Event Hub. + * @throws Error if the underlying connection has been closed, create a new EventHubBufferedProducerClient. + * @throws AbortError if the operation is cancelled via the abortSignal. + */ + getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { + return this._producer.getPartitionIds(options); + } + + /** + * Provides information about the state of the specified partition. + * @param partitionId - The id of the partition for which information is required. + * @param options - The set of options to apply to the operation call. + * @returns A promise that resolves with information about the state of the partition . + * @throws Error if the underlying connection has been closed, create a new EventHubBufferedProducerClient. + * @throws AbortError if the operation is cancelled via the abortSignal. + */ + getPartitionProperties( + partitionId: string, + options: GetPartitionPropertiesOptions = {} + ): Promise { + return this._producer.getPartitionProperties(partitionId, options); + } + + /** + * Gets the `BatchingPartitionChannel` associated with the partitionId. + * + * If one does not exist, it is created. + */ + private _getPartitionChannel(partitionId: string): BatchingPartitionChannel { + const partitionChannel = + this._partitionChannels.get(partitionId) ?? + new BatchingPartitionChannel({ + loopAbortSignal: this._abortController.signal, + maxBufferSize: this._clientOptions.maxEventBufferLengthPerPartition || 1500, + maxWaitTimeInMs: this._clientOptions.maxWaitTimeInMs || 1000, + onSendEventsErrorHandler: this._clientOptions.onSendEventsErrorHandler, + onSendEventsSuccessHandler: this._clientOptions.onSendEventsSuccessHandler, + partitionId, + producer: this._producer + }); + this._partitionChannels.set(partitionId, partitionChannel); + return partitionChannel; + } + + /** + * Returns the total number of buffered events across all partitions. + */ + private _getTotalBufferedEventsCount(): number { + let total = 0; + for (const [_, channel] of this._partitionChannels) { + total += channel.getCurrentBufferedCount(); + } + + return total; + } +} diff --git a/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts b/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts new file mode 100644 index 000000000000..e4c8a0a6363b --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * `AwaitableQueue` stores items in the order that they are received. + * + * This differs from ordinary Queues in that `shift` returns a Promise for a value. + * This allows a consumer of the queue to request an item that the queue does not yet have. + * + * @hidden + */ +export class AwaitableQueue { + private readonly _items: T[]; + + private readonly _resolvers: Array<(value: T) => void> = []; + + constructor() { + this._items = []; + } + + public size(): number { + return this._items.length; + } + + /** + * Returns a Promise that will resolve with the next item in the queue. + */ + public shift(): Promise { + const item = this._items.shift(); + if (typeof item !== "undefined") { + return Promise.resolve(item); + } + + return new Promise((resolve) => this._resolvers.push(resolve)); + } + + /** + * Appends new item to the queue. 
+ */ + public push(item: T): void { + if (!this._resolveNextItem(item)) { + this._items.push(item); + } + } + + private _resolveNextItem(item: T) { + const resolver = this._resolvers.shift(); + if (!resolver) { + return false; + } + + resolver(item); + return true; + } +} diff --git a/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts new file mode 100644 index 000000000000..c78f1b033500 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { isDefined } from "../util/typeGuards"; +import { mapPartitionKeyToId } from "./patitionKeyToIdMapper"; + +/** + * @internal + * Assigns a partition based on the partition ids it knows about and an optional partition id or partition key. + */ +export class PartitionAssigner { + private _partitions: string[] = []; + + private _lastRoundRobinPartitionIndex: number = -1; + + /** + * Set the partition ids that can be used when assigning a partition. + * @param partitionIds - All valid partition ids. + */ + public setPartitionIds(partitionIds: string[]): void { + this._partitions = partitionIds; + } + + /** + * Returns a partitionId from the list of partition ids set via `setPartitionIds`. + * + * If a partitionId is specified, then that will be returned directly. + * If a partitionKey is specified, then a partitionId will be calculated based on the partitionKey. + * Specifying both partitionId and partitionKey results in an error. + * + * If neither partitionId nor partitionKey are specified, then a partitionId will be selected + * based on a round-robin approach. + */ + assignPartition({ + partitionId, + partitionKey + }: { + partitionId?: string; + partitionKey?: string; + }): string { + if (isDefined(partitionId) && isDefined(partitionKey)) { + throw new Error( + `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` + ); + } + + if (!this._partitions.length) { + throw new Error(`Unable to determine partitionIds, can't assign partitionId.`); + } + + if (isDefined(partitionId) && this._partitions.includes(partitionId)) { + return partitionId; + } + + if (isDefined(partitionKey)) { + return mapPartitionKeyToId(partitionKey, this._partitions.length).toString(); + } + + return this._assignRoundRobinPartition(); + } + + private _assignRoundRobinPartition(): string { + const maxPartitionIndex = this._partitions.length - 1; + const proposedPartitionIndex = this._lastRoundRobinPartitionIndex + 1; + + const nextPartitionIndex = + proposedPartitionIndex > maxPartitionIndex ? 0 : proposedPartitionIndex; + + this._lastRoundRobinPartitionIndex = nextPartitionIndex; + return this._partitions[nextPartitionIndex]; + } +} diff --git a/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts b/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts new file mode 100644 index 000000000000..37b2c8e80ac4 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts @@ -0,0 +1,137 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +/* eslint-disable no-fallthrough */ + +import os from "os"; + +export function mapPartitionKeyToId(partitionKey: string, partitionCount: number): number { + const hash = computeHash(Buffer.from(partitionKey, "utf8")); + const hashedParitionKey = castToInt16(hash.c ^ hash.b); + return Math.abs(hashedParitionKey % partitionCount); +} + +function readUInt32(data: Buffer, offset: number): number { + return os.endianness() === "BE" ? data.readUInt32BE(offset) : data.readUInt32LE(offset); +} + +function castToInt16(n: number): number { + return new Int16Array([n])[0]; +} + +function computeHash( + data: Buffer, + seed1: number = 0, + seed2: number = 0 +): { + b: number; + c: number; +} { + let a: number, b: number, c: number; + + a = b = c = 0xdeadbeef + data.length + seed1; + c += seed2; + + let index = 0, + size = data.length; + while (size > 12) { + a += readUInt32(data, index); + b += readUInt32(data, index + 4); + c += readUInt32(data, index + 8); + + a -= c; + a ^= (c << 4) | (c >>> 28); + c += b; + + b -= a; + b ^= (a << 6) | (a >>> 26); + a += c; + + c -= b; + c ^= (b << 8) | (b >>> 24); + b += a; + + a -= c; + a ^= (c << 16) | (c >>> 16); + c += b; + + b -= a; + b ^= (a << 19) | (a >>> 13); + a += c; + + c -= b; + c ^= (b << 4) | (b >>> 28); + b += a; + + index += 12; + size -= 12; + } + + let curr = size; + switch (curr) { + case 12: + a += readUInt32(data, index); + b += readUInt32(data, index + 4); + c += readUInt32(data, index + 8); + break; + case 11: + c += data[index + 10] << 16; + curr = 10; + case 10: + c += data[index + 9] << 8; + curr = 9; + case 9: + c += data[index + 8]; + curr = 8; + case 8: + b += readUInt32(data, index + 4); + a += readUInt32(data, index); + break; + case 7: + b += data[index + 6] << 16; + curr = 6; + case 6: + b += data[index + 5] << 8; + curr = 5; + case 5: + b += data[index + 4]; + curr = 4; + case 4: + a += readUInt32(data, index); + break; + case 3: + a += data[index + 2] << 16; + curr = 2; + case 2: + a += data[index + 1] << 8; + curr = 1; + case 1: + a += data[index]; + break; + case 0: + return { b: b >>> 0, c: c >>> 0 }; + } + + c ^= b; + c -= (b << 14) | (b >>> 18); + + a ^= c; + a -= (c << 11) | (c >>> 21); + + b ^= a; + b -= (a << 25) | (a >>> 7); + + c ^= b; + c -= (b << 16) | (b >>> 16); + + a ^= c; + a -= (c << 4) | (c >>> 28); + + b ^= a; + b -= (a << 14) | (a >>> 18); + + c ^= b; + c -= (b << 24) | (b >>> 8); + + return { b: b >>> 0, c: c >>> 0 }; +} diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index dfff7ed4d8f5..28e465b0091b 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -19,6 +19,15 @@ export { } from "./models/public"; export { EventHubConsumerClient } from "./eventHubConsumerClient"; export { EventHubProducerClient } from "./eventHubProducerClient"; +export { + BufferedCloseOptions, + EventHubBufferedProducerClient, + EventHubBufferedProducerClientOptions, + EnqueueEventOptions, + BufferedFlushOptions, + OnSendEventsErrorContext, + OnSendEventsSuccessContext +} from "./eventHubBufferedProducerClient"; export { SubscribeOptions, Subscription, diff --git a/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts b/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts new file mode 100644 index 000000000000..202ca3f3128e --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
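The function above is a TypeScript port of the Jenkins `lookup3` hash; per the unit tests added later in this diff, the expected values were generated from the C# implementation that the Event Hubs service uses. A short sketch of the contract it provides — a given key always maps to the same partition for a given partition count:

```ts
import { mapPartitionKeyToId } from "./impl/patitionKeyToIdMapper";

// Values taken from the unit tests included in this change.
mapPartitionKeyToId("alphabet", 3);  // 0
mapPartitionKeyToId("alphabet", 11); // 4

// The mapping is a pure function, so events sharing a partition key are grouped together.
const partitionCount = 3;
console.log(
  mapPartitionKeyToId("orders", partitionCount) === mapPartitionKeyToId("orders", partitionCount)
); // true
```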
+ +/** + * @internal + * Returns a promise and the promise's resolve and reject methods. + */ +export function getPromiseParts(): { + promise: Promise; + resolve: (value: T) => void; + reject: (reason: Error) => void; +} { + let resolver: (value: T) => void; + let rejector: (reason?: any) => void; + const promise = new Promise((resolve, reject) => { + resolver = resolve; + rejector = reject; + }); + return { + promise, + resolve: resolver!, + reject: rejector! + }; +} diff --git a/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts b/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts new file mode 100644 index 000000000000..60bcffa4b9a1 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { testWithServiceTypes } from "../../public/utils/testWithServiceTypes"; +import { AwaitableQueue } from "../../../src/impl/awaitableQueue"; +const should = chai.should(); + +testWithServiceTypes(() => { + describe("AwaitableQueue", () => { + it("can be instantiated", () => { + const queue = new AwaitableQueue(); + should.exist(queue, "queue was not defined."); + should.equal(queue.size(), 0, "Unexpected number of values in queue."); + }); + + it("supports adding and removing items", async () => { + const expectedNumberOfItems = 10; + const queue = new AwaitableQueue(); + + for (let i = 0; i < expectedNumberOfItems; i++) { + queue.push(i); + } + + should.equal(queue.size(), expectedNumberOfItems, "Unexpected number of items in queue."); + + let receivedCount = 0; + while (queue.size()) { + const value = await queue.shift(); + should.equal(value, receivedCount, "Unexpected value shifted from queue."); + receivedCount++; + } + }); + + it("shift resolves with next pushed item", async () => { + const queue = new AwaitableQueue(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + const futureValue = queue.shift(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + queue.push("foo"); + + const value = await futureValue; + should.equal(value, "foo", "Unexpected value"); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + }); + + it("each shift call resolves with the next consecutive item that appears in the queue", async () => { + const queue = new AwaitableQueue(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + const expectedResults = ["foo", "bar", "baz"]; + const futureValues: Promise[] = []; + + for (let i = 0; i < expectedResults.length; i++) { + futureValues.push(queue.shift()); + } + + for (const input of expectedResults) { + queue.push(input); + } + + const values = await Promise.all(futureValues); + for (let i = 0; i < values.length; i++) { + should.equal(values[i], expectedResults[i], "Unexpected value encountered."); + } + + should.equal(queue.size(), 0, "Expected the queue to be empty."); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts b/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts new file mode 100644 index 000000000000..e076173a9eae --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
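For reviewers, a simplified model of how `getPromiseParts` (introduced above) is used by `BatchingPartitionChannel`: the channel hands the promise out immediately (for example from `flush`) and settles it later from unrelated code once the buffered count drains to zero. A sketch, with a timer standing in for that later event:

```ts
import { getPromiseParts } from "./util/getPromiseParts";

async function demo(): Promise<void> {
  const { promise, resolve } = getPromiseParts<void>();

  // Elsewhere, once "the last buffered batch has been reported", resolve the waiter.
  setTimeout(() => resolve(), 10);

  await promise;
  console.log("flush completed");
}

demo().catch(console.error);
```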
+ +import { assert } from "chai"; +import { mapPartitionKeyToId } from "../../../src/impl/patitionKeyToIdMapper"; + +/** + * These unit tests have been created from outputs received from the C# implementation + * of Jenkins lookup3 that the Event Hubs service uses. + */ +describe("mapPartitionKeyToId", () => { + it("short key, small partitions count", () => { + assert.equal(mapPartitionKeyToId("alphabet", 3), 0); + }); + + it("short key, large partitions count", () => { + assert.equal(mapPartitionKeyToId("alphabet", 11), 4); + }); + + it("long key, small partitions count", () => { + assert.equal(mapPartitionKeyToId("TheBestParitionEver", 4), 2); + }); + + it("long key, large partitions count", () => { + assert.equal(mapPartitionKeyToId("TheWorstParitionEver", 15), 1); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts b/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts new file mode 100644 index 000000000000..c99e4ff06ed1 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts @@ -0,0 +1,240 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +import { testWithServiceTypes } from "./utils/testWithServiceTypes"; +import { createMockServer } from "./utils/mockService"; +import { + EventHubBufferedProducerClient, + EventData, + OnSendEventsErrorContext, + OnSendEventsSuccessContext +} from "../../src/index"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; + +const assert = chai.assert; + +type ResultError = { type: "error"; context: OnSendEventsErrorContext }; +type ResultSuccess = { type: "success"; context: OnSendEventsSuccessContext }; +type ResultEnqueue = { type: "enqueue"; event: EventData | AmqpAnnotatedMessage }; +type ResultFlush = { type: "flush" }; +type Result = ResultEnqueue | ResultError | ResultSuccess | ResultFlush; + +testWithServiceTypes((serviceVersion) => { + const env = getEnvVars(); + if (serviceVersion === "mock") { + let service: ReturnType; + before("Starting mock service", () => { + service = createMockServer(); + return service.start(); + }); + + after("Stopping mock service", () => { + return service?.stop(); + }); + } + + describe("EventHubBufferedProducerClient", () => { + const connectionString = env[EnvVarKeys.EVENTHUB_CONNECTION_STRING]; + const eventHubName = env[EnvVarKeys.EVENTHUB_NAME]; + let client: EventHubBufferedProducerClient | undefined; + + before(() => { + assert.exists( + connectionString, + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + assert.exists( + eventHubName, + "define EVENTHUB_NAME in your environment before running integration tests." 
+ ); + }); + + afterEach("Ensure client is closed between tests.", async () => { + if (client) { + await client.close(); + client = undefined; + } + }); + + describe("enqueueEvent", () => { + afterEach("close EventHubBufferedProducerClient", () => { + return client?.close({ flush: false }); + }); + + it("batches events targetting the same partitionId together", async () => { + const results: Result[] = []; + const expectedEventCount = 10; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ body: `Test event ${i}` }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxWaitTimeInMs: 1000 + }); + + for (let i = 0; i < expectedEventCount; i++) { + const bufferedEventCount = await client.enqueueEvent(testEvents[i], { partitionId: "0" }); + assert.equal(bufferedEventCount, i + 1, "Unexpected number of events buffered."); + } + + await client.flush(); + const resultSuccess = results + .filter((r) => r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("batches events targetting the same partitionKey together", async () => { + const results: Result[] = []; + const expectedEventCount = 10; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ body: `Test event ${i}` }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxWaitTimeInMs: 1000 + }); + + for (let i = 0; i < expectedEventCount; i++) { + const bufferedEventCount = await client.enqueueEvent(testEvents[i], { + partitionKey: "foo" + }); + assert.equal(bufferedEventCount, i + 1, "Unexpected number of events buffered."); + } + + await client.flush(); + const resultSuccess = results + .filter((r) => r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("waits until buffer has space for the event before yielding", async () => { + const results: Result[] = []; + const expectedEventCount = 5; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ + body: `Test event ${i}` + }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxEventBufferLengthPerPartition: 2 + }); + + for (const testEvent of testEvents) { + await client.enqueueEvent(testEvent, { + partitionKey: "foo" + }); + results.push({ + type: "enqueue", + event: testEvent + }); + } + + await client.flush(); + const resultTypes = results.map((r) => r.type); + const resultEnqueued = results + .filter((r) => r.type === "enqueue") + .map((r) => (r as ResultEnqueue).event); + const resultSuccess = results + .filter((r) 
=> r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultTypes, [ + "enqueue", + "enqueue", + "success", + "enqueue", + "enqueue", + "success", + "enqueue", + "success" + ]); + assert.deepEqual( + resultEnqueued, + testEvents, + "Expected enqueued events to match test events." + ); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("waits until flush is complete to enqueue", async () => { + const results: Result[] = []; + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxEventBufferLengthPerPartition: 2 + }); + + /** + * One way to test that `enqueueEvent` waits for an in-progress `flush` + * to complete before yielding is to call `enqueueEvent` before `flush` yields. + * + * `flush` won't complete until any buffered events are either successfully sent + * or they error out. That means we can track when the `success` handler is called + * and when `flush` yields, and if there were buffered events we should see them + * one after the other. + * + * We enqueue an event to start with to ensure there's something to flush. + * + * Next, we call `flush`, and then another `enqueueEvent` without waiting for + * the `flush` to yield. + * + * Finally, we call `flush` after both methods complete to ensure there was still + * an event to send to the service. + * + * If this works properly, we should see: + * [ "success", "flush", "success", "flush" ] and each "success" should have a single event. + * + * This indicates that the 2nd `enqueueEvent` had to wait for the flush to complete before + * the event was actually accepted. + * + * If the 2nd `enqueueEvent` had not waited for the `flush` to complete, we would have seen: + * [ "success", "flush", "flush"] and the "success" would have had 2 events. 
+ */ + + await client.enqueueEvent({ body: 1 }, { partitionId: "0" }); + await Promise.all([ + client.flush().then(() => results.push({ type: "flush" })), + client.enqueueEvent({ body: 2 }, { partitionId: "0" }) + ]); + await client.flush(); + results.push({ type: "flush" }); + + const resultTypes = results.map((r) => r.type); + assert.deepEqual(resultTypes, ["success", "flush", "success", "flush"]); + }); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts b/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts index 2914c1567841..5cc0db28a135 100644 --- a/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts +++ b/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts @@ -11,10 +11,20 @@ import { EnvVarKeys, getEnvVars, setTracerForTest } from "./utils/testUtils"; import { setSpan, context } from "@azure/core-tracing"; import { SpanGraph } from "@azure/test-utils"; -import { EventHubProducerClient, EventHubConsumerClient, MessagingError } from "../../src"; +import { + EventHubBufferedProducerClient, + EventHubProducerClient, + EventHubConsumerClient, + MessagingError +} from "../../src"; import { testWithServiceTypes } from "./utils/testWithServiceTypes"; import { createMockServer } from "./utils/mockService"; +type ClientCommonMethods = Pick< + EventHubProducerClient, + "close" | "getEventHubProperties" | "getPartitionIds" | "getPartitionProperties" +>; + testWithServiceTypes((serviceVersion) => { const env = getEnvVars(); if (serviceVersion === "mock") { @@ -30,8 +40,13 @@ testWithServiceTypes((serviceVersion) => { } describe("RuntimeInformation", function(): void { - let producerClient: EventHubProducerClient; - let consumerClient: EventHubConsumerClient; + const clientTypes = [ + "EventHubBufferedProducerClient", + "EventHubConsumerClient", + "EventHubProducerClient" + ] as const; + const clientMap = new Map(); + const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] @@ -49,17 +64,28 @@ testWithServiceTypes((serviceVersion) => { beforeEach(async () => { debug("Creating the clients.."); - producerClient = new EventHubProducerClient(service.connectionString, service.path); - consumerClient = new EventHubConsumerClient( - EventHubConsumerClient.defaultConsumerGroupName, - service.connectionString, - service.path + clientMap.set( + "EventHubBufferedProducerClient", + new EventHubBufferedProducerClient(service.connectionString, service.path) + ); + clientMap.set( + "EventHubConsumerClient", + new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ) + ); + clientMap.set( + "EventHubProducerClient", + new EventHubProducerClient(service.connectionString, service.path) ); }); afterEach("close the connection", async function(): Promise { - await producerClient.close(); - await consumerClient.close(); + for (const client of clientMap.values()) { + await client?.close(); + } }); function arrayOfIncreasingNumbersFromZero(length: any): Array { @@ -70,373 +96,186 @@ testWithServiceTypes((serviceVersion) => { return result; } - describe("getPartitionIds", function(): void { - it("EventHubProducerClient returns an array of partition IDs", async function(): Promise< - void - > { - const ids = await producerClient.getPartitionIds({}); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - }); - - it("EventHubConsumerClient returns an array of partition IDs", async function(): Promise< - void - > { - const ids = await 
consumerClient.getPartitionIds({}); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - }); - - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const ids = await producerClient.getPartitionIds({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + clientTypes.forEach((clientType) => { + describe(`${clientType}.getPartitionIds`, () => { + it("returns an array of partition ids", async () => { + const client = clientMap.get(clientType)!; + const ids = await client.getPartitionIds({}); + ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); }); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - - it("EventHubConsumerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + it("can be manually traced", async () => { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); - const rootSpan = tracer.startSpan("root"); - const ids = await consumerClient.getPartitionIds({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } - }); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] + const rootSpan = tracer.startSpan("root"); + const ids = await client.getPartitionIds({ + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - }); - - describe("hub runtime information", function(): void { - it("EventHubProducerClient gets the hub runtime information", async function(): Promise< - void - > { - const hubRuntimeInfo = await producerClient.getEventHubProperties(); - debug(hubRuntimeInfo); - hubRuntimeInfo.name.should.equal(service.path); - - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - hubRuntimeInfo.createdOn.should.be.instanceof(Date); - }); - - it("EventHubConsumerClient gets the hub runtime information", async function(): Promise< - void - > { - const hubRuntimeInfo = await consumerClient.getEventHubProperties(); - debug(hubRuntimeInfo); - hubRuntimeInfo.name.should.equal(service.path); - - hubRuntimeInfo.partitionIds.should.have.members( - 
arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - hubRuntimeInfo.createdOn.should.be.instanceof(Date); - }); - - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const hubRuntimeInfo = await producerClient.getEventHubProperties({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + }); + ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getEventHubProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); }); - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); }); - it("EventHubConsumerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + describe(`${clientType}.getEventHubProperties`, () => { + it("gets the Event Hub runtime information", async () => { + const client = clientMap.get(clientType)!; + const hubRuntimeInfo = await client.getEventHubProperties(); + hubRuntimeInfo.name.should.equal(service.path); - const rootSpan = tracer.startSpan("root"); - const hubRuntimeInfo = await consumerClient.getEventHubProperties({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + hubRuntimeInfo.partitionIds.should.have.members( + arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) + ); + hubRuntimeInfo.createdOn.should.be.instanceof(Date); }); - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - }); - describe("partition runtime information", function(): void { - it("EventHubProducerClient should throw an error if partitionId is missing", async function(): 
Promise< - void - > { - try { - await producerClient.getPartitionProperties(undefined as any); - throw new Error("Test failure"); - } catch (err) { - err.name.should.equal("TypeError"); - err.message.should.equal( - `getPartitionProperties called without required argument "partitionId"` - ); - } - }); + it("can be manually traced", async function(): Promise { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); - it("EventHubConsumerClient should throw an error if partitionId is missing", async function(): Promise< - void - > { - try { - await consumerClient.getPartitionProperties(undefined as any); - throw new Error("Test failure"); - } catch (err) { - err.name.should.equal("TypeError"); - err.message.should.equal( - `getPartitionProperties called without required argument "partitionId"` + const rootSpan = tracer.startSpan("root"); + const hubRuntimeInfo = await client.getEventHubProperties({ + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) + } + }); + hubRuntimeInfo.partitionIds.should.have.members( + arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) ); - } - }); - - it("EventHubProducerClient gets the partition runtime information with partitionId as a string", async function(): Promise< - void - > { - const partitionRuntimeInfo = await producerClient.getPartitionProperties("0"); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubConsumerClient gets the partition runtime information with partitionId as a string", async function(): Promise< - void - > { - const partitionRuntimeInfo = await consumerClient.getPartitionProperties("0"); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubProducerClient gets the partition runtime information with partitionId as a number", async function(): Promise< - void - > { - const partitionRuntimeInfo = await producerClient.getPartitionProperties(0 as any); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubConsumerClient gets the partition runtime information with partitionId as a number", async function(): Promise< - void - > { - const partitionRuntimeInfo = await consumerClient.getPartitionProperties(0 as any); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubProducerClient bubbles up error from service for invalid partitionId", async 
function(): Promise< - void - > { - try { - await producerClient.getPartitionProperties("boo"); - throw new Error("Test failure"); - } catch (err) { - debug(`>>>> Received error - `, err); - should.exist(err); - should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); - } - }); - - it("EventHubConsumerClient bubbles up error from service for invalid partitionId", async function(): Promise< - void - > { - try { - await consumerClient.getPartitionProperties("boo"); - throw new Error("Test failure"); - } catch (err) { - debug(`>>>> Received error - `, err); - should.exist(err); - should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); - } + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getEventHubProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); }); - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const partitionRuntimeInfo = await producerClient.getPartitionProperties("0", { - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) + describe(`${clientType}.getPartitionProperties`, () => { + it("should throw an error if partitionId is missing", async () => { + try { + const client = clientMap.get(clientType)!; + await client.getPartitionProperties(undefined as any); + throw new Error("Test failure"); + } catch (err) { + (err as any).name.should.equal("TypeError"); + (err as any).message.should.equal( + `getPartitionProperties called without required argument "partitionId"` + ); } }); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getPartitionProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); + it("gets the partition runtime information with partitionId as a string", async () => { + const client = clientMap.get(clientType)!; + const partitionRuntimeInfo = await client.getPartitionProperties("0"); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + }); - it("EventHubConsumerClient can be manually 
traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + it("gets the partition runtime information with partitionId as a number", async () => { + const client = clientMap.get(clientType)!; + const partitionRuntimeInfo = await client.getPartitionProperties(0 as any); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + }); - const rootSpan = tracer.startSpan("root"); - const partitionRuntimeInfo = await consumerClient.getPartitionProperties("0", { - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) + it("bubbles up error from service for invalid partitionId", async () => { + try { + const client = clientMap.get(clientType)!; + await client.getPartitionProperties("boo"); + throw new Error("Test failure"); + } catch (err) { + should.exist(err); + should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); } }); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getPartitionProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); + it("can be manually traced", async () => { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + const partitionRuntimeInfo = await client.getPartitionProperties("0", { + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) + } + }); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getPartitionProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); }); }); }).timeout(60000); diff --git a/sdk/eventhub/event-hubs/tsconfig.json b/sdk/eventhub/event-hubs/tsconfig.json index 2570c3f0d26e..4a2b540ed8ba 100644 --- a/sdk/eventhub/event-hubs/tsconfig.json +++ b/sdk/eventhub/event-hubs/tsconfig.json 
@@ -4,6 +4,7 @@
     "declarationDir": "./types",
     "outDir": "./dist-esm",
     "downlevelIteration": true,
+    "noFallthroughCasesInSwitch": false,
     "paths": { "@azure/event-hubs": ["./src/index"] }
   }
diff --git a/sdk/eventhub/mock-hub/src/services/eventHubs.ts b/sdk/eventhub/mock-hub/src/services/eventHubs.ts
index 621d67f8e5e0..1eabdad64210 100644
--- a/sdk/eventhub/mock-hub/src/services/eventHubs.ts
+++ b/sdk/eventhub/mock-hub/src/services/eventHubs.ts
@@ -116,6 +116,8 @@ export class MockEventHub implements IMockEventHub {
   private _connectionInactivityTimeoutInMs: number;

   private _connections: Set<Connection> = new Set();
+
+  private _clearableTimeouts = new Set<ReturnType<typeof setTimeout>>();
   /**
    * This provides a way to find all the partition senders for a combination
    * of `consumerGroup` and `partitionId`.
@@ -188,10 +190,13 @@ export class MockEventHub implements IMockEventHub {
     };

     let tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs);
+    this._clearableTimeouts.add(tid);

     const bounceTimeout = () => {
       clearTimeout(tid);
+      this._clearableTimeouts.delete(tid);
       tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs);
+      this._clearableTimeouts.add(tid);
     };

     connection.addListener(ConnectionEvents.settled, bounceTimeout);
@@ -710,6 +715,10 @@ export class MockEventHub implements IMockEventHub {
    * Stops the service.
    */
   stop() {
+    for (const tid of this._clearableTimeouts.values()) {
+      clearTimeout(tid);
+    }
+    this._clearableTimeouts.clear();
     return this._mockServer.stop();
   }
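Note on the MockEventHub change above: every inactivity timer is now remembered in a Set so that stop() can cancel whatever is still pending; otherwise outstanding setTimeout handles could fire after shutdown or keep the Node.js process alive. Below is a minimal, standalone TypeScript sketch of that bookkeeping pattern; the TimeoutRegistry class and its method names are illustrative only and are not part of the mock-hub code.

class TimeoutRegistry {
  private readonly _timeouts = new Set<ReturnType<typeof setTimeout>>();

  /** Schedules a callback and remembers its handle so it can be cancelled later. */
  schedule(callback: () => void, delayInMs: number): ReturnType<typeof setTimeout> {
    const tid = setTimeout(() => {
      // The timer fired on its own, so stop tracking it before running the callback.
      this._timeouts.delete(tid);
      callback();
    }, delayInMs);
    this._timeouts.add(tid);
    return tid;
  }

  /** Cancels one pending timer, e.g. when activity "bounces" an inactivity timeout. */
  cancel(tid: ReturnType<typeof setTimeout>): void {
    clearTimeout(tid);
    this._timeouts.delete(tid);
  }

  /** Cancels everything still pending; call this from a stop()/close() path. */
  cancelAll(): void {
    for (const tid of this._timeouts) {
      clearTimeout(tid);
    }
    this._timeouts.clear();
  }
}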
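Similarly, the hubruntime.spec.ts changes earlier in this diff run one set of assertions against several client classes by typing their shared surface with Pick and storing the clients in a Map keyed by their names. The following self-contained sketch shows the same pattern; the FakeProducer and FakeConsumer classes are hypothetical stand-ins for the real Event Hubs clients.

class FakeProducer {
  async close(): Promise<void> {
    // release any underlying resources
  }
  async getPartitionIds(): Promise<string[]> {
    return ["0", "1"];
  }
}

class FakeConsumer {
  async close(): Promise<void> {
    // release any underlying resources
  }
  async getPartitionIds(): Promise<string[]> {
    return ["0", "1"];
  }
  // members that only the consumer exposes would go here
}

// Only the methods every client shares, derived structurally from one of them.
type CommonMethods = Pick<FakeProducer, "close" | "getPartitionIds">;

const clientTypes = ["FakeProducer", "FakeConsumer"] as const;
const clientMap = new Map<typeof clientTypes[number], CommonMethods>();
clientMap.set("FakeProducer", new FakeProducer());
clientMap.set("FakeConsumer", new FakeConsumer());

// Each client type now gets the same checks without duplicating the test bodies.
for (const clientType of clientTypes) {
  const client = clientMap.get(clientType)!;
  client.getPartitionIds().then((ids) => console.log(clientType, ids));
}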