Skip to content

Commit

Permalink
Implement Iterator pull methods, add javadoc and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 3, 2016
1 parent e3a6d2e commit a881209
Show file tree
Hide file tree
Showing 6 changed files with 591 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,56 @@ interface MessageConsumer extends AutoCloseable {
*/
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

/**
* Pulls messages from the provided subscription. This method possibly returns no messages if no
* message was available at the time the request was processed by the Pub/Sub service (i.e. the
* system is not allowed to wait until at least one message is available). Pulled messages have
* their acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
*
* <p>Example usage of synchronous message pulling:
* <pre> {@code
* Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param subscription the subscription from which to pull messages
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
*
* <p>Example usage of asynchronous message pulling:
* <pre> {@code
* Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
* // do something while the request gets processed
* Iterator<ReceivedMessage> messageIterator = future.get();
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param subscription the subscription from which to pull messages
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand All @@ -29,10 +29,12 @@
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -52,6 +54,8 @@
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -64,6 +68,8 @@
class PubSubImpl extends BaseService<PubSubOptions> implements PubSub {

private final PubSubRpc rpc;
private final AckDeadlineRenewer ackDeadlineRenewer;
private boolean closed;

private static final Function<Empty, Void> EMPTY_TO_VOID_FUNCTION = new Function<Empty, Void>() {
@Override
Expand All @@ -78,10 +84,25 @@ public Boolean apply(Empty input) {
return input != null;
}
};
private static final Function<com.google.pubsub.v1.ReceivedMessage, String>
MESSAGE_TO_ACK_ID_FUNCTION = new Function<com.google.pubsub.v1.ReceivedMessage, String>() {
@Override
public String apply(com.google.pubsub.v1.ReceivedMessage message) {
return message.getAckId();
}
};

PubSubImpl(PubSubOptions options) {
super(options);
rpc = options.rpc();
ackDeadlineRenewer = new AckDeadlineRenewer(this);
}

@VisibleForTesting
PubSubImpl(PubSubOptions options, AckDeadlineRenewer ackDeadlineRenewer) {
super(options);
rpc = options.rpc();
this.ackDeadlineRenewer = ackDeadlineRenewer;
}

private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {
Expand Down Expand Up @@ -445,17 +466,35 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
// this should set return_immediately to true
return null;
return get(pullAsync(subscription, maxMessages));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
// messages
return null;
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.build();
Future<PullResponse> response = rpc.pull(request);
return lazyTransform(response, new Function<PullResponse, Iterator<ReceivedMessage>>() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse pullResponse) {
// Add all received messages to the automatic ack deadline renewer
List<String> ackIds = Lists.transform(pullResponse.getReceivedMessagesList(),
MESSAGE_TO_ACK_ID_FUNCTION);
ackDeadlineRenewer.add(subscription, ackIds);
return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(),
new Function<com.google.pubsub.v1.ReceivedMessage, ReceivedMessage>() {
@Override
public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) {
// Remove consumed message from automatic ack deadline renewer
ackDeadlineRenewer.remove(subscription, receivedMessage.getAckId());
return ReceivedMessage.fromPb(PubSubImpl.this, subscription, receivedMessage);
}
});
}
});
}

@Override
Expand Down Expand Up @@ -549,6 +588,13 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti

@Override
public void close() throws Exception {
if (closed) {
return;
}
closed = true;
rpc.close();
if (ackDeadlineRenewer != null) {
ackDeadlineRenewer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class DefaultPubSubRpc implements PubSubRpc {
private final ScheduledExecutorService executor;
private final ExecutorFactory executorFactory;

private boolean closed;

private static final class InternalPubSubOptions extends PubSubOptions {

private static final long serialVersionUID = -7997372049256706185L;
Expand Down Expand Up @@ -233,6 +235,10 @@ public Future<Empty> modify(ModifyPushConfigRequest request) {

@Override
public void close() throws Exception {
if (closed) {
return;
}
closed = true;
subscriberApi.close();
publisherApi.close();
executorFactory.release(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand All @@ -47,6 +49,9 @@ public class AckDeadlineRenewerTest {
private PubSub pubsub;
private AckDeadlineRenewer ackDeadlineRenewer;

@Rule
public Timeout globalTimeout = Timeout.seconds(60);

@Before
public void setUp() {
pubsub = EasyMock.createStrictMock(PubSub.class);
Expand Down
Loading

0 comments on commit a881209

Please sign in to comment.