Skip to content

Commit

Permalink
Merge pull request Azure#98 from amarzavery/recovery
Browse files Browse the repository at this point in the history
Multiple fixes
  • Loading branch information
amarzavery authored Jul 13, 2018
2 parents 1fe169b + 552d8b2 commit ebb7986
Showing 8 changed files with 61 additions and 23 deletions.
17 changes: 9 additions & 8 deletions client/lib/amqp-common/errors.ts
Original file line number Diff line number Diff line change
@@ -129,11 +129,11 @@ export enum ConditionErrorNameMapper {
/**
* Error is thrown when an operator intervened to detach for some reason.
*/
"amqp:link:detach-forced" = "DetachForcedError",
"amqp:link:detach-forced" = "DetachForcedError", // Retryable
/**
* Error is thrown when the peer sent more message transfers than currently allowed on the link.
*/
"amqp:link:transfer-limit-exceeded" = "TransferLimitExceededError",
"amqp:link:transfer-limit-exceeded" = "TransferLimitExceededError", // Retryable
/**
* Error is thrown when the message sent is too large: the maximum size is 256Kb.
*/
@@ -154,7 +154,7 @@ export enum ConditionErrorNameMapper {
/**
* Error is thrown when input was received for a link that was detached with an error.
*/
"amqp:session:errant-link" = "ErrantLinkError", // Retryable
"amqp:session:errant-link" = "ErrantLinkError",
/**
* Error is thrown when an attach was received using a handle that is already in use for an attached link.
*/
@@ -167,7 +167,7 @@ export enum ConditionErrorNameMapper {
/**
* Error is thrown when an operator intervened to close the connection for some reason.
*/
"amqp:connection:forced" = "ConnectionForcedError",
"amqp:connection:forced" = "ConnectionForcedError", // Retryable
/**
* Error is thrown when a valid frame header cannot be formed from the incoming byte stream.
*/
@@ -286,11 +286,11 @@ export enum ErrorNameConditionMapper {
/**
* Error is thrown when an operator intervened to detach for some reason.
*/
DetachForcedError = "amqp:link:detach-forced",
DetachForcedError = "amqp:link:detach-forced", // Retryable
/**
* Error is thrown when the peer sent more message transfers than currently allowed on the link.
*/
TransferLimitExceededError = "amqp:link:transfer-limit-exceeded",
TransferLimitExceededError = "amqp:link:transfer-limit-exceeded", // Retryable
/**
* Error is thrown when the message sent is too large: the maximum size is 256Kb.
*/
@@ -324,7 +324,7 @@ export enum ErrorNameConditionMapper {
/**
* Error is thrown when an operator intervened to close the connection for some reason.
*/
ConnectionForcedError = "amqp:connection:forced",
ConnectionForcedError = "amqp:connection:forced", // Retryable
/**
* Error is thrown when a valid frame header cannot be formed from the incoming byte stream.
*/
@@ -388,7 +388,8 @@ export class MessagingError extends Error {

export const retryableErrors: string[] = [
"InternalServerError", "ServerBusyError", "ServiceUnavailableError", "OperationCancelledError",
"SenderBusyError", "MessagingError"
"SenderBusyError", "MessagingError", "DetachForcedError", "ConnectionForcedError",
"TransferLimitExceededError"
];

/**
3 changes: 2 additions & 1 deletion client/lib/amqp-common/index.ts
Original file line number Diff line number Diff line change
@@ -23,5 +23,6 @@ export {
parseConnectionString, IotHubConnectionStringModel, StorageConnectionStringModel, defaultLock,
Func, ParsedOutput, getNewAsyncLock, AsyncLockOptions, ServiceBusConnectionStringModel,
isIotHubConnectionString, CreateConnectionPrameters, ServiceBusMessageAnnotations,
ServiceBusDeliveryAnnotations, EventHubDeliveryAnnotations, EventHubMessageAnnotations
ServiceBusDeliveryAnnotations, EventHubDeliveryAnnotations, EventHubMessageAnnotations,
randomNumberFromInterval
} from "./util/utils";
1 change: 1 addition & 0 deletions client/lib/amqp-common/util/constants.ts
Original file line number Diff line number Diff line change
@@ -68,3 +68,4 @@ export const minDurationValue = -922337203685477;
// https://github.com/Azure/azure-amqp/blob/master/Microsoft.Azure.Amqp/Amqp/AmqpConstants.cs#L47
export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTime();
export const aadTokenValidityMarginSeconds = 5;
export const connectionReconnectDelay = 300;
9 changes: 9 additions & 0 deletions client/lib/amqp-common/util/utils.ts
Original file line number Diff line number Diff line change
@@ -137,6 +137,15 @@ export function delay<T>(t: number, value?: T): Promise<T> {
return new Promise((resolve) => setTimeout(() => resolve(value), t));
}

/**
* Generates a random number between the given interval
* @param {number} min Min number of the range (inclusive).
* @param {number} max Max number of the range (inclusive).
*/
export function randomNumberFromInterval(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1) + min);
}

/**
* Type declaration for a Function type where T is the input to the function and V is the output of the function.
*/
18 changes: 13 additions & 5 deletions client/lib/connectionContext.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,10 @@ import {
} from "./amqp-common";
import { ManagementClient, ManagementClientOptions } from "./managementClient";
import { ClientOptions } from "./eventHubClient";
import { Connection, Dictionary, ConnectionOptions, OnAmqpEvent, EventContext, ConnectionEvents } from "./rhea-promise";
import {
Connection, Dictionary, ConnectionOptions, OnAmqpEvent, EventContext, ConnectionEvents
} from "./rhea-promise";
import { connectionReconnectDelay } from "./amqp-common/util/constants";

const debug = debugModule("azure:event-hubs:connectionContext");

@@ -146,22 +149,27 @@ export namespace ConnectionContext {
connectionContext.connection.removeHandler(ConnectionEvents.connectionOpen, onConnectionOpen);
const connectionError = context.connection ? context.connection.error : undefined;
if (connectionError) {
debug(`Error occurred on the amqp connection.`, connectionError);
debug(`[%s] Error occurred on the amqp connection: %O`,
connectionContext.connection.id, connectionError);
}
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was atleast one sender/receiver link on the connection before it went down.
if (!connectionContext.wasConnectionCloseCalled &&
(Object.keys(connectionContext.senders).length) ||
Object.keys(connectionContext.receivers).length) {
debug("connection.close() was not called from the sdk and there were some " +
"sender or receiver links or both. We should reconnect.");
await delay(100);
debug("[%s] connection.close() was not called from the sdk and there were some " +
"sender or receiver links or both. We should reconnect.", connectionContext.connection.id);
await delay(connectionReconnectDelay);
// reconnect senders if any
for (const sender of Object.values(connectionContext.senders)) {
debug("[%s] calling detached on sender '%s' with address '%s'.",
connectionContext.connection.id, sender.name, sender.address);
sender.detached();
}
// reconnect receivers if any
for (const receiver of Object.values(connectionContext.receivers)) {
debug("[%s] calling detached on receiver '%s' with address '%s'.",
connectionContext.connection.id, receiver.name, receiver.address);
receiver.detached();
}
}
5 changes: 3 additions & 2 deletions client/lib/eventHubReceiver.ts
Original file line number Diff line number Diff line change
@@ -244,8 +244,9 @@ export class EventHubReceiver extends LinkEntity {
}
} else if (this._context.receivers[this.name]) {
shouldReopen = true;
debug("[%s] Receiver's close() method was not called. There was no accompanying error " +
"as well. This is a candidate for re-establishing the sender link.");
debug("[%s] close() method of Receiver '%s' with address '%s' was not called. " +
"There was no accompanying error as well. This is a candidate for re-establishing " +
"the sender link.", this._context.connectionId, this.name, this.address);
}
if (shouldReopen) {
const rcvrOptions: CreateReceiverOptions = {
10 changes: 6 additions & 4 deletions client/lib/eventHubSender.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import {
} from "./rhea-promise";
import { EventData } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { defaultLock, Func, retry, translate, AmqpMessage, ErrorNameConditionMapper } from "./amqp-common";
import { defaultLock, Func, retry, translate, AmqpMessage, ErrorNameConditionMapper, randomNumberFromInterval } from "./amqp-common";
import { LinkEntity } from "./linkEntity";

const debug = debugModule("azure:event-hubs:sender");
@@ -188,8 +188,9 @@ export class EventHubSender extends LinkEntity {
}
} else if (this._context.senders[this.address]) {
shouldReopen = true;
debug("[%s] Sender's close() method was not called. There was no accompanying error " +
"as well. This is a candidate for re-establishing the sender link.");
debug("[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was no accompanying error as well. This is a candidate for re-establishing the sender link.",
this._context.connectionId, this.name, this.address);
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
@@ -331,7 +332,8 @@ export class EventHubSender extends LinkEntity {
}
});

return retry<Delivery>(sendEventPromise, 3, 5);
const jitter = randomNumberFromInterval(1, 4);
return retry<Delivery>(sendEventPromise, 3, 5 + jitter);
}

/**
21 changes: 18 additions & 3 deletions processor/lib/eventProcessorHost.ts
Original file line number Diff line number Diff line change
@@ -78,6 +78,11 @@ export interface EventProcessorOptions extends ClientOptionsBase {
* @property {string} [storageBlobPrefix] Prefix used when naming blobs within the storage container.
*/
storageBlobPrefix?: string;
/**
* @property {boolean} [autoCheckpoint] Automatically checkpoint the offset on behalf of the
* customer. Default value: `true`.
*/
autoCheckpoint?: boolean;
}

/**
@@ -126,6 +131,11 @@ export class EventProcessorHost extends EventEmitter {
* Error object is passed the event listener.
*/
static error: string = "ephost:error";
/**
* @property {boolean} autoCheckpoint Automatically checkpoint the offset on behalf of the
* customer. Default value: `true`.
*/
autoCheckpoint: boolean = true;

private _hostName: string;
private _consumerGroup: string;
@@ -174,6 +184,7 @@ export class EventProcessorHost extends EventEmitter {
this._contextByPartition = {};
this._receiverByPartition = {};
if (options.storageBlobPrefix) this._storageBlobPrefix = options.storageBlobPrefix;
if (options.autoCheckpoint === false) this.autoCheckpoint = false;
}

/**
@@ -264,9 +275,13 @@ export class EventProcessorHost extends EventEmitter {
const id = lease.partitionId!;
try {
debug("Renewed lease on partitionId: '%s'.", id);
const info = await this._contextByPartition![id].checkpoint();
debug(">>>> [EPH - %s] Successfully checkpointed info '%o' for partition '%s'.",
this._hostName, info, id);
if (this.autoCheckpoint) {
debug("[EPH - %s] Autocheckpoint is enabled, hence checkpointing the event metadata.",
this._hostName);
const info = await this._contextByPartition![id].checkpoint();
debug(">>>> [EPH - %s] Successfully checkpointed info '%o' for partition '%s'.",
this._hostName, info, id);
}
} catch (err) {
debug("[EPH - %s] An error occurred while checkpointing information for partition '%s': %O",
this._hostName, id, err);

0 comments on commit ebb7986

Please sign in to comment.