Skip to content

Commit

Permalink
[Storage] Queue message encoding (Azure#19328)
Browse files Browse the repository at this point in the history
* settings.

* internal model.

* wip.

* decoding.

* encode message.

* enqueue/dequeue tests.

* recording.

* undo models.

* more tests.

* handler tests.

* bundle failure args.

* checkstyle.

* checkstyle.

* readme.

* expose queue client.

* handler sample.

* CHANGELOG.md

* fix npe.

* some of pr feedback.

* one more test.

* two handlers.

* pr feedback.

* pr feedback.
  • Loading branch information
kasobol-msft authored Feb 25, 2021
1 parent affeead commit cb09813
Show file tree
Hide file tree
Showing 54 changed files with 4,408 additions and 59 deletions.
22 changes: 22 additions & 0 deletions sdk/storage/azure-storage-queue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,28 @@

## 12.9.0-beta.2 (Unreleased)

### Support for binary data, custom shapes and Base64 encoding
This release adds a convinient way to send and receive binary data and custom shapes as a payload.
Additionally, support for Base64 encoding in HTTP requests and reponses has been added that makes interoperability with V11 and prior Storage SDK easier to implement.

The `QueueClient.sendMessage` and `QueueAsyncClient.sendMessage` consume `com.azure.core.util.BinaryData` in addition to `String`.
`QueueMessageItem` and `PeekedMessageItem` expose new property `getBody()` of `com.azure.core.util.BinaryData` type to access message payload and should be used instead of `getMessageText()`.

See [BinaryData](https://docs.microsoft.com/java/api/com.azure.core.util.binarydata?view=azure-java-stable) for more information about handling `String`, binary data and custom shapes.

#### Receiving message as string
Before:
```java
QueueMessageItem message = queueClient.receiveMessage();
String messageText = message.getMessageText();
```

After:
```java
QueueMessageItem message = queueClient.receiveMessage();
BinaryData body = message.getBody();
String messageText = body.toString();
```

## 12.9.0-beta.1 (2021-02-10)
- Added support for the 2020-06-12 service version.
Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/azure-storage-queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(S
// @param key: The key with which the specified value should be associated.
// @param value: The value to be associated with the specified key.
queueClient.peekMessages(5, Duration.ofSeconds(1), new Context(key, value)).forEach(message -> {
System.out.println(message.getMessageText());
System.out.println(message.getBody().toString());
});
```

Expand All @@ -378,7 +378,7 @@ QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(S
.buildClient();
// Try to receive 10 mesages: Maximum number of messages to get
queueClient.receiveMessages(10).forEach(message -> {
System.out.println(message.getMessageText());
System.out.println(message.getBody().toString());
});
```

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.implementation.StorageImplUtils;
Expand Down Expand Up @@ -69,6 +70,15 @@ public QueueServiceVersion getServiceVersion() {
return client.getServiceVersion();
}

/**
* Gets the message encoding the client is using.
*
* @return the message encoding the client is using.
*/
public QueueMessageEncoding getMessageEncoding() {
return client.getMessageEncoding();
}

/**
* Gets the {@link HttpPipeline} powering this client.
*
Expand Down Expand Up @@ -410,6 +420,29 @@ public SendMessageResult sendMessage(String messageText) {
return sendMessageWithResponse(messageText, null, null, null, Context.NONE).getValue();
}

/**
* Sends a message that has a time-to-live of 7 days and is instantly visible.
*
* <p><strong>Code Samples</strong></p>
*
* <p>Sends a message of "Hello, Azure"</p>
*
* {@codesnippet com.azure.storage.queue.queueClient.sendMessage#BinaryData}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/put-message">Azure Docs</a>.</p>
*
* @param message Message content
* @return A {@link SendMessageResult} value that contains the {@link SendMessageResult#getMessageId() messageId}
* and {@link SendMessageResult#getPopReceipt() popReceipt} that are used to interact with the message
* and other metadata about the enqueued message.
* @throws QueueStorageException If the queue doesn't exist
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SendMessageResult sendMessage(BinaryData message) {
return sendMessageWithResponse(message, null, null, null, Context.NONE).getValue();
}

/**
* Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue.
*
Expand All @@ -431,7 +464,8 @@ public SendMessageResult sendMessage(String messageText) {
* unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0
* seconds and 7 days.
* @param timeToLive Optional. How long the message will stay alive in the queue. If unset the value will default to
* 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number.
* 7 days, if {@code Duration.ofSeconds(-1)} is passed the message will not expire.
* The time to live must be {@code Duration.ofSeconds(-1)} or any positive number of seconds.
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
* concludes a {@link RuntimeException} will be thrown.
* @param context Additional context that is passed through the Http pipeline during the service call.
Expand All @@ -446,7 +480,49 @@ public SendMessageResult sendMessage(String messageText) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SendMessageResult> sendMessageWithResponse(String messageText, Duration visibilityTimeout,
Duration timeToLive, Duration timeout, Context context) {
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(messageText,
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(BinaryData.fromString(messageText),
visibilityTimeout, timeToLive, context);
return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
}

/**
* Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue.
*
* <p><strong>Code Samples</strong></p>
*
* <p>Add a message of "Hello, Azure" that has a timeout of 5 seconds</p>
*
* {@codesnippet com.azure.storage.queue.QueueClient.sendMessageWithResponse#BinaryData-Duration-Duration-Duration-Context1}
*
* <p>Add a message of "Goodbye, Azure" that has a time to live of 5 seconds</p>
*
* {@codesnippet com.azure.storage.queue.QueueClient.sendMessageWithResponse#BinaryData-Duration-Duration-Duration-Context2}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/put-message">Azure Docs</a>.</p>
*
* @param message Message content
* @param visibilityTimeout Optional. The timeout period for how long the message is invisible in the queue. If
* unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0
* seconds and 7 days.
* @param timeToLive Optional. How long the message will stay alive in the queue. If unset the value will default to
* 7 days, if {@code Duration.ofSeconds(-1)} is passed the message will not expire.
* The time to live must be {@code Duration.ofSeconds(-1)} or any positive number of seconds.
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
* concludes a {@link RuntimeException} will be thrown.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the {@link SendMessageResult} value that contains the
* {@link SendMessageResult#getMessageId() messageId} and
* {@link SendMessageResult#getPopReceipt() popReceipt} that are used to
* interact with the message and other metadata about the enqueued message.
* @throws QueueStorageException If the queue doesn't exist or the {@code visibilityTimeout} or {@code timeToLive}
* are outside of the allowed limits.
* @throws RuntimeException if the operation doesn't complete before the timeout concludes.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SendMessageResult> sendMessageWithResponse(BinaryData message, Duration visibilityTimeout,
Duration timeToLive, Duration timeout, Context context) {
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(message,
visibilityTimeout, timeToLive, context);
return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.AzureQueueStorageImplBuilder;
import com.azure.storage.queue.implementation.util.BuilderHelper;
import com.azure.storage.queue.models.QueueMessageDecodingError;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* This class provides a fluent builder API to help aid the configuration and instantiation of the {@link QueueClient
Expand Down Expand Up @@ -94,6 +99,10 @@ public final class QueueClientBuilder {
private Configuration configuration;
private QueueServiceVersion version;

private QueueMessageEncoding messageEncoding = QueueMessageEncoding.NONE;
private Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler;
private Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler;

/**
* Creates a builder instance that is able to configure and construct {@link QueueClient QueueClients} and {@link
* QueueAsyncClient QueueAsyncClients}.
Expand Down Expand Up @@ -140,6 +149,13 @@ public QueueClient buildClient() {
*/
public QueueAsyncClient buildAsyncClient() {
StorageImplUtils.assertNotNull("queueName", queueName);
if (processMessageDecodingErrorAsyncHandler != null && processMessageDecodingErrorHandler != null) {
throw logger.logExceptionAsError(new IllegalStateException(
"Either processMessageDecodingError or processMessageDecodingAsyncError should be specified"
+ "but not both.")
);
}

QueueServiceVersion serviceVersion = version != null ? version : QueueServiceVersion.getLatest();

HttpPipeline pipeline = (httpPipeline != null) ? httpPipeline : BuilderHelper.buildPipeline(
Expand All @@ -153,7 +169,8 @@ public QueueAsyncClient buildAsyncClient() {
.version(serviceVersion.getVersion())
.buildClient();

return new QueueAsyncClient(azureQueueStorage, queueName, accountName, serviceVersion);
return new QueueAsyncClient(azureQueueStorage, queueName, accountName, serviceVersion,
messageEncoding, processMessageDecodingErrorAsyncHandler, processMessageDecodingErrorHandler);
}

/**
Expand Down Expand Up @@ -387,6 +404,74 @@ public QueueClientBuilder clientOptions(ClientOptions clientOptions) {
return this;
}

/**
* Sets the queue message encoding.
*
* @param messageEncoding {@link QueueMessageEncoding}.
* @return the updated QueueClientBuilder object
* @throws NullPointerException If {@code messageEncoding} is {@code null}.
*/
public QueueClientBuilder messageEncoding(QueueMessageEncoding messageEncoding) {
this.messageEncoding = Objects.requireNonNull(messageEncoding, "'messageEncoding' cannot be null.");
return this;
}

/**
* Sets the asynchronous handler that performs the tasks needed when a message is received or peaked from the queue
* but cannot be decoded.
* <p>
* Such message can be received or peaked when queue is expecting certain {@link QueueMessageEncoding}
* but there's another producer that is not encoding messages in expected way.
* I.e. the queue contains messages with different encoding.
* <p>
* {@link QueueMessageDecodingError} contains {@link QueueAsyncClient} for the queue that has received
* the message as well as {@link QueueMessageDecodingError#getQueueMessageItem()} or
* {@link QueueMessageDecodingError#getPeekedMessageItem()} with raw body, i.e. no decoding will be attempted
* so that body can be inspected as has been received from the queue.
* <p>
* The handler won't attempt to remove the message from the queue. Therefore such handling should be included into
* handler itself.
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.queue.QueueClientBuilder#processMessageDecodingErrorAsyncHandler}
*
* @param processMessageDecodingErrorAsyncHandler the handler.
* @return the updated QueueClientBuilder object
*/
public QueueClientBuilder processMessageDecodingErrorAsync(
Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler) {
this.processMessageDecodingErrorAsyncHandler = processMessageDecodingErrorAsyncHandler;
return this;
}

/**
* Sets the handler that performs the tasks needed when a message is received or peaked from the queue
* but cannot be decoded.
* <p>
* Such message can be received or peaked when queue is expecting certain {@link QueueMessageEncoding}
* but there's another producer that is not encoding messages in expected way.
* I.e. the queue contains messages with different encoding.
* <p>
* {@link QueueMessageDecodingError} contains {@link QueueAsyncClient} for the queue that has received
* the message as well as {@link QueueMessageDecodingError#getQueueMessageItem()} or
* {@link QueueMessageDecodingError#getPeekedMessageItem()} with raw body, i.e. no decoding will be attempted
* so that body can be inspected as has been received from the queue.
* <p>
* The handler won't attempt to remove the message from the queue. Therefore such handling should be included into
* handler itself.
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.queue.QueueClientBuilder#processMessageDecodingErrorHandler}
*
* @param processMessageDecodingErrorHandler the handler.
* @return the updated QueueClientBuilder object
*/
public QueueClientBuilder processMessageDecodingError(
Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler) {
this.processMessageDecodingErrorHandler = processMessageDecodingErrorHandler;
return this;
}

/**
* Sets the {@link QueueServiceVersion} that is used when making API requests.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Code generated by Microsoft (R) AutoRest Code Generator.

package com.azure.storage.queue;

/**
* Determines how queue message body is represented in HTTP requests and responses.
*/
public enum QueueMessageEncoding {
/**
* The queue message body is represented verbatim in HTTP requests and responses. I.e. message is not transformed.
*/
NONE,

/**
* The queue message body is represented as Base64 encoded string in HTTP requests and responses.
* <p>
* This was the default behavior in the prior v8 and v11 library.
* Using this option can make interop with an existing application easier.
*/
BASE64
}
Loading

0 comments on commit cb09813

Please sign in to comment.