diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a55f6830a5f0f..82a0e5c85dd25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2124,30 +2124,34 @@ private void internalCreateSubscriptionForNonPartitionedTopic( asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic")); return; } - PersistentSubscription subscription = (PersistentSubscription) topic - .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get(); - // Mark the cursor as "inactive" as it was created without a real consumer connected - subscription.deactivateCursor(); - subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) - .thenRun(() -> { - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), - topicName, subscriptionName, targetMessageId); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); - log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId, t); - if (t instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for position specified: " + t.getMessage())); - } else if (t instanceof SubscriptionBusyException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Failed for Subscription Busy: " + t.getMessage())); - } else { - resumeAsyncResponseExceptionally(asyncResponse, t); - } - return null; - }); + topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated).thenApply(subscription -> { + // Mark the cursor as "inactive" as it was created without a real consumer connected + ((PersistentSubscription) subscription).deactivateCursor(); + subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) + .thenRun(() -> { + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), + topicName, subscriptionName, targetMessageId); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); + log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), + topicName, subscriptionName, targetMessageId, t); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage())); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Failed for Subscription Busy: " + t.getMessage())); + } else { + resumeAsyncResponseExceptionally(asyncResponse, t); + } + return null; + }); + return null; + }).exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); + return null; + }); } catch (Throwable e) { log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId, e);