diff --git a/google-cloud-pubsub/clirr-ignored-differences.xml b/google-cloud-pubsub/clirr-ignored-differences.xml new file mode 100644 index 000000000..610c1b362 --- /dev/null +++ b/google-cloud-pubsub/clirr-ignored-differences.xml @@ -0,0 +1,20 @@ + + + + + + 7002 + com/google/cloud/pubsub/v1/Subscriber$Builder + com.google.cloud.pubsub.v1.Subscriber$Builder setExactlyOnceDeliveryEnabled(boolean) + + + 7002 + com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder + com.google.cloud.pubsub.v1.StreamingSubscriberConnection$Builder setExactlyOnceDeliveryEnabled(boolean) + + + 7002 + com/google/cloud/pubsub/v1/MessageDispatcher$Builder + com.google.cloud.pubsub.v1.MessageDispatcher$Builder setEnableExactlyOnceDelivery(boolean) + + diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java index 199186004..ec2a27f3b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java @@ -18,8 +18,34 @@ import java.util.concurrent.Future; +/** + * Acknowledging a message in Pub/Sub means that you are done with it, and it will not be delivered + * to this subscription again. You should avoid acknowledging messages until you have *finished* + * processing them, so that in the event of a failure, you receive the message again. + * + *

If exactly-once delivery is enabled on the subscription, the future returned by the ack/nack + * methods track the state of acknowledgement operation by the server. If the future completes + * successfully, the message is guaranteed NOT to be re-delivered. Otherwise, the future will + * contain an exception with more details about the failure and the message may be re-delivered. + * + *

If exactly-once delivery is NOT enabled on the subscription, the future returns immediately + * with an AckResponse.SUCCESS. Because re-deliveries are possible, you should ensure that your + * processing code is idempotent, as you may receive any given message more than once. + */ public interface AckReplyConsumerWithResponse { + /** + * Acknowledges that the message has been successfully processed. The service will not send the + * message again. + * + *

A future representing the server response is returned + */ Future ack(); + /** + * Signals that the message has not been successfully processed. The service should resend the + * message. + * + *

A future representing the server response is returned + */ Future nack(); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index a9f73d5c3..31693a189 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -70,7 +70,7 @@ class MessageDispatcher { private final FlowController flowController; - private AtomicBoolean enableExactlyOnceDelivery; + private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false); private final Waiter messagesWaiter; @@ -198,7 +198,6 @@ private MessageDispatcher(Builder builder) { ackProcessor = builder.ackProcessor; flowController = builder.flowController; - enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery); ackLatencyDistribution = builder.ackLatencyDistribution; clock = builder.clock; jobLock = new ReentrantLock(); @@ -296,13 +295,13 @@ int getMessageDeadlineSeconds() { } @InternalApi - void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { - // Sanity check that we are changing the enableExactlyOnceDelivery state - if (enableExactlyOnceDelivery == this.enableExactlyOnceDelivery.get()) { + void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { + // Sanity check that we are changing the exactlyOnceDeliveryEnabled state + if (exactlyOnceDeliveryEnabled == this.exactlyOnceDeliveryEnabled.get()) { return; } - this.enableExactlyOnceDelivery.set(enableExactlyOnceDelivery); + this.exactlyOnceDeliveryEnabled.set(exactlyOnceDeliveryEnabled); // If a custom value for minDurationPerAckExtension, we should respect that if (!minDurationPerAckExtensionDefaultUsed) { @@ -313,7 +312,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { // maxDurationPerAckExtensionSeconds does not change int possibleNewMinAckDeadlineExtensionSeconds; - if (enableExactlyOnceDelivery) { + if (exactlyOnceDeliveryEnabled) { possibleNewMinAckDeadlineExtensionSeconds = Math.toIntExact( Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()); @@ -323,7 +322,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { } // If we are not using the default maxDurationAckExtension, check if the - // minAckDeadlineExtensionExactlyOnce needs to be bounded by the set max + // minAckDeadlineExtensionExactlyOnceDelivery needs to be bounded by the set max if (!maxDurationPerAckExtensionDefaultUsed && (possibleNewMinAckDeadlineExtensionSeconds > maxDurationPerAckExtensionSeconds)) { minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds; @@ -580,7 +579,6 @@ public static final class Builder { private Distribution ackLatencyDistribution; private FlowController flowController; - private boolean enableExactlyOnceDelivery; private Executor executor; private ScheduledExecutorService systemExecutor; @@ -641,11 +639,6 @@ public Builder setFlowController(FlowController flowController) { return this; } - public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { - this.enableExactlyOnceDelivery = enableExactlyOnceDelivery; - return this; - } - public Builder setExecutor(Executor executor) { this.executor = executor; return this; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index d1af3a3e9..78f35efea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -103,12 +103,7 @@ private StreamingSubscriberConnection(Builder builder) { // We need to set the default stream ack deadline on the initial request, this will be // updated by modack requests in the message dispatcher if (builder.maxDurationPerAckExtensionDefaultUsed) { - // If the default is used, check if exactly once is enabled and set appropriately - if (builder.exactlyOnceDeliveryEnabled) { - inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT; - } else { - inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT; - } + inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT; } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE) < 0) { // We will not be able to extend more than the default minimum @@ -123,7 +118,6 @@ private StreamingSubscriberConnection(Builder builder) { subscriberStub = builder.subscriberStub; channelAffinity = builder.channelAffinity; - exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled); MessageDispatcher.Builder messageDispatcherBuilder; if (builder.receiver != null) { @@ -143,7 +137,6 @@ private StreamingSubscriberConnection(Builder builder) { .setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed) .setAckLatencyDistribution(builder.ackLatencyDistribution) .setFlowController(builder.flowController) - .setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled) .setExecutor(builder.executor) .setSystemExecutor(builder.systemExecutor) .setApiClock(builder.clock) @@ -159,7 +152,7 @@ public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled( return this; } - public boolean isExactlyOnceDeliveryEnabled() { + public boolean getExactlyOnceDeliveryEnabled() { return exactlyOnceDeliveryEnabled.get(); } @@ -221,7 +214,7 @@ public void onResponse(StreamingPullResponse response) { response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled(); setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse); - messageDispatcher.setEnableExactlyOnceDelivery(exactlyOnceDeliveryEnabledResponse); + messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse); messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); // Only request more if we're not shutdown. @@ -370,7 +363,7 @@ public void setResponseOutstandingMessages(AckResponse ackResponse) { private void setFailureFutureOutstandingMessages(Throwable t) { AckResponse ackResponse; - if (isExactlyOnceDeliveryEnabled()) { + if (getExactlyOnceDeliveryEnabled()) { if (!(t instanceof ApiException)) { ackResponse = AckResponse.OTHER; } @@ -518,7 +511,7 @@ public void onFailure(Throwable t) { // Remove from our pending operations ackOperationsWaiter.incrementPendingCount(-1); - if (!isExactlyOnceDeliveryEnabled()) { + if (!getExactlyOnceDeliveryEnabled()) { Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); return; @@ -609,7 +602,6 @@ public static final class Builder { private int channelAffinity; private FlowController flowController; private FlowControlSettings flowControlSettings; - private boolean exactlyOnceDeliveryEnabled; private boolean useLegacyFlowControl; private ScheduledExecutorService executor; private ScheduledExecutorService systemExecutor; @@ -690,11 +682,6 @@ public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) { return this; } - public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { - this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled; - return this; - } - public Builder setExecutor(ScheduledExecutorService executor) { this.executor = executor; return this; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 4ee66b031..1bd74f3bb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -134,7 +134,6 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private SubscriberStub subscriberStub; private final SubscriberStubSettings subStubSettings; private final FlowController flowController; - private boolean exactlyOnceDeliveryEnabled = false; private final int numPullers; private final MessageReceiver receiver; @@ -166,8 +165,6 @@ private Subscriber(Builder builder) { .setLimitExceededBehavior(LimitExceededBehavior.Block) .build()); - exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled; - this.numPullers = builder.parallelPullCount; executorProvider = builder.executorProvider; @@ -385,7 +382,6 @@ private void startStreamingConnections() { .setExecutor(executor) .setSystemExecutor(alarmsExecutor) .setClock(clock) - .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled) .build(); streamingSubscriberConnections.add(streamingSubscriberConnection); @@ -479,8 +475,6 @@ public static final class Builder { private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS; - private boolean exactlyOnceDeliveryEnabled = false; - private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; private ExecutorProvider systemExecutorProvider = null; private TransportChannelProvider channelProvider = @@ -573,22 +567,6 @@ public Builder setUseLegacyFlowControl(boolean value) { return this; } - /** - * Enables/Disabled ExactlyOnceDelivery - * - *

Will update the minDurationPerAckExtension if a user-provided value is not set - */ - public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { - // If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if - // applicable - if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) { - this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY; - } - - this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled; - return this; - } - /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 7fcee6b2e..c72d52d3d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -209,7 +209,6 @@ public void receiveMessage( receiveQueue.offer(MessageAndConsumer.create(message, consumer)); } }) - .setExactlyOnceDeliveryEnabled(true) .build(); subscriber.addListener( new Subscriber.Listener() { @@ -282,7 +281,6 @@ public void receiveMessage( SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) .build()) - .setExactlyOnceDeliveryEnabled(false) .build(); subscriber.addListener( new Subscriber.Listener() { @@ -360,7 +358,6 @@ public void receiveMessage( SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) .build()) - .setExactlyOnceDeliveryEnabled(false) .build(); subscriber.addListener( new Subscriber.Listener() { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 3ff13acfc..0b48e0991 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -337,8 +337,7 @@ public void testExtension_GiveUp() throws Exception { } @Test - public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() { - // EnableExactlyOnceDelivery is turned off by default + public void testAckExtensionDefaultsExactlyOnceDeliveryDisabledThenEnabled() { MessageDispatcher messageDispatcher = MessageDispatcher.newBuilder(mock(MessageReceiver.class)) .setAckLatencyDistribution(mockAckLatencyDistribution) @@ -348,13 +347,17 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() { .setMaxDurationPerAckExtensionDefaultUsed(true) .build(); + // ExactlyOnceDeliveryEnabled is turned off by default + // We should be using the Subscriber set hard deadlines assertMinAndMaxAckDeadlines( messageDispatcher, Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); - messageDispatcher.setEnableExactlyOnceDelivery(true); + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setExactlyOnceDeliveryEnabled(true); // Should only change min deadline assertMinAndMaxAckDeadlines( @@ -365,11 +368,10 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() { } @Test - public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() { + public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() { MessageDispatcher messageDispatcher = MessageDispatcher.newBuilder(mock(MessageReceiver.class)) .setAckLatencyDistribution(mockAckLatencyDistribution) - .setEnableExactlyOnceDelivery(true) .setMinDurationPerAckExtension( Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY) .setMinDurationPerAckExtensionDefaultUsed(true) @@ -377,13 +379,19 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() { .setMaxDurationPerAckExtensionDefaultUsed(true) .build(); + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setExactlyOnceDeliveryEnabled(true); + assertMinAndMaxAckDeadlines( messageDispatcher, Math.toIntExact( Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()), Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); - messageDispatcher.setEnableExactlyOnceDelivery(false); + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setExactlyOnceDeliveryEnabled(false); // Should change min deadline assertMinAndMaxAckDeadlines( @@ -393,7 +401,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() { } @Test - public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() { + public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() { int customMinSeconds = 30; MessageDispatcher messageDispatcher = MessageDispatcher.newBuilder(mock(MessageReceiver.class)) @@ -404,12 +412,15 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() { .setMaxDurationPerAckExtensionDefaultUsed(true) .build(); + // ExactlyOnceDeliveryEnabled is turned off by default assertMinAndMaxAckDeadlines( messageDispatcher, customMinSeconds, Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); - messageDispatcher.setEnableExactlyOnceDelivery(true); + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setExactlyOnceDeliveryEnabled(true); // no changes should occur assertMinAndMaxAckDeadlines( @@ -419,7 +430,7 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() { } @Test - public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() { + public void testAckExtensionCustomMaxExactlyOnceDeliveryDisabledThenEnabled() { int customMaxSeconds = 30; MessageDispatcher messageDispatcher = MessageDispatcher.newBuilder(mock(MessageReceiver.class)) @@ -430,12 +441,15 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() { .setMaxDurationPerAckExtensionDefaultUsed(false) .build(); + // ExactlyOnceDeliveryEnabled is turned off by default assertMinAndMaxAckDeadlines( messageDispatcher, Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), customMaxSeconds); - messageDispatcher.setEnableExactlyOnceDelivery(true); + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setExactlyOnceDeliveryEnabled(true); // Because the customMaxSeconds is above the // DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY, we should use the customMaxSeconds diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 6ad951001..d8e1878dd 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -452,13 +452,18 @@ public void testSetFailureResponseOutstandingMessages() { private StreamingSubscriberConnection getStreamingSubscriberConnection( boolean exactlyOnceDeliveryEnabled) { - return getStreamingSubscriberReceiverFromBuilder( - StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)), - exactlyOnceDeliveryEnabled); + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnectionFromBuilder( + StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class))); + + // This would normally be set from the streaming pull response + streamingSubscriberConnection.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled); + + return streamingSubscriberConnection; } - private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder( - StreamingSubscriberConnection.Builder builder, boolean exactlyOnceDeliveryEnabled) { + private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilder( + StreamingSubscriberConnection.Builder builder) { return builder .setSubscription(MOCK_SUBSCRIPTION_NAME) .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) @@ -474,7 +479,6 @@ private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder( .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) - .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled) .build(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index ab7021bba..612c244fe 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -287,16 +287,6 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); subscriber.stopAsync().awaitTerminated(); - - // maxDurationPerAckExtension is unset with exactly once enabled - subscriber = - startSubscriber(getTestSubscriberBuilder(testReceiver).setExactlyOnceDeliveryEnabled(true)); - assertEquals( - expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); - assertEquals( - Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT.getSeconds()), - fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); - subscriber.stopAsync().awaitTerminated(); } @Test @@ -358,7 +348,6 @@ private Builder getTestSubscriberBuilder( .setCredentialsProvider(NoCredentialsProvider.create()) .setClock(fakeExecutor.getClock()) .setParallelPullCount(1) - .setExactlyOnceDeliveryEnabled(true) .setFlowControlSettings( FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()); }