Commit 466c10b

Merge pull request #416 from internetarchive/extractor-chrome-replay-responses
ExtractorChrome: reduce request duplication between browser and frontier
2 parents: ab19efa + 014fb2d

5 files changed: +243 -30 lines
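
Before the per-file diffs, a condensed, hypothetical sketch (not the committed code) of the flow this change implements: the crawler has already fetched the top-level page, so when the browser asks for that same URL the intercepted request is fulfilled from the recorded response, while subresource requests still go to the live web. Here devtoolsUrl, pageUrl, recordedStatus, recordedHeaders and recordedBody are assumed inputs; in ExtractorChrome they come from the crawl controller and the CrawlURI's Recorder.

import org.archive.net.chrome.ChromeClient;
import org.archive.net.chrome.ChromeRequest;
import org.archive.net.chrome.ChromeWindow;

import java.util.Map;

public class ReplaySketch {
    public static void visitWithReplay(String devtoolsUrl, String pageUrl, int recordedStatus,
                                       Map<String, String> recordedHeaders, byte[] recordedBody) {
        ChromeClient client = new ChromeClient(devtoolsUrl);
        try (ChromeWindow window = client.createWindow(1366, 768)) {
            window.interceptRequests(intercepted -> {
                ChromeRequest request = intercepted.getRequest();
                if (request.getMethod().equals("GET") && request.getUrl().equals(pageUrl)) {
                    // Serve the top-level document from the recorded response.
                    intercepted.fulfill(recordedStatus, recordedHeaders.entrySet(), recordedBody);
                } else {
                    // Everything else (images, scripts, XHR) is still fetched live by the browser.
                    intercepted.continueNormally();
                }
            });
            // ... navigate to pageUrl and wait for the load event, as ExtractorChrome.visit() does ...
        }
    }
}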

contrib/src/main/java/org/archive/modules/extractor/ExtractorChrome.java (+85 -8)

@@ -19,14 +19,14 @@
 
 package org.archive.modules.extractor;
 
+import org.apache.commons.io.IOUtils;
 import org.archive.crawler.event.CrawlURIDispositionEvent;
 import org.archive.crawler.framework.CrawlController;
 import org.archive.crawler.framework.Frontier;
 import org.archive.modules.CrawlURI;
-import org.archive.net.chrome.ChromeClient;
-import org.archive.net.chrome.ChromeProcess;
-import org.archive.net.chrome.ChromeRequest;
-import org.archive.net.chrome.ChromeWindow;
+import org.archive.modules.Processor;
+import org.archive.modules.ProcessorChain;
+import org.archive.net.chrome.*;
 import org.archive.spring.KeyedProperties;
 import org.archive.util.Recorder;
 import org.json.JSONArray;
@@ -36,7 +36,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -46,11 +49,11 @@
 
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.util.Collections.enumeration;
-import static java.util.logging.Level.INFO;
-import static java.util.logging.Level.WARNING;
+import static java.util.logging.Level.*;
 import static java.util.regex.Pattern.CASE_INSENSITIVE;
 import static org.archive.crawler.event.CrawlURIDispositionEvent.Disposition.FAILED;
 import static org.archive.crawler.event.CrawlURIDispositionEvent.Disposition.SUCCEEDED;
+import static org.archive.modules.CoreAttributeConstants.A_HTTP_RESPONSE_HEADERS;
 import static org.archive.modules.CrawlURI.FetchType.*;
 
 /**
@@ -115,12 +118,19 @@ public class ExtractorChrome extends ContentExtractor {
      */
     private boolean captureRequests = true;
 
+    /**
+     * The maximum size response body that can be replayed to the browser. Setting this to -1 will cause all requests by
+     * the browser to be made against the live web.
+     */
+    private int maxReplayLength = 100 * 1024 * 1024;
+
     private Semaphore openWindowsSemaphore = null;
     private ChromeProcess process = null;
     private ChromeClient client = null;
 
     private final CrawlController controller;
     private final ApplicationEventPublisher eventPublisher;
+    private ProcessorChain extractorChain;
 
     public ExtractorChrome(CrawlController controller, ApplicationEventPublisher eventPublisher) {
         this.controller = controller;
@@ -149,6 +159,8 @@ protected boolean innerExtract(CrawlURI uri) {
 
     private void visit(CrawlURI curi) throws InterruptedException {
         try (ChromeWindow window = client.createWindow(windowWidth, windowHeight)) {
+            window.interceptRequests(request -> handleInterceptedRequest(curi, request));
+
             if (captureRequests) {
                 window.captureRequests(request -> handleCapturedRequest(curi, request));
             }
170182
}
171183
}
172184

185+
private void handleInterceptedRequest(CrawlURI curi, InterceptedRequest interceptedRequest) {
186+
ChromeRequest request = interceptedRequest.getRequest();
187+
if (request.getMethod().equals("GET") && request.getUrl().equals(curi.getURI())) {
188+
replayResponseToBrowser(curi, interceptedRequest);
189+
} else {
190+
interceptedRequest.continueNormally();
191+
}
192+
}
193+
194+
@SuppressWarnings("unchecked")
195+
private void replayResponseToBrowser(CrawlURI curi, InterceptedRequest interceptedRequest) {
196+
// There seems to be no easy way to stream the body to the browser so we slurp it into
197+
// memory with a size limit. The one way I can see to achieve streaming is to have Heritrix
198+
// serve the request over its HTTP server and pass a Heritrix URL to Fetch.fulfillRequest
199+
// instead of the body directly. We might need to do that if memory pressure becomes a
200+
// problem but for now just keep it simple.
201+
202+
long bodyLength = curi.getRecorder().getResponseContentLength();
203+
if (bodyLength > maxReplayLength) {
204+
logger.log(FINE, "Page body too large to replay: {0}", curi.getURI());
205+
interceptedRequest.continueNormally();
206+
return;
207+
}
208+
209+
byte[] body = new byte[(int)bodyLength];
210+
try (InputStream stream = curi.getRecorder().getContentReplayInputStream()) {
211+
IOUtils.readFully(stream, body);
212+
} catch (IOException e) {
213+
logger.log(WARNING, "Error reading back page body: " + curi.getURI(), e);
214+
interceptedRequest.continueNormally();
215+
return;
216+
}
217+
218+
Map<String,String> headers = (Map<String, String>) curi.getData().get(A_HTTP_RESPONSE_HEADERS);
219+
interceptedRequest.fulfill(curi.getFetchStatus(), headers.entrySet(), body);
220+
}
221+
173222
private void handleCapturedRequest(CrawlURI via, ChromeRequest request) {
223+
if (request.isResponseFulfilledByInterception()) {
224+
return;
225+
}
226+
174227
Recorder recorder = new Recorder(controller.getScratchDir().getFile(),
175228
controller.getRecorderOutBufferBytes(),
176229
controller.getRecorderInBufferBytes());
@@ -219,11 +272,21 @@ public int read() {
                 break;
         }
 
-        // send it to the disposition chain to invoke the warc writer etc
-        Frontier frontier = controller.getFrontier(); // allowed to be null to simplify unit tests
+        Frontier frontier = controller.getFrontier();
         curi.getOverlayNames(); // for side-effect of creating the overlayNames list
+
+        // inform the frontier we've already seen this uri so it won't schedule it
+        // we only do this for GETs so a POST doesn't prevent scheduling a GET of the same URI
+        if (request.getMethod().equals("GET")) {
+            frontier.considerIncluded(curi);
+        }
+
         KeyedProperties.loadOverridesFrom(curi);
         try {
+            // perform link extraction
+            extractorChain.process(curi, null);
+
+            // send the result to the disposition chain to dispatch outlinks and write warcs
             frontier.beginDisposition(curi);
             controller.getDispositionChain().process(curi,null);
         } finally {
@@ -261,6 +324,20 @@ public void start() {
             }
             client = new ChromeClient(process.getDevtoolsUrl());
         }
+
+        if (extractorChain == null) {
+            // The fetch chain normally includes some preprocessing, fetch and extractor processors, but we want just
+            // the extractors as we let the browser fetch subresources. So we construct a new chain consisting of the
+            // extractors only.
+            List<Processor> extractors = new ArrayList<>();
+            for (Processor processor : controller.getFetchChain().getProcessors()) {
+                if (processor instanceof Extractor) {
+                    extractors.add(processor);
+                }
+            }
+            extractorChain = new ProcessorChain();
+            extractorChain.setProcessors(extractors);
+        }
     }
 
     @Override

contrib/src/main/java/org/archive/net/chrome/ChromeRequest.java (+9)

@@ -33,6 +33,7 @@ public class ChromeRequest {
     private JSONObject rawResponseHeaders;
     private String responseHeadersText;
     private final long beginTime = System.currentTimeMillis();
+    private boolean responseFulfilledByInterception;
 
     public ChromeRequest(ChromeWindow window, String id) {
         this.window = window;
@@ -151,4 +152,12 @@ public String getRemoteIPAddress() {
     void setRequestJson(JSONObject requestJson) {
         this.requestJson = requestJson;
     }
+
+    void setResponseFulfilledByInterception(boolean responseFulfilledByInterception) {
+        this.responseFulfilledByInterception = responseFulfilledByInterception;
+    }
+
+    public boolean isResponseFulfilledByInterception() {
+        return responseFulfilledByInterception;
+    }
 }

contrib/src/main/java/org/archive/net/chrome/ChromeWindow.java (+21 -12)

@@ -27,8 +27,7 @@
 import java.util.function.Consumer;
 import java.util.logging.Logger;
 
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.WARNING;
+import static java.util.logging.Level.*;
 
 /**
  * A browser window or tab.
@@ -43,6 +42,7 @@ public class ChromeWindow implements Closeable {
     private CompletableFuture<Void> loadEventFuture;
     private final Map<String,ChromeRequest> requestMap = new ConcurrentHashMap<>();
     private Consumer<ChromeRequest> requestConsumer;
+    private Consumer<InterceptedRequest> requestInterceptor;
    private final ExecutorService eventExecutor;
 
     public ChromeWindow(ChromeClient client, String targetId) {
@@ -136,6 +136,22 @@ private void handleEventOnEventThread(JSONObject message) {
         }
     }
 
+    private void handlePausedRequest(JSONObject params) {
+        String networkId = params.getString("networkId");
+        ChromeRequest request = requestMap.computeIfAbsent(networkId, id -> new ChromeRequest(this, id));
+        request.setRequestJson(params.getJSONObject("request"));
+        String id = params.getString("requestId");
+        InterceptedRequest interceptedRequest = new InterceptedRequest(this, id, request);
+        try {
+            requestInterceptor.accept(interceptedRequest);
+        } catch (Exception e) {
+            logger.log(SEVERE, "Request interceptor threw", e);
+        }
+        if (!interceptedRequest.isHandled()) {
+            interceptedRequest.continueNormally();
+        }
+    }
+
     private void handleRequestWillBeSent(JSONObject params) {
         String requestId = params.getString("requestId");
         ChromeRequest request = requestMap.computeIfAbsent(requestId, id -> new ChromeRequest(this, id));
@@ -198,15 +214,8 @@ public void captureRequests(Consumer<ChromeRequest> requestConsumer) {
         call("Network.enable");
     }
 
-    private void handlePausedRequest(JSONObject params) {
-        if (params.has("responseStatusCode")) {
-            String stream = call("Fetch.takeResponseBodyAsStream", "requestId",
-                    params.getString("requestId")).getString("stream");
-            System.out.println(call("IO.read", "handle", stream));
-            System.out.println("stream " + stream);
-            call("IO.close", "handle", stream);
-        } else {
-            call("Fetch.continueRequest", "requestId", params.getString("requestId"));
-        }
+    public void interceptRequests(Consumer<InterceptedRequest> requestInterceptor) {
+        this.requestInterceptor = requestInterceptor;
+        call("Fetch.enable");
     }
 }
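
For illustration, a hypothetical usage sketch of the interceptRequests hook added above (not part of the diff): the interceptor may leave a request unhandled, in which case handlePausedRequest notices isHandled() is false after the consumer returns and continues the request itself. The ChromeWindow passed in is assumed to be already open.

import org.archive.net.chrome.ChromeWindow;
import org.archive.net.chrome.InterceptedRequest;

class InterceptionSketch {
    // Observe every paused request; leaving them all unhandled is safe because
    // ChromeWindow falls back to continueNormally() for unhandled requests.
    static void logRequests(ChromeWindow window) {
        window.interceptRequests((InterceptedRequest intercepted) -> {
            System.out.println(intercepted.getRequest().getMethod() + " "
                    + intercepted.getRequest().getUrl());
            // no fulfill()/continueNormally() here: the window continues the request itself
        });
    }
}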
contrib/src/main/java/org/archive/net/chrome/InterceptedRequest.java (new file, +78)

@@ -0,0 +1,78 @@
+/*
+ * This file is part of the Heritrix web crawler (crawler.archive.org).
+ *
+ * Licensed to the Internet Archive (IA) by one or more individual
+ * contributors.
+ *
+ * The IA licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.archive.net.chrome;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Map;
+
+public class InterceptedRequest {
+    private final String id;
+    private final ChromeRequest request;
+    private final ChromeWindow window;
+    private boolean handled;
+
+    public InterceptedRequest(ChromeWindow window, String id, ChromeRequest request) {
+        this.window = window;
+        this.id = id;
+        this.request = request;
+    }
+
+    public ChromeRequest getRequest() {
+        return request;
+    }
+
+    public void fulfill(int status, Collection<Map.Entry<String,String>> headers, byte[] body) {
+        setHandled();
+        JSONArray headerArray = new JSONArray();
+        for (Map.Entry<String,String> entry : headers) {
+            JSONObject object = new JSONObject();
+            object.put("name", entry.getKey());
+            object.put("value", entry.getValue());
+            headerArray.put(object);
+        }
+        String encodedBody = Base64.getEncoder().encodeToString(body);
+        request.setResponseFulfilledByInterception(true);
+        window.call("Fetch.fulfillRequest",
+                "requestId", id,
+                "responseCode", status,
+                "responseHeaders", headerArray,
+                "body", encodedBody);
+    }
+
+    public void continueNormally() {
+        setHandled();
+        window.call("Fetch.continueRequest", "requestId", id);
+    }
+
+    public boolean isHandled() {
+        return handled;
+    }
+
+    private void setHandled() {
+        if (handled) {
+            throw new IllegalStateException("intercepted request already handled");
+        }
+        handled = true;
+    }
+}
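
A small hypothetical example (not part of the commit) of the one-shot contract enforced by setHandled(): once an InterceptedRequest has been fulfilled or continued, a second attempt to resolve it throws IllegalStateException before any further DevTools call is made.

import org.archive.net.chrome.InterceptedRequest;

class OneShotSketch {
    static void resolveTwice(InterceptedRequest intercepted) {
        intercepted.continueNormally();      // first resolution marks the request handled
        try {
            intercepted.continueNormally();  // second resolution is rejected by setHandled()
        } catch (IllegalStateException expected) {
            // "intercepted request already handled"
        }
    }
}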
