Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQPUrlReceiver changes to support RabbitMQ >= 3.3 #128

Merged
merged 3 commits into from
Sep 24, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,33 @@ public boolean isRunning() {
return isRunning;
}

/** Should be queues be marked as durable? */
private boolean durable = false;
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}

/** Should be queues be marked as auto-delete? */
private boolean autoDelete = true;
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}

private transient Lock lock = new ReentrantLock(true);

private boolean pauseConsumer = true;

private boolean isConsuming = false;

private class StarterRestarter extends Thread {
private String consumerTag = null;

public StarterRestarter(String name) {
super(name);
}
Expand All @@ -119,35 +143,52 @@ public void run() {
while (!Thread.interrupted()) {
try {
lock.lockInterruptibly();
logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer);
try {
if (!isRunning) {
if (!isConsuming && !pauseConsumer) {
// start up again
try {
Consumer consumer = new UrlConsumer(channel());
channel().exchangeDeclare(getExchange(), "direct", true);
channel().queueDeclare(getQueueName(), false, false, true, null);
channel().queueDeclare(getQueueName(), durable,
false, autoDelete, null);
channel().queueBind(getQueueName(), getExchange(), getQueueName());
channel().basicConsume(getQueueName(), false, consumer);
isRunning = true;
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName());
consumerTag = channel().basicConsume(getQueueName(), false, consumer);
isConsuming = true;
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
} catch (IOException e) {
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e);
}
}

if (isConsuming && pauseConsumer) {
try {
if (consumerTag != null) {
logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag);
channel().basicCancel(consumerTag);
consumerTag = null;
isConsuming = false;
logger.info("Cancelled URLConsumer.");
}
} catch (IOException e) {
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 30 seconds)", e);
logger.log(Level.SEVERE, "problem cancelling AMQP consumer (will try again after 10 seconds)", e);
}
}

Thread.sleep(30000);
Thread.sleep(10 * 1000);
} finally {
lock.unlock();
}
} catch (InterruptedException e) {

return;
}
}
}
}

transient private StarterRestarter starterRestarter;

@Override
public void start() {
lock.lock();
Expand Down Expand Up @@ -274,9 +315,14 @@ public void handleDelivery(String consumerTag, Envelope envelope,
logger.log(Level.SEVERE,
"problem creating CrawlURI from json received via AMQP "
+ decodedBody, e);
} catch (Exception e) {
logger.log(Level.SEVERE,
"Unanticipated problem creating CrawlURI from json received via AMQP "
+ decodedBody, e);
}
} else {
logger.warning("ignoring url with method other than GET - " + decodedBody);
logger.info("ignoring url with method other than GET - "
+ decodedBody);
}

this.getChannel().basicAck(envelope.getDeliveryTag(), false);
Expand All @@ -290,7 +336,7 @@ public void handleShutdownSignal(String consumerTag,
} else {
logger.info("amqp channel/connection shut down consumerTag=" + consumerTag);
}
isRunning = false;
isConsuming = false;
}

// {
Expand Down Expand Up @@ -349,7 +395,19 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
curi.setSchedulingDirective(SchedulingConstants.HIGH);
curi.setPrecedence(1);

//curi.setForceFetch(true);
// optional forceFetch instruction:
if (jo.has("forceFetch")) {
boolean forceFetch = jo.getBoolean("forceFetch");
logger.info("Setting forceFetch=" + forceFetch);
curi.setForceFetch(forceFetch);
}

// optional isSeed instruction:
if (jo.has("isSeed")) {
boolean isSeed = jo.getBoolean("isSeed");
logger.info("Setting isSeed=" + isSeed);
curi.setSeed(isSeed);
}

curi.getAnnotations().add(A_RECEIVED_FROM_AMQP);

Expand All @@ -361,22 +419,15 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
public void onApplicationEvent(CrawlStateEvent event) {
switch(event.getState()) {
case PAUSING: case PAUSED:
if (channel != null && channel.isOpen()) {
try {
channel.flow(false);
} catch (IOException e) {
logger.log(Level.WARNING, "failed to pause flow on amqp channel", e);
}
}
logger.info("Requesting a pause of the URLConsumer...");
this.pauseConsumer = true;
break;

case RUNNING: case EMPTY: case PREPARING:
if (channel != null && channel.isOpen()) {
try {
channel.flow(true);
} catch (IOException e) {
logger.log(Level.SEVERE, "failed to resume flow on amqp channel", e);
}
case RUNNING:
logger.info("Requesting restart of the URLConsumer...");
this.pauseConsumer = false;
if (starterRestarter == null || !starterRestarter.isAlive()) {
start();
}
break;

Expand Down