NUTCH-2573 Suspend crawling if robots.txt fails to fetch with 5xx status #724

Merged
26 changes: 26 additions & 0 deletions conf/nutch-default.xml
@@ -129,6 +129,32 @@
then such sites will be treated as forbidden.</description>
</property>

<property>
<name>http.robots.503.defer.visits</name>
<value>true</value>
<description>Temporarily suspend fetching from a host if the
robots.txt response is HTTP 503 or any other 5xx server error. See
also http.robots.503.defer.visits.delay and
http.robots.503.defer.visits.retries</description>
</property>

<property>
<name>http.robots.503.defer.visits.delay</name>
<value>300000</value>
<description>Time in milliseconds to suspend crawling a host if the
robots.txt response is HTTP 5xx - see
http.robots.503.defer.visits.</description>
</property>

<property>
<name>http.robots.503.defer.visits.retries</name>
<value>3</value>
<description>Number of retries crawling a host if the robots.txt
response is HTTP 5xx - see http.robots.503.defer.visits. After n
retries the host queue is dropped for this segment/cycle.
</description>
</property>

<property>
<name>http.agent.description</name>
<value></value>
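Not part of the patch: the three defer-visit properties added above can also be set programmatically, for example in a test or when building a job configuration. A minimal sketch, assuming the usual Nutch/Hadoop Configuration API; the values are illustrative, and the same keys can equally be overridden in conf/nutch-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.util.NutchConfiguration;

public class DeferVisitsConfigSketch {
  public static Configuration create() {
    Configuration conf = NutchConfiguration.create();
    // suspend fetching from a host when its robots.txt returns a 5xx status
    conf.setBoolean("http.robots.503.defer.visits", true);
    // wait 10 minutes before the next attempt (default is 5 minutes)
    conf.setLong("http.robots.503.defer.visits.delay", 10 * 60 * 1000L);
    // drop the host queue for this cycle after 5 failed attempts (default 3)
    conf.setInt("http.robots.503.defer.visits.retries", 5);
    return conf;
  }
}
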
64 changes: 51 additions & 13 deletions src/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -195,11 +195,19 @@ public synchronized FetchItem getFetchItem() {
return null;
}

/**
* @return true if the fetcher timelimit is defined and has been exceeded
* ({@code fetcher.timelimit.mins} minutes after fetching started)
*/
public boolean timelimitExceeded() {
return timelimit != -1 && System.currentTimeMillis() >= timelimit;
}

// called only once the feeder has stopped
public synchronized int checkTimelimit() {
int count = 0;

- if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
if (timelimitExceeded()) {
// emptying the queues
count = emptyQueues();

@@ -209,6 +217,7 @@ public synchronized int checkTimelimit() {
if (totalSize.get() != 0 && queues.size() == 0)
totalSize.set(0);
}

return count;
}

@@ -220,11 +229,9 @@ public synchronized int emptyQueues() {
FetchItemQueue fiq = queues.get(id);
if (fiq.getQueueSize() == 0)
continue;
LOG.info("* queue: " + id + " >> dropping! ");
LOG.info("* queue: {} >> dropping!", id);
int deleted = fiq.emptyQueue();
- for (int i = 0; i < deleted; i++) {
-   totalSize.decrementAndGet();
- }
totalSize.addAndGet(-deleted);
count += deleted;
}

@@ -235,26 +242,43 @@
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
- * @param queueid a queue identifier to locate and check
* The next fetch is delayed if specified by the param {@code delay} or
* configured by the property {@code fetcher.exceptions.per.queue.delay}.
*
* @param queueid
* a queue identifier to locate and check
* @param maxExceptions
* custom-defined number of max. exceptions - if negative the value
* of the property {@code fetcher.max.exceptions.per.queue} is used.
* @param delay
* a custom-defined time span in milliseconds to delay the next fetch
* in addition to the delay defined for the given queue. If a
* negative value is passed the delay is chosen by
* {@code fetcher.exceptions.per.queue.delay}
*
* @return number of purged items
*/
- public synchronized int checkExceptionThreshold(String queueid) {
public synchronized int checkExceptionThreshold(String queueid,
int maxExceptions, long delay) {
FetchItemQueue fiq = queues.get(queueid);
if (fiq == null) {
return 0;
}
int excCount = fiq.incrementExceptionCounter();
if (delay > 0) {
fiq.nextFetchTime.getAndAdd(delay);
LOG.info("* queue: {} >> delayed next fetch by {} ms", queueid, delay);
}
if (fiq.getQueueSize() == 0) {
return 0;
}
- if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
if (maxExceptions != -1 && excCount >= maxExceptions) {
// too many exceptions for items in this queue - purge it
int deleted = fiq.emptyQueue();
LOG.info("* queue: " + queueid + " >> removed " + deleted
+ " URLs from queue because " + excCount + " exceptions occurred");
for (int i = 0; i < deleted; i++) {
totalSize.decrementAndGet();
}
LOG.info(
"* queue: {} >> removed {} URLs from queue because {} exceptions occurred",
queueid, deleted, excCount);
totalSize.getAndAdd(-deleted);
// keep queue IDs to ensure that these queues aren't created and filled
// again, see addFetchItem(FetchItem)
queuesMaxExceptions.add(queueid);
@@ -263,6 +287,20 @@ public synchronized int checkExceptionThreshold(String queueid) {
return 0;
}

/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
* @see #checkExceptionThreshold(String, int, long)
*
* @param queueid
* queue identifier to locate and check
* @return number of purged items
*/
public int checkExceptionThreshold(String queueid) {
[Review comment] Same here. Basic Javadoc?

return checkExceptionThreshold(queueid, this.maxExceptionsPerQueue, -1);
}

/**
* @param redirUrl
* redirect target
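Illustration only, not part of the diff: a sketch of how a caller could use the new three-argument checkExceptionThreshold, mirroring the call FetcherThread makes below. The helper name and the hard-coded retry and delay values are assumptions for the example.

import org.apache.nutch.fetcher.FetchItemQueues;

class RobotsDeferSketch {
  // Count a robots.txt failure against a host queue, postpone the queue's
  // next fetch, and drop the queue once the retry budget is exceeded.
  static void deferQueueOnRobotsError(FetchItemQueues fetchQueues, String queueId) {
    int retries = 3;               // cf. http.robots.503.defer.visits.retries
    long delayMs = 5 * 60 * 1000L; // cf. http.robots.503.defer.visits.delay
    // retries + 1 allows `retries` re-attempts before the queue is purged;
    // a positive delay pushes the queue's next fetch time into the future.
    int dropped = fetchQueues.checkExceptionThreshold(queueId, retries + 1, delayMs);
    if (dropped > 0) {
      // the host queue was emptied for this segment/cycle
    }
  }
}
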
39 changes: 38 additions & 1 deletion src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -139,6 +139,8 @@ public class FetcherThread extends Thread {
private AtomicLong bytes;

private List<Content> robotsTxtContent = null;
private long robotsDeferVisitsDelay;
private int robotsDeferVisitsRetries;

//Used by the REST service
private FetchNode fetchNode;
@@ -194,6 +196,14 @@ public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQ
URLNormalizers.SCOPE_OUTLINK);
}

// NUTCH-2573 defer visits if robots.txt fails with HTTP 5xx
if (conf.getBoolean("http.robots.503.defer.visits", true)) {
this.robotsDeferVisitsDelay = conf
.getLong("http.robots.503.defer.visits.delay", 5 * 60 * 1000L);
this.robotsDeferVisitsRetries = conf
.getInt("http.robots.503.defer.visits.retries", 3);
}

if((activatePublisher=conf.getBoolean("fetcher.publisher", false)))
this.publisher = new FetcherThreadPublisher(conf);

@@ -312,6 +322,25 @@ public void run() {
outputRobotsTxt(robotsTxtContent);
robotsTxtContent.clear();
}
if (rules.isDeferVisits()) {
LOG.info("Defer visits for queue {} : {}", fit.queueID, fit.url);
// retry the fetch item
if (fetchQueues.timelimitExceeded()) {
fetchQueues.finishFetchItem(fit, true);
} else {
fetchQueues.addFetchItem(fit);
}
// but check whether it's time to cancel the queue
int killedURLs = fetchQueues.checkExceptionThreshold(
fit.getQueueID(), this.robotsDeferVisitsRetries + 1,
this.robotsDeferVisitsDelay);
if (killedURLs != 0) {
context
.getCounter("FetcherStatus", "robots_defer_visits_dropped")
.increment(killedURLs);
}
continue;
}
if (!rules.isAllowed(fit.url.toString())) {
// unblock
fetchQueues.finishFetchItem(fit, true);
@@ -600,6 +629,12 @@ private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
LOG.debug(" - ignoring redirect from {} to {} as duplicate", fit.url,
redirUrl);
return null;
} else if (fetchQueues.timelimitExceeded()) {
redirecting = false;
context.getCounter("FetcherStatus", "hitByTimeLimit").increment(1);
LOG.debug(" - ignoring redirect from {} to {} - timelimit reached",
fit.url, redirUrl);
return null;
}
CrawlDatum newDatum = createRedirDatum(redirUrl, fit, CrawlDatum.STATUS_DB_UNFETCHED);
fit = FetchItem.create(redirUrl, newDatum, queueMode);
@@ -780,8 +815,10 @@ private ParseStatus output(Text key, CrawlDatum datum, Content content,
reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG, parseData.getContentMeta().get("content-language"));
publisher.publish(reportEvent, conf);
}

// Only process depth N outlinks
- if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth
&& !fetchQueues.timelimitExceeded()) {
FetchItem ft = FetchItem.create(url, null, queueMode);
FetchItemQueue queue = fetchQueues.getFetchItemQueue(ft.queueID);
queue.alreadyFetched.add(url.toString().hashCode());
13 changes: 13 additions & 0 deletions src/java/org/apache/nutch/protocol/RobotRulesParser.java
@@ -77,6 +77,19 @@ public abstract class RobotRulesParser implements Tool {
public static BaseRobotRules FORBID_ALL_RULES = new SimpleRobotRules(
RobotRulesMode.ALLOW_NONE);

/**
* A {@link BaseRobotRules} object appropriate for use when the
* {@code robots.txt} file failed to fetch with a 503 &quot;Service
* Unavailable&quot; (or other 5xx) status code. The crawler should suspend crawling
* for a certain (but not too long) time, see property
* <code>http.robots.503.defer.visits</code>.
*/
public static final BaseRobotRules DEFER_VISIT_RULES = new SimpleRobotRules(
RobotRulesMode.ALLOW_NONE);
static {
DEFER_VISIT_RULES.setDeferVisits(true);
}

private static SimpleRobotRulesParser robotParser = new SimpleRobotRulesParser();
static {
robotParser.setMaxCrawlDelay(Long.MAX_VALUE);
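To make the contrast with FORBID_ALL_RULES explicit, here is a small illustrative snippet, not part of the patch: both rule sets disallow every URL, but only DEFER_VISIT_RULES carries the defer-visits flag that the fetcher checks.

import crawlercommons.robots.BaseRobotRules;
import org.apache.nutch.protocol.RobotRulesParser;

class DeferVisitRulesIllustration {
  static void illustrate() {
    BaseRobotRules defer = RobotRulesParser.DEFER_VISIT_RULES;
    BaseRobotRules forbid = RobotRulesParser.FORBID_ALL_RULES;
    // Neither rule set allows any URL (both are ALLOW_NONE) ...
    assert !defer.isAllowed("http://example.com/");
    assert !forbid.isAllowed("http://example.com/");
    // ... but only the defer-visit rules tell the fetcher to suspend the
    // host queue and retry later instead of skipping its URLs outright.
    assert defer.isDeferVisits();
    assert !forbid.isDeferVisits();
  }
}
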
@@ -41,7 +41,9 @@ public class HttpRobotRulesParser extends RobotRulesParser {

private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());

protected boolean allowForbidden = false;
protected boolean deferVisits503 = false;

HttpRobotRulesParser() {
}
@@ -53,6 +55,7 @@ public HttpRobotRulesParser(Configuration conf) {
public void setConf(Configuration conf) {
super.setConf(conf);
allowForbidden = conf.getBoolean("http.robots.403.allow", true);
deferVisits503 = conf.getBoolean("http.robots.503.defer.visits", true);
}

/**
@@ -110,7 +113,7 @@ public BaseRobotRules getRobotRulesSet(Protocol http, URL url,
if (robotRules != null) {
return robotRules; // cached rule
} else if (LOG.isTraceEnabled()) {
LOG.trace("cache miss " + url);
LOG.trace("cache miss {}", url);
}

boolean cacheRule = true;
@@ -163,9 +166,15 @@ else if ((response.getCode() == 403) && (!allowForbidden))
robotRules = FORBID_ALL_RULES; // use forbid all
else if (response.getCode() >= 500) {
cacheRule = false; // try again later to fetch robots.txt
- robotRules = EMPTY_RULES;
- } else
if (deferVisits503) {
// signal fetcher to suspend crawling for this host
robotRules = DEFER_VISIT_RULES;
} else {
robotRules = EMPTY_RULES;
}
} else {
robotRules = EMPTY_RULES; // use default rules
}
} catch (Throwable t) {
if (LOG.isInfoEnabled()) {
LOG.info("Couldn't get robots.txt for " + url + ": " + t.toString());