Skip to content

Commit

Permalink
[Storage] Queues one message receive (#15180)
Browse files Browse the repository at this point in the history
* Prepare Storage for release

* pr feedback.

* PR feedback.

* pr feedback.

* make open write work with using.

* move recordings to right place

* pr feedback.

* receive one message.

* peek one message

* api

* remove redundant scope.

* pr feedback.
  • Loading branch information
kasobol-msft authored Sep 16, 2020
1 parent 28a6bc6 commit 3e61e11
Show file tree
Hide file tree
Showing 21 changed files with 2,033 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public QueueClient(System.Uri queueUri, Azure.Storage.StorageSharedKeyCredential
public virtual System.Threading.Tasks.Task<Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>>> GetAccessPolicyAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueProperties> GetProperties(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueProperties>> GetPropertiesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage> PeekMessage(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage>> PeekMessageAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]> PeekMessages(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]>> PeekMessagesAsync(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage> ReceiveMessage(System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueMessage>> ReceiveMessageAsync(System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages() { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages(int? maxMessages = default(int?), System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
173 changes: 171 additions & 2 deletions sdk/storage/Azure.Storage.Queues/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1721,6 +1721,7 @@ public virtual Response<QueueMessage[]> ReceiveMessages(
ReceiveMessagesInternal(
maxMessages,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
false, // async
cancellationToken)
.EnsureCompleted();
Expand Down Expand Up @@ -1752,6 +1753,7 @@ public virtual async Task<Response<QueueMessage[]>> ReceiveMessagesAsync(
await ReceiveMessagesInternal(
maxMessages,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
true, // async
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -1770,6 +1772,9 @@ await ReceiveMessagesInternal(
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="operationName">
/// Operation name for diagnostic logging.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
Expand All @@ -1782,6 +1787,7 @@ await ReceiveMessagesInternal(
private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
int? maxMessages,
TimeSpan? visibilityTimeout,
string operationName,
bool async,
CancellationToken cancellationToken)
{
Expand All @@ -1803,7 +1809,7 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
numberOfMessages: maxMessages,
visibilitytimeout: (int?)visibilityTimeout?.TotalSeconds,
async: async,
operationName: $"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
operationName: operationName,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand Down Expand Up @@ -1835,8 +1841,165 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
}
}
}

#endregion ReceiveMessages

#region ReceiveMessage

/// <summary>
/// Receives one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
public virtual Response<QueueMessage> ReceiveMessage(
TimeSpan? visibilityTimeout = default,
CancellationToken cancellationToken = default) =>
ReceiveMessageInternal(
visibilityTimeout,
false, // async
cancellationToken)
.EnsureCompleted();

/// <summary>
/// Retrieves one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
public virtual async Task<Response<QueueMessage>> ReceiveMessageAsync(
TimeSpan? visibilityTimeout = default,
CancellationToken cancellationToken = default) =>
await ReceiveMessageInternal(
visibilityTimeout,
true, // async
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Retrieves one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
private async Task<Response<QueueMessage>> ReceiveMessageInternal(
TimeSpan? visibilityTimeout,
bool async,
CancellationToken cancellationToken)
{
var response = await ReceiveMessagesInternal(
1,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessage)}",
async,
cancellationToken).ConfigureAwait(false);
var queueMessage = response.Value.FirstOrDefault();
var rawResponse = response.GetRawResponse();
return Response.FromValue(queueMessage, rawResponse);
}
#endregion ReceiveMessage

#region PeekMessage
/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
public virtual Response<PeekedMessage> PeekMessage(
CancellationToken cancellationToken = default) =>
PeekMessageInternal(
false, // async
cancellationToken)
.EnsureCompleted();

/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
public virtual async Task<Response<PeekedMessage>> PeekMessageAsync(
CancellationToken cancellationToken = default) =>
await PeekMessageInternal(
true, // async
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
private async Task<Response<PeekedMessage>> PeekMessageInternal(
bool async,
CancellationToken cancellationToken)
{
var response = await PeekMessagesInternal(1, $"{nameof(QueueClient)}.{nameof(PeekMessage)}", async, cancellationToken).ConfigureAwait(false);
var message = response.Value.FirstOrDefault();
var rawResonse = response.GetRawResponse();
return Response.FromValue(message, rawResonse);
}
#endregion PeekMessage

#region PeekMessages
/// <summary>
/// Retrieves one or more messages from the front of the queue but does not alter the visibility of the message.
Expand All @@ -1860,6 +2023,7 @@ public virtual Response<PeekedMessage[]> PeekMessages(
CancellationToken cancellationToken = default) =>
PeekMessagesInternal(
maxMessages,
$"{nameof(QueueClient)}.{nameof(PeekMessages)}",
false, // async
cancellationToken)
.EnsureCompleted();
Expand All @@ -1886,6 +2050,7 @@ public virtual async Task<Response<PeekedMessage[]>> PeekMessagesAsync(
CancellationToken cancellationToken = default) =>
await PeekMessagesInternal(
maxMessages,
$"{nameof(QueueClient)}.{nameof(PeekMessages)}",
true, // async
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -1901,6 +2066,9 @@ await PeekMessagesInternal(
/// Optional. A nonzero integer value that specifies the number of messages to peek from the queue, up to a maximum of 32.
/// By default, a single message is peeked from the queue with this operation.
/// </param>
/// <param name="operationName">
/// Operation name for diagnostic logging.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
Expand All @@ -1912,6 +2080,7 @@ await PeekMessagesInternal(
/// </returns>
private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
int? maxMessages,
string operationName,
bool async,
CancellationToken cancellationToken)
{
Expand All @@ -1931,7 +2100,7 @@ private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
version: Version.ToVersionString(),
numberOfMessages: maxMessages,
async: async,
operationName: $"{nameof(QueueClient)}.{nameof(PeekMessages)}",
operationName: operationName,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand Down
Loading

0 comments on commit 3e61e11

Please sign in to comment.