diff --git a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
index bf3ae5d9ef5e5..bf5ae0bed99ee 100755
--- a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
+++ b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
@@ -525,6 +525,7 @@
+
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java
index ec0991da6c9c7..c6575fc3f3789 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java
@@ -23,21 +23,26 @@
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions;
+import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.administration.models.CreateTopicOptions;
import com.azure.messaging.servicebus.administration.models.NamespaceProperties;
import com.azure.messaging.servicebus.administration.models.QueueProperties;
import com.azure.messaging.servicebus.administration.models.QueueRuntimeInfo;
+import com.azure.messaging.servicebus.administration.models.RuleProperties;
import com.azure.messaging.servicebus.administration.models.SubscriptionProperties;
import com.azure.messaging.servicebus.administration.models.SubscriptionRuntimeInfo;
import com.azure.messaging.servicebus.administration.models.TopicProperties;
import com.azure.messaging.servicebus.administration.models.TopicRuntimeInfo;
import com.azure.messaging.servicebus.implementation.EntitiesImpl;
import com.azure.messaging.servicebus.implementation.EntityHelper;
+import com.azure.messaging.servicebus.implementation.RulesImpl;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImpl;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementSerializer;
import com.azure.messaging.servicebus.implementation.models.CreateQueueBody;
import com.azure.messaging.servicebus.implementation.models.CreateQueueBodyContent;
+import com.azure.messaging.servicebus.implementation.models.CreateRuleBody;
+import com.azure.messaging.servicebus.implementation.models.CreateRuleBodyContent;
import com.azure.messaging.servicebus.implementation.models.CreateSubscriptionBody;
import com.azure.messaging.servicebus.implementation.models.CreateSubscriptionBodyContent;
import com.azure.messaging.servicebus.implementation.models.CreateTopicBody;
@@ -47,6 +52,11 @@
import com.azure.messaging.servicebus.implementation.models.QueueDescriptionEntry;
import com.azure.messaging.servicebus.implementation.models.QueueDescriptionFeed;
import com.azure.messaging.servicebus.implementation.models.ResponseLink;
+import com.azure.messaging.servicebus.implementation.models.RuleActionImpl;
+import com.azure.messaging.servicebus.implementation.models.RuleDescription;
+import com.azure.messaging.servicebus.implementation.models.RuleDescriptionEntry;
+import com.azure.messaging.servicebus.implementation.models.RuleDescriptionFeed;
+import com.azure.messaging.servicebus.implementation.models.RuleFilterImpl;
import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementError;
import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementErrorException;
import com.azure.messaging.servicebus.implementation.models.SubscriptionDescription;
@@ -102,6 +112,7 @@ public final class ServiceBusAdministrationAsyncClient {
private final EntitiesImpl entityClient;
private final ClientLogger logger = new ClientLogger(ServiceBusAdministrationAsyncClient.class);
private final ServiceBusManagementSerializer serializer;
+ private final RulesImpl rulesClient;
/**
* Creates a new instance with the given management client and serializer.
@@ -111,9 +122,10 @@ public final class ServiceBusAdministrationAsyncClient {
*/
ServiceBusAdministrationAsyncClient(ServiceBusManagementClientImpl managementClient,
ServiceBusManagementSerializer serializer) {
+ this.serializer = Objects.requireNonNull(serializer, "'serializer' cannot be null.");
this.managementClient = Objects.requireNonNull(managementClient, "'managementClient' cannot be null.");
this.entityClient = managementClient.getEntities();
- this.serializer = serializer;
+ this.rulesClient = managementClient.getRules();
}
/**
@@ -182,6 +194,82 @@ public Mono> createQueueWithResponse(String queueName,
return withContext(context -> createQueueWithResponse(queueName, queueOptions, context));
}
+ /**
+ * Creates a rule under the given topic and subscription
+ *
+ * @param topicName Name of the topic associated with rule.
+ * @param subscriptionName Name of the subscription associated with the rule.
+ * @param ruleName Name of the rule.
+ *
+ * @return A Mono that completes with information about the created rule.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
+ * processing the request.
+ * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
+ * @throws NullPointerException if {@code topicName} or {@code ruleName} are are null.
+ * @throws ResourceExistsException if a rule exists with the same topic, subscription, and rule name.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono createRule(String topicName, String subscriptionName, String ruleName) {
+ try {
+ return createRule(topicName, subscriptionName, ruleName, new CreateRuleOptions());
+ } catch (RuntimeException e) {
+ return monoError(logger, e);
+ }
+ }
+
+ /**
+ * Creates a rule with the {@link CreateRuleOptions}.
+ *
+ * @param topicName Name of the topic associated with rule.
+ * @param subscriptionName Name of the subscription associated with the rule.
+ * @param ruleName Name of the rule.
+ * @param ruleOptions Information about the rule to create.
+ *
+ * @return A Mono that completes with information about the created rule.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
+ * processing the request.
+ * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
+ * @throws NullPointerException if {@code topicName}, {@code ruleName}, or {@code ruleOptions}
+ * are are null.
+ * @throws ResourceExistsException if a rule exists with the same topic and rule name.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono createRule(String topicName, String subscriptionName, String ruleName,
+ CreateRuleOptions ruleOptions) {
+
+ return createRuleWithResponse(topicName, subscriptionName, ruleName, ruleOptions)
+ .map(Response::getValue);
+ }
+
+ /**
+ * Creates a rule and returns the created rule in addition to the HTTP response.
+ *
+ * @param topicName Name of the topic associated with rule.
+ * @param subscriptionName Name of the subscription associated with the rule.
+ * @param ruleName Name of the rule.
+ * @param ruleOptions Information about the rule to create.
+ *
+ * @return A Mono that returns the created rule in addition to the HTTP response.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
+ * processing the request.
+ * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
+ * @throws NullPointerException if {@code topicName}, {@code ruleName}, or {@code ruleOptions}
+ * are are null.
+ * @throws ResourceExistsException if a rule exists with the same topic and rule name.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono> createRuleWithResponse(String topicName, String subscriptionName,
+ String ruleName, CreateRuleOptions ruleOptions) {
+ return withContext(context -> createRuleWithResponse(topicName, subscriptionName, ruleName, ruleOptions,
+ context));
+ }
+
/**
* Creates a subscription with the given topic and subscription names.
*
@@ -208,7 +296,7 @@ public Mono createSubscription(String topicName, String
}
/**
- * Creates a subscription with the {@link SubscriptionProperties}.
+ * Creates a subscription with the {@link CreateSubscriptionOptions}.
*
* @param topicName Name of the topic associated with subscription.
* @param subscriptionName Name of the subscription.
@@ -228,17 +316,19 @@ public Mono createSubscription(String topicName, String
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono createSubscription(String topicName, String subscriptionName,
CreateSubscriptionOptions subscriptionOptions) {
- return createSubscriptionWithResponse(topicName, subscriptionName, subscriptionOptions).map(Response::getValue);
+
+ return createSubscriptionWithResponse(topicName, subscriptionName, subscriptionOptions)
+ .map(Response::getValue);
}
/**
- * Creates a queue and returns the created queue in addition to the HTTP response.
+ * Creates a subscription and returns the created subscription in addition to the HTTP response.
*
* @param topicName Name of the topic associated with subscription.
* @param subscriptionName Name of the subscription.
* @param subscriptionOptions Information about the subscription to create.
*
- * @return A Mono that returns the created queue in addition to the HTTP response.
+ * @return A Mono that returns the created subscription in addition to the HTTP response.
* @throws ClientAuthenticationException if the client's credentials do not have access to modify the
* namespace.
* @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
@@ -361,6 +451,48 @@ public Mono> deleteQueueWithResponse(String queueName) {
return withContext(context -> deleteQueueWithResponse(queueName, context));
}
+ /**
+ * Deletes a rule the matching {@code ruleName}.
+ *
+ * @param topicName Name of topic associated with rule to delete.
+ * @param subscriptionName Name of the subscription associated with the rule to delete.
+ * @param ruleName Name of rule to delete.
+ *
+ * @return A Mono that completes when the rule is deleted.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If error occurred processing the request.
+ * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} is an empty string.
+ * @throws NullPointerException if {@code topicName} or {@code ruleName} is null.
+ * @throws ResourceNotFoundException if the {@code ruleName} does not exist.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono deleteRule(String topicName, String subscriptionName, String ruleName) {
+ return deleteRuleWithResponse(topicName, subscriptionName, ruleName).then();
+ }
+
+ /**
+ * Deletes a rule the matching {@code ruleName} and returns the HTTP response.
+ *
+ * @param topicName Name of topic associated with rule to delete.
+ * @param subscriptionName Name of the subscription associated with the rule to delete.
+ * @param ruleName Name of rule to delete.
+ *
+ * @return A Mono that completes when the rule is deleted and returns the HTTP response.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If error occurred processing the request.
+ * @throws IllegalArgumentException if {@code topicName}, {@code subscriptionName}, or {@code ruleName} is an
+ * empty string.
+ * @throws NullPointerException if {@code topicName}, {@code subscriptionName}, or {@code ruleName} is null.
+ * @throws ResourceNotFoundException if the {@code ruleName} does not exist.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono> deleteRuleWithResponse(String topicName, String subscriptionName,
+ String ruleName) {
+ return withContext(context -> deleteRuleWithResponse(topicName, subscriptionName, ruleName, context));
+ }
+
/**
* Deletes a subscription the matching {@code subscriptionName}.
*
@@ -574,6 +706,41 @@ public Mono> getNamespacePropertiesWithResponse()
return withContext(this::getNamespacePropertiesWithResponse);
}
+ /**
+ * Gets a rule from the service namespace.
+ *
+ * Only following data types are deserialized in Filters and Action parameters - string, int, long, boolean, double,
+ * and OffsetDateTime. Other data types would return its string value.
+ *
+ * @param topicName The name of the topic relative to service bus namespace.
+ * @param subscriptionName The subscription name the rule belongs to.
+ * @param ruleName The name of the rule to retrieve.
+ *
+ * @return The associated rule.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono getRule(String topicName, String subscriptionName, String ruleName) {
+ return getRuleWithResponse(topicName, subscriptionName, ruleName).map(response -> response.getValue());
+ }
+
+ /**
+ * Gets a rule from the service namespace.
+ *
+ * Only following data types are deserialized in Filters and Action parameters - string, int, long, bool, double,
+ * and OffsetDateTime. Other data types would return its string value.
+ *
+ * @param topicName The name of the topic relative to service bus namespace.
+ * @param subscriptionName The subscription name the rule belongs to.
+ * @param ruleName The name of the rule to retrieve.
+ *
+ * @return The associated rule with the corresponding HTTP response.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono> getRuleWithResponse(String topicName, String subscriptionName,
+ String ruleName) {
+ return withContext(context -> getRuleWithResponse(topicName, subscriptionName, ruleName, context));
+ }
+
/**
* Gets information about the queue.
*
@@ -822,6 +989,33 @@ public PagedFlux listQueues() {
token -> withContext(context -> listQueuesNextPage(token, context)));
}
+ /**
+ * Fetches all the rules for a topic and subscription.
+ *
+ * @param topicName The topic name under which all the rules need to be retrieved.
+ * @param subscriptionName The name of the subscription for which all rules need to be retrieved.
+ *
+ * @return A Flux of {@link RuleProperties rules} for the {@code topicName} and {@code subscriptionName}.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws NullPointerException if {@code topicName} or {@code subscriptionName} is null.
+ * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} is an empty string.
+ * @see List entities, rules, or
+ * authorization rules
+ */
+ @ServiceMethod(returns = ReturnType.COLLECTION)
+ public PagedFlux listRules(String topicName, String subscriptionName) {
+ if (topicName == null) {
+ return pagedFluxError(logger, new NullPointerException("'topicName' cannot be null."));
+ } else if (topicName.isEmpty()) {
+ return pagedFluxError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
+ }
+
+ return new PagedFlux<>(
+ () -> withContext(context -> listRulesFirstPage(topicName, subscriptionName, context)),
+ token -> withContext(context -> listRulesNextPage(topicName, subscriptionName, token, context)));
+ }
+
/**
* Fetches all the subscriptions for a topic.
*
@@ -940,6 +1134,67 @@ public Mono> updateQueueWithResponse(QueueProperties q
return withContext(context -> updateQueueWithResponse(queue, context));
}
+ /**
+ * Updates a rule with the given {@link RuleProperties}. The {@link RuleProperties} must be fully populated as all
+ * of the properties are replaced. If a property is not set the service default value is used.
+ *
+ * The suggested flow is:
+ *
+ *
{@link #getRule(String, String, String) Get rule description.}
+ *
Update the required elements.
+ *
Pass the updated description into this method.
+ *
+ *
+ * @param topicName The topic name under which the rule is updated.
+ * @param subscriptionName The name of the subscription for which the rule is updated.
+ * @param rule Information about the rule to update. You must provide all the property values that are desired
+ * on the updated entity. Any values not provided are set to the service default values.
+ *
+ * @return A Mono that returns the updated rule.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If the request body was invalid, the rule quota is exceeded, or an error
+ * occurred processing the request.
+ * @throws IllegalArgumentException if {@link RuleProperties#getName()} is null or an empty string.
+ * @throws NullPointerException if {@code rule} is null.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono updateRule(String topicName, String subscriptionName, RuleProperties rule) {
+ return updateRuleWithResponse(topicName, subscriptionName, rule).map(Response::getValue);
+ }
+
+ /**
+ * Updates a rule with the given {@link RuleProperties}. The {@link RuleProperties} must be fully populated as all
+ * of the properties are replaced. If a property is not set the service default value is used.
+ *
+ * The suggested flow is:
+ *
+ *
{@link #getRule(String, String, String) Get rule description.}
+ *
Update the required elements.
+ *
Pass the updated description into this method.
+ *
+ *
+ * @param topicName The topic name under which the rule is updated.
+ * @param subscriptionName The name of the subscription for which the rule is updated.
+ * @param rule Information about the rule to update. You must provide all the property values that are desired
+ * on the updated entity. Any values not provided are set to the service default values.
+ *
+ * @return A Mono that returns the updated rule in addition to the HTTP response.
+ * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
+ * namespace.
+ * @throws HttpResponseException If the request body was invalid, the rule quota is exceeded, or an error
+ * occurred processing the request.
+ * @throws IllegalArgumentException if {@link RuleProperties#getName()} is null or an empty string.
+ * @throws NullPointerException if {@code rule} is null.
+ * @see Create or Update Entity
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public Mono> updateRuleWithResponse(String topicName, String subscriptionName,
+ RuleProperties rule) {
+
+ return withContext(context -> updateRuleWithResponse(topicName, subscriptionName, rule, context));
+ }
+
/**
* Updates a subscription with the given {@link SubscriptionProperties}. The {@link SubscriptionProperties} must be
* fully populated as all of the properties are replaced. If a property is not set the service default value is
@@ -1134,17 +1389,76 @@ Mono> createQueueWithResponse(String queueName, Create
}
}
+ /**
+ * Creates a rule with its context.
+ *
+ * @param ruleOptions Rule to create.
+ * @param context Context to pass into request.
+ *
+ * @return A Mono that completes with the created {@link RuleProperties}.
+ */
+ Mono> createRuleWithResponse(String topicName, String subscriptionName, String ruleName,
+ CreateRuleOptions ruleOptions, Context context) {
+ if (topicName == null) {
+ return monoError(logger, new NullPointerException("'topicName' cannot be null."));
+ } else if (topicName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'topicName' cannot be empty."));
+ }
+
+ if (subscriptionName == null) {
+ return monoError(logger, new NullPointerException("'subscriptionName' cannot be null."));
+ } else if (subscriptionName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be empty."));
+ }
+
+ if (ruleName == null) {
+ return monoError(logger, new NullPointerException("'ruleName' cannot be null."));
+ } else if (ruleName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'ruleName' cannot be empty."));
+ }
+
+ if (ruleOptions == null) {
+ return monoError(logger, new NullPointerException("'rule' cannot be null."));
+ }
+
+ final RuleActionImpl action = ruleOptions.getAction() != null
+ ? EntityHelper.toImplementation(ruleOptions.getAction())
+ : null;
+ final RuleFilterImpl filter = ruleOptions.getFilter() != null
+ ? EntityHelper.toImplementation(ruleOptions.getFilter())
+ : null;
+ final RuleDescription rule = new RuleDescription()
+ .setAction(action)
+ .setFilter(filter)
+ .setName(ruleName);
+
+ final CreateRuleBodyContent content = new CreateRuleBodyContent()
+ .setType(CONTENT_TYPE)
+ .setRuleDescription(rule);
+ final CreateRuleBody createEntity = new CreateRuleBody().setContent(content);
+
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+
+ try {
+ return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, ruleName, createEntity,
+ null, withTracing)
+ .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
+ .map(response -> deserializeRule(response));
+ } catch (RuntimeException ex) {
+ return monoError(logger, ex);
+ }
+ }
+
/**
* Creates a subscription with its context.
*
- * @param options Subscription to create.
+ * @param subscriptionOptions Subscription to create.
* @param context Context to pass into request.
*
* @return A Mono that completes with the created {@link SubscriptionProperties}.
*/
Mono> createSubscriptionWithResponse(String topicName, String subscriptionName,
- CreateSubscriptionOptions options,
- Context context) {
+ CreateSubscriptionOptions subscriptionOptions, Context context) {
if (topicName == null) {
return monoError(logger, new NullPointerException("'topicName' cannot be null."));
} else if (topicName.isEmpty()) {
@@ -1157,11 +1471,11 @@ Mono> createSubscriptionWithResponse(String top
return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be empty."));
}
- if (options == null) {
+ if (subscriptionOptions == null) {
return monoError(logger, new NullPointerException("'subscription' cannot be null."));
}
- final SubscriptionDescription subscription = EntityHelper.getSubscriptionDescription(options);
+ final SubscriptionDescription subscription = EntityHelper.getSubscriptionDescription(subscriptionOptions);
final CreateSubscriptionBodyContent content = new CreateSubscriptionBodyContent()
.setType(CONTENT_TYPE)
.setSubscriptionDescription(subscription);
@@ -1225,7 +1539,7 @@ Mono> createTopicWithResponse(String topicName, Create
* @param queueName Name of queue to delete.
* @param context Context to pass into request.
*
- * @return A Mono that completes with the created {@link QueueProperties}.
+ * @return A Mono that completes when the queue is deleted.
*/
Mono> deleteQueueWithResponse(String queueName, Context context) {
if (queueName == null) {
@@ -1250,6 +1564,46 @@ Mono> deleteQueueWithResponse(String queueName, Context context)
}
}
+ /**
+ * Deletes a queue with its context.
+ *
+ * @param topicName Name of topic to delete.
+ * @param subscriptionName Name of the subscription for the rule.
+ * @param ruleName Name of the rule.
+ * @param context Context to pass into request.
+ *
+ * @return A Mono that completes with the created {@link QueueProperties}.
+ */
+ Mono> deleteRuleWithResponse(String topicName, String subscriptionName, String ruleName,
+ Context context) {
+ if (topicName == null) {
+ return monoError(logger, new NullPointerException("'topicName' cannot be null"));
+ } else if (topicName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
+ } else if (subscriptionName == null) {
+ return monoError(logger, new NullPointerException("'subscriptionName' cannot be null"));
+ } else if (subscriptionName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be an empty string."));
+ } else if (ruleName == null) {
+ return monoError(logger, new NullPointerException("'ruleName' cannot be null"));
+ } else if (ruleName.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'ruleName' cannot be an empty string."));
+ } else if (context == null) {
+ return monoError(logger, new NullPointerException("'context' cannot be null."));
+ }
+
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+
+ try {
+ return rulesClient.deleteWithResponseAsync(topicName, subscriptionName, ruleName, withTracing)
+ .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
+ .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
+ response.getHeaders(), null));
+ } catch (RuntimeException ex) {
+ return monoError(logger, ex);
+ }
+ }
+
/**
* Deletes a subscription with its context.
*
@@ -1381,6 +1735,19 @@ Mono> getQueueWithResponse(String queueName, Context context,
}
}
+ Mono> getRuleWithResponse(String topicName, String subscriptionName,
+ String ruleName, Context context) {
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+
+ try {
+ return rulesClient.getWithResponseAsync(topicName, subscriptionName, ruleName, true, withTracing)
+ .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
+ .map(this::deserializeRule);
+ } catch (RuntimeException ex) {
+ return monoError(logger, ex);
+ }
+ }
+
/**
* Gets a subscription with its context.
*
@@ -1536,6 +1903,47 @@ Mono> listQueuesNextPage(String continuationToken
}
}
+ /**
+ * Gets the first page of rules with context.
+ *
+ * @param context Context to pass into request.
+ *
+ * @return A Mono that completes with a page of rules.
+ */
+ Mono> listRulesFirstPage(String topicName, String subscriptionName, Context context) {
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+
+ try {
+ return listRules(topicName, subscriptionName, 0, withTracing);
+ } catch (RuntimeException e) {
+ return monoError(logger, e);
+ }
+ }
+
+ /**
+ * Gets the next page of rules with context.
+ *
+ * @param continuationToken Number of items to skip in feed.
+ * @param context Context to pass into request.
+ *
+ * @return A Mono that completes with a page of rules or empty if there are no items left.
+ */
+ Mono> listRulesNextPage(String topicName, String subscriptionName,
+ String continuationToken, Context context) {
+ if (continuationToken == null || continuationToken.isEmpty()) {
+ return Mono.empty();
+ }
+
+ try {
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+ final int skip = Integer.parseInt(continuationToken);
+
+ return listRules(topicName, subscriptionName, skip, withTracing);
+ } catch (RuntimeException e) {
+ return monoError(logger, e);
+ }
+ }
+
/**
* Gets the first page of subscriptions with context.
*
@@ -1651,6 +2059,42 @@ Mono> updateQueueWithResponse(QueueProperties queue, C
}
}
+ /**
+ * Updates a rule with its context.
+ *
+ * @param rule Information about the rule to update. You must provide all the property values that are desired
+ * on the updated entity. Any values not provided are set to the service default values.
+ * @param context Context to pass into request.
+ *
+ * @return A Mono that completes with the updated {@link RuleProperties}.
+ */
+ Mono> updateRuleWithResponse(String topicName, String subscriptionName,
+ RuleProperties rule, Context context) {
+ if (rule == null) {
+ return monoError(logger, new NullPointerException("'rule' cannot be null"));
+ } else if (context == null) {
+ return monoError(logger, new NullPointerException("'context' cannot be null."));
+ }
+
+ final RuleDescription implementation = EntityHelper.toImplementation(rule);
+ final CreateRuleBodyContent content = new CreateRuleBodyContent()
+ .setType(CONTENT_TYPE)
+ .setRuleDescription(implementation);
+ final CreateRuleBody ruleBody = new CreateRuleBody()
+ .setContent(content);
+ final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, SERVICE_BUS_TRACING_NAMESPACE_VALUE);
+
+ try {
+ // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour.
+ return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, rule.getName(),
+ ruleBody, "*", withTracing)
+ .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
+ .map(response -> deserializeRule(response));
+ } catch (RuntimeException ex) {
+ return monoError(logger, ex);
+ }
+ }
+
/**
* Updates a subscription with its context.
*
@@ -1783,6 +2227,30 @@ private Response deserializeQueue(Response