Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Queues one message receive #15180

Merged
merged 33 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
78abf66
Prepare Storage for release
kasobol-msft May 6, 2020
e7907a4
pr feedback.
kasobol-msft May 6, 2020
58814fc
PR feedback.
kasobol-msft May 6, 2020
1286434
pr feedback.
kasobol-msft May 6, 2020
324afd0
merge upstream/master
kasobol-msft May 15, 2020
b216c89
Merge remote-tracking branch 'upstream/master'
kasobol-msft May 19, 2020
1c9e87d
Merge remote-tracking branch 'upstream/master'
kasobol-msft Jun 2, 2020
712ecda
Merge remote-tracking branch 'upstream/master'
kasobol-msft Jun 4, 2020
ad612b5
merge upstream/master
kasobol-msft Jun 10, 2020
5849bef
master merge
kasobol-msft Jul 7, 2020
1bcba8b
Merge remote-tracking branch 'upstream/master'
kasobol-msft Jul 29, 2020
8a2e048
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 3, 2020
1bf033e
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 8, 2020
b35274e
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 10, 2020
feb0649
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 11, 2020
5974722
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 11, 2020
9252944
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 14, 2020
f90886e
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 20, 2020
3b9b259
Merge remote-tracking branch 'upstream/master'
kasobol-msft Aug 21, 2020
f97e9bd
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 1, 2020
ae9d658
make open write work with using.
kasobol-msft Sep 1, 2020
4a24b96
move recordings to right place
kasobol-msft Sep 1, 2020
e970ed7
pr feedback.
kasobol-msft Sep 1, 2020
ed559ba
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 1, 2020
f477f9a
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 3, 2020
70bac18
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 3, 2020
342d763
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 8, 2020
4c8ee0b
Merge remote-tracking branch 'upstream/master'
kasobol-msft Sep 9, 2020
89fef83
receive one message.
kasobol-msft Sep 15, 2020
1a291c6
peek one message
kasobol-msft Sep 15, 2020
8dcc538
api
kasobol-msft Sep 15, 2020
f89a523
remove redundant scope.
kasobol-msft Sep 15, 2020
dc634c8
pr feedback.
kasobol-msft Sep 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public QueueClient(System.Uri queueUri, Azure.Storage.StorageSharedKeyCredential
public virtual System.Threading.Tasks.Task<Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>>> GetAccessPolicyAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueProperties> GetProperties(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueProperties>> GetPropertiesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage> PeekMessage(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage>> PeekMessageAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]> PeekMessages(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]>> PeekMessagesAsync(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage> ReceiveMessage(System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueMessage>> ReceiveMessageAsync(System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages() { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages(int? maxMessages = default(int?), System.TimeSpan? visibilityTimeout = default(System.TimeSpan?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueMessage[]> ReceiveMessages(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
173 changes: 171 additions & 2 deletions sdk/storage/Azure.Storage.Queues/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1721,6 +1721,7 @@ public virtual Response<QueueMessage[]> ReceiveMessages(
ReceiveMessagesInternal(
maxMessages,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
false, // async
cancellationToken)
.EnsureCompleted();
Expand Down Expand Up @@ -1752,6 +1753,7 @@ public virtual async Task<Response<QueueMessage[]>> ReceiveMessagesAsync(
await ReceiveMessagesInternal(
maxMessages,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
true, // async
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -1770,6 +1772,9 @@ await ReceiveMessagesInternal(
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="operationName">
/// Operation name for diagnostic logging.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
Expand All @@ -1782,6 +1787,7 @@ await ReceiveMessagesInternal(
private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
int? maxMessages,
TimeSpan? visibilityTimeout,
string operationName,
bool async,
CancellationToken cancellationToken)
{
Expand All @@ -1803,7 +1809,7 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
numberOfMessages: maxMessages,
visibilitytimeout: (int?)visibilityTimeout?.TotalSeconds,
async: async,
operationName: $"{nameof(QueueClient)}.{nameof(ReceiveMessages)}",
operationName: operationName,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand Down Expand Up @@ -1835,8 +1841,165 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
}
}
}

#endregion ReceiveMessages

#region ReceiveMessage

/// <summary>
/// Receives one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
public virtual Response<QueueMessage> ReceiveMessage(
TimeSpan? visibilityTimeout = default,
CancellationToken cancellationToken = default) =>
ReceiveMessageInternal(
visibilityTimeout,
false, // async
cancellationToken)
.EnsureCompleted();

/// <summary>
/// Retrieves one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
public virtual async Task<Response<QueueMessage>> ReceiveMessageAsync(
TimeSpan? visibilityTimeout = default,
CancellationToken cancellationToken = default) =>
await ReceiveMessageInternal(
visibilityTimeout,
true, // async
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Retrieves one message from the front of the queue.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/get-messages">
/// Get Messages</see>.
/// </summary>
/// <param name="visibilityTimeout">
/// Optional. Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="QueueMessage"/>
/// </returns>
private async Task<Response<QueueMessage>> ReceiveMessageInternal(
TimeSpan? visibilityTimeout,
bool async,
CancellationToken cancellationToken)
{
var response = await ReceiveMessagesInternal(
1,
visibilityTimeout,
$"{nameof(QueueClient)}.{nameof(ReceiveMessage)}",
async,
cancellationToken).ConfigureAwait(false);
var queueMessage = response.Value.FirstOrDefault();
var rawResponse = response.GetRawResponse();
return Response.FromValue(queueMessage, rawResponse);
}
#endregion ReceiveMessage

#region PeekMessage
/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
public virtual Response<PeekedMessage> PeekMessage(
CancellationToken cancellationToken = default) =>
PeekMessageInternal(
false, // async
cancellationToken)
.EnsureCompleted();

/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
public virtual async Task<Response<PeekedMessage>> PeekMessageAsync(
CancellationToken cancellationToken = default) =>
await PeekMessageInternal(
true, // async
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Retrieves one message from the front of the queue but does not alter the visibility of the message.
///
/// For more information, see
/// <see href="https://docs.microsoft.com/rest/api/storageservices/peek-messages">
/// Peek Messages</see>.
/// </summary>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/>
/// </param>
/// <returns>
/// <see cref="Response{T}"/> where T is a <see cref="PeekedMessage"/>
/// </returns>
private async Task<Response<PeekedMessage>> PeekMessageInternal(
bool async,
CancellationToken cancellationToken)
{
var response = await PeekMessagesInternal(1, $"{nameof(QueueClient)}.{nameof(PeekMessage)}", async, cancellationToken).ConfigureAwait(false);
var message = response.Value.FirstOrDefault();
var rawResonse = response.GetRawResponse();
return Response.FromValue(message, rawResonse);
}
#endregion PeekMessage

#region PeekMessages
/// <summary>
/// Retrieves one or more messages from the front of the queue but does not alter the visibility of the message.
Expand All @@ -1860,6 +2023,7 @@ public virtual Response<PeekedMessage[]> PeekMessages(
CancellationToken cancellationToken = default) =>
PeekMessagesInternal(
maxMessages,
$"{nameof(QueueClient)}.{nameof(PeekMessages)}",
false, // async
cancellationToken)
.EnsureCompleted();
Expand All @@ -1886,6 +2050,7 @@ public virtual async Task<Response<PeekedMessage[]>> PeekMessagesAsync(
CancellationToken cancellationToken = default) =>
await PeekMessagesInternal(
maxMessages,
$"{nameof(QueueClient)}.{nameof(PeekMessages)}",
true, // async
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -1901,6 +2066,9 @@ await PeekMessagesInternal(
/// Optional. A nonzero integer value that specifies the number of messages to peek from the queue, up to a maximum of 32.
/// By default, a single message is peeked from the queue with this operation.
/// </param>
/// <param name="operationName">
/// Operation name for diagnostic logging.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
Expand All @@ -1912,6 +2080,7 @@ await PeekMessagesInternal(
/// </returns>
private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
int? maxMessages,
string operationName,
bool async,
CancellationToken cancellationToken)
{
Expand All @@ -1931,7 +2100,7 @@ private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
version: Version.ToVersionString(),
numberOfMessages: maxMessages,
async: async,
operationName: $"{nameof(QueueClient)}.{nameof(PeekMessages)}",
operationName: operationName,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand Down
Loading