Skip to content

Commit 41dac25

Browse files
committed
Merge pull request #128 from ukwa/amqp-url-receiver-changes
AMQPUrlReceiver changes to support RabbitMQ >= 3.3
2 parents 88c7f3c + e883977 commit 41dac25

File tree

1 file changed

+76
-25
lines changed

1 file changed

+76
-25
lines changed

contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java

+76-25
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,33 @@ public boolean isRunning() {
107107
return isRunning;
108108
}
109109

110+
/** Should be queues be marked as durable? */
111+
private boolean durable = false;
112+
public boolean isDurable() {
113+
return durable;
114+
}
115+
public void setDurable(boolean durable) {
116+
this.durable = durable;
117+
}
118+
119+
/** Should be queues be marked as auto-delete? */
120+
private boolean autoDelete = true;
121+
public boolean isAutoDelete() {
122+
return autoDelete;
123+
}
124+
public void setAutoDelete(boolean autoDelete) {
125+
this.autoDelete = autoDelete;
126+
}
127+
110128
private transient Lock lock = new ReentrantLock(true);
111129

130+
private boolean pauseConsumer = true;
131+
132+
private boolean isConsuming = false;
133+
112134
private class StarterRestarter extends Thread {
135+
private String consumerTag = null;
136+
113137
public StarterRestarter(String name) {
114138
super(name);
115139
}
@@ -119,35 +143,52 @@ public void run() {
119143
while (!Thread.interrupted()) {
120144
try {
121145
lock.lockInterruptibly();
146+
logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer);
122147
try {
123-
if (!isRunning) {
148+
if (!isConsuming && !pauseConsumer) {
124149
// start up again
125150
try {
126151
Consumer consumer = new UrlConsumer(channel());
127152
channel().exchangeDeclare(getExchange(), "direct", true);
128-
channel().queueDeclare(getQueueName(), false, false, true, null);
153+
channel().queueDeclare(getQueueName(), durable,
154+
false, autoDelete, null);
129155
channel().queueBind(getQueueName(), getExchange(), getQueueName());
130-
channel().basicConsume(getQueueName(), false, consumer);
131-
isRunning = true;
132-
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName());
156+
consumerTag = channel().basicConsume(getQueueName(), false, consumer);
157+
isConsuming = true;
158+
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
159+
} catch (IOException e) {
160+
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e);
161+
}
162+
}
163+
164+
if (isConsuming && pauseConsumer) {
165+
try {
166+
if (consumerTag != null) {
167+
logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag);
168+
channel().basicCancel(consumerTag);
169+
consumerTag = null;
170+
isConsuming = false;
171+
logger.info("Cancelled URLConsumer.");
172+
}
133173
} catch (IOException e) {
134-
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 30 seconds)", e);
174+
logger.log(Level.SEVERE, "problem cancelling AMQP consumer (will try again after 10 seconds)", e);
135175
}
136176
}
137177

138-
Thread.sleep(30000);
178+
Thread.sleep(10 * 1000);
139179
} finally {
140180
lock.unlock();
141181
}
142182
} catch (InterruptedException e) {
183+
143184
return;
144185
}
145186
}
146187
}
147188
}
148189

149190
transient private StarterRestarter starterRestarter;
150-
191+
151192
@Override
152193
public void start() {
153194
lock.lock();
@@ -274,9 +315,14 @@ public void handleDelivery(String consumerTag, Envelope envelope,
274315
logger.log(Level.SEVERE,
275316
"problem creating CrawlURI from json received via AMQP "
276317
+ decodedBody, e);
318+
} catch (Exception e) {
319+
logger.log(Level.SEVERE,
320+
"Unanticipated problem creating CrawlURI from json received via AMQP "
321+
+ decodedBody, e);
277322
}
278323
} else {
279-
logger.warning("ignoring url with method other than GET - " + decodedBody);
324+
logger.info("ignoring url with method other than GET - "
325+
+ decodedBody);
280326
}
281327

282328
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
@@ -290,7 +336,7 @@ public void handleShutdownSignal(String consumerTag,
290336
} else {
291337
logger.info("amqp channel/connection shut down consumerTag=" + consumerTag);
292338
}
293-
isRunning = false;
339+
isConsuming = false;
294340
}
295341

296342
// {
@@ -349,7 +395,19 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
349395
curi.setSchedulingDirective(SchedulingConstants.HIGH);
350396
curi.setPrecedence(1);
351397

352-
//curi.setForceFetch(true);
398+
// optional forceFetch instruction:
399+
if (jo.has("forceFetch")) {
400+
boolean forceFetch = jo.getBoolean("forceFetch");
401+
logger.info("Setting forceFetch=" + forceFetch);
402+
curi.setForceFetch(forceFetch);
403+
}
404+
405+
// optional isSeed instruction:
406+
if (jo.has("isSeed")) {
407+
boolean isSeed = jo.getBoolean("isSeed");
408+
logger.info("Setting isSeed=" + isSeed);
409+
curi.setSeed(isSeed);
410+
}
353411

354412
curi.getAnnotations().add(A_RECEIVED_FROM_AMQP);
355413

@@ -361,22 +419,15 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
361419
public void onApplicationEvent(CrawlStateEvent event) {
362420
switch(event.getState()) {
363421
case PAUSING: case PAUSED:
364-
if (channel != null && channel.isOpen()) {
365-
try {
366-
channel.flow(false);
367-
} catch (IOException e) {
368-
logger.log(Level.WARNING, "failed to pause flow on amqp channel", e);
369-
}
370-
}
422+
logger.info("Requesting a pause of the URLConsumer...");
423+
this.pauseConsumer = true;
371424
break;
372425

373-
case RUNNING: case EMPTY: case PREPARING:
374-
if (channel != null && channel.isOpen()) {
375-
try {
376-
channel.flow(true);
377-
} catch (IOException e) {
378-
logger.log(Level.SEVERE, "failed to resume flow on amqp channel", e);
379-
}
426+
case RUNNING:
427+
logger.info("Requesting restart of the URLConsumer...");
428+
this.pauseConsumer = false;
429+
if (starterRestarter == null || !starterRestarter.isAlive()) {
430+
start();
380431
}
381432
break;
382433

0 commit comments

Comments
 (0)