Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Iterator pull methods, add javadoc and tests #1041

Merged
merged 1 commit into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,56 @@ interface MessageConsumer extends AutoCloseable {
*/
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

/**
* Pulls messages from the provided subscription. This method possibly returns no messages if no
* message was available at the time the request was processed by the Pub/Sub service (i.e. the
* system is not allowed to wait until at least one message is available). Pulled messages have
* their acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
*
* <p>Example usage of synchronous message pulling:
* <pre> {@code
* Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param subscription the subscription from which to pull messages
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
*
* <p>Example usage of asynchronous message pulling:
* <pre> {@code
* Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
* // do something while the request gets processed
* Iterator<ReceivedMessage> messageIterator = future.get();
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param subscription the subscription from which to pull messages
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;

This comment was marked as spam.

import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand All @@ -29,10 +29,12 @@
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -52,6 +54,8 @@
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -64,6 +68,8 @@
class PubSubImpl extends BaseService<PubSubOptions> implements PubSub {

private final PubSubRpc rpc;
private final AckDeadlineRenewer ackDeadlineRenewer;
private boolean closed;

private static final Function<Empty, Void> EMPTY_TO_VOID_FUNCTION = new Function<Empty, Void>() {
@Override
Expand All @@ -78,10 +84,25 @@ public Boolean apply(Empty input) {
return input != null;
}
};
private static final Function<com.google.pubsub.v1.ReceivedMessage, String>
MESSAGE_TO_ACK_ID_FUNCTION = new Function<com.google.pubsub.v1.ReceivedMessage, String>() {
@Override
public String apply(com.google.pubsub.v1.ReceivedMessage message) {
return message.getAckId();
}
};

PubSubImpl(PubSubOptions options) {
super(options);
rpc = options.rpc();
ackDeadlineRenewer = new AckDeadlineRenewer(this);
}

@VisibleForTesting
PubSubImpl(PubSubOptions options, AckDeadlineRenewer ackDeadlineRenewer) {
super(options);
rpc = options.rpc();
this.ackDeadlineRenewer = ackDeadlineRenewer;
}

private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {
Expand Down Expand Up @@ -445,17 +466,35 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
// this should set return_immediately to true
return null;
return get(pullAsync(subscription, maxMessages));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
// messages
return null;
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.build();
Future<PullResponse> response = rpc.pull(request);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return lazyTransform(response, new Function<PullResponse, Iterator<ReceivedMessage>>() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse pullResponse) {
// Add all received messages to the automatic ack deadline renewer
List<String> ackIds = Lists.transform(pullResponse.getReceivedMessagesList(),
MESSAGE_TO_ACK_ID_FUNCTION);
ackDeadlineRenewer.add(subscription, ackIds);
return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(),
new Function<com.google.pubsub.v1.ReceivedMessage, ReceivedMessage>() {
@Override
public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) {
// Remove consumed message from automatic ack deadline renewer
ackDeadlineRenewer.remove(subscription, receivedMessage.getAckId());
return ReceivedMessage.fromPb(PubSubImpl.this, subscription, receivedMessage);
}
});
}
});
}

@Override
Expand Down Expand Up @@ -549,6 +588,13 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti

@Override
public void close() throws Exception {
if (closed) {
return;
}
closed = true;
rpc.close();
if (ackDeadlineRenewer != null) {
ackDeadlineRenewer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class DefaultPubSubRpc implements PubSubRpc {
private final ScheduledExecutorService executor;
private final ExecutorFactory executorFactory;

private boolean closed;

private static final class InternalPubSubOptions extends PubSubOptions {

private static final long serialVersionUID = -7997372049256706185L;
Expand Down Expand Up @@ -233,6 +235,10 @@ public Future<Empty> modify(ModifyPushConfigRequest request) {

@Override
public void close() throws Exception {
if (closed) {
return;
}
closed = true;
subscriberApi.close();
publisherApi.close();
executorFactory.release(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand All @@ -47,6 +49,9 @@ public class AckDeadlineRenewerTest {
private PubSub pubsub;
private AckDeadlineRenewer ackDeadlineRenewer;

@Rule
public Timeout globalTimeout = Timeout.seconds(60);

@Before
public void setUp() {
pubsub = EasyMock.createStrictMock(PubSub.class);
Expand Down
Loading