Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExtractorChrome: reduce request duplication between browser and frontier #416

Merged
merged 3 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.archive.modules.extractor;

import org.apache.commons.io.IOUtils;
import org.archive.crawler.event.CrawlURIDispositionEvent;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
import org.archive.modules.CrawlURI;
import org.archive.net.chrome.ChromeClient;
import org.archive.net.chrome.ChromeProcess;
import org.archive.net.chrome.ChromeRequest;
import org.archive.net.chrome.ChromeWindow;
import org.archive.modules.Processor;
import org.archive.modules.ProcessorChain;
import org.archive.net.chrome.*;
import org.archive.spring.KeyedProperties;
import org.archive.util.Recorder;
import org.json.JSONArray;
Expand All @@ -36,7 +36,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,11 +49,11 @@

import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Collections.enumeration;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Level.*;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static org.archive.crawler.event.CrawlURIDispositionEvent.Disposition.FAILED;
import static org.archive.crawler.event.CrawlURIDispositionEvent.Disposition.SUCCEEDED;
import static org.archive.modules.CoreAttributeConstants.A_HTTP_RESPONSE_HEADERS;
import static org.archive.modules.CrawlURI.FetchType.*;

/**
Expand Down Expand Up @@ -115,12 +118,19 @@ public class ExtractorChrome extends ContentExtractor {
*/
private boolean captureRequests = true;

/**
* The maximum size response body that can be replayed to the browser. Setting this to -1 will cause all requests by
* the browser to be made against the live web.
*/
private int maxReplayLength = 100 * 1024 * 1024;

private Semaphore openWindowsSemaphore = null;
private ChromeProcess process = null;
private ChromeClient client = null;

private final CrawlController controller;
private final ApplicationEventPublisher eventPublisher;
private ProcessorChain extractorChain;

public ExtractorChrome(CrawlController controller, ApplicationEventPublisher eventPublisher) {
this.controller = controller;
Expand Down Expand Up @@ -149,6 +159,8 @@ protected boolean innerExtract(CrawlURI uri) {

private void visit(CrawlURI curi) throws InterruptedException {
try (ChromeWindow window = client.createWindow(windowWidth, windowHeight)) {
window.interceptRequests(request -> handleInterceptedRequest(curi, request));

if (captureRequests) {
window.captureRequests(request -> handleCapturedRequest(curi, request));
}
Expand All @@ -170,7 +182,48 @@ private void visit(CrawlURI curi) throws InterruptedException {
}
}

/**
 * Decides what to do with a request the browser paused for interception:
 * the top-level GET for the page we are visiting is answered from our own
 * recorded response, everything else (subresources, non-GET methods) is
 * allowed to proceed to the live web unmodified.
 */
private void handleInterceptedRequest(CrawlURI curi, InterceptedRequest interceptedRequest) {
    ChromeRequest request = interceptedRequest.getRequest();
    boolean isTopLevelGet = request.getMethod().equals("GET")
            && request.getUrl().equals(curi.getURI());
    if (!isTopLevelGet) {
        interceptedRequest.continueNormally();
        return;
    }
    replayResponseToBrowser(curi, interceptedRequest);
}

/**
 * Answers an intercepted browser request with the response body and headers Heritrix
 * already recorded for this CrawlURI, so the browser does not refetch the page from
 * the live web. Falls back to {@link InterceptedRequest#continueNormally()} whenever
 * the recorded response cannot be replayed (too large, unreadable, or headers missing).
 */
@SuppressWarnings("unchecked")
private void replayResponseToBrowser(CrawlURI curi, InterceptedRequest interceptedRequest) {
    // There seems to be no easy way to stream the body to the browser so we slurp it into
    // memory with a size limit. The one way I can see to achieve streaming is to have Heritrix
    // serve the request over its HTTP server and pass a Heritrix URL to Fetch.fulfillRequest
    // instead of the body directly. We might need to do that if memory pressure becomes a
    // problem but for now just keep it simple.

    long bodyLength = curi.getRecorder().getResponseContentLength();
    // Guard against a negative (unknown) length too: allocating new byte[(int)bodyLength]
    // with a negative value would throw NegativeArraySizeException.
    if (bodyLength < 0 || bodyLength > maxReplayLength) {
        logger.log(FINE, "Page body too large to replay: {0}", curi.getURI());
        interceptedRequest.continueNormally();
        return;
    }

    byte[] body = new byte[(int) bodyLength];
    try (InputStream stream = curi.getRecorder().getContentReplayInputStream()) {
        IOUtils.readFully(stream, body);
    } catch (IOException e) {
        logger.log(WARNING, "Error reading back page body: " + curi.getURI(), e);
        interceptedRequest.continueNormally();
        return;
    }

    Map<String, String> headers = (Map<String, String>) curi.getData().get(A_HTTP_RESPONSE_HEADERS);
    if (headers == null) {
        // No recorded response headers for this URI; without them we can't construct a
        // faithful replay, so let the browser fetch from the live web instead of NPEing.
        logger.log(FINE, "No recorded response headers to replay: {0}", curi.getURI());
        interceptedRequest.continueNormally();
        return;
    }
    interceptedRequest.fulfill(curi.getFetchStatus(), headers.entrySet(), body);
}

private void handleCapturedRequest(CrawlURI via, ChromeRequest request) {
if (request.isResponseFulfilledByInterception()) {
return;
}

Recorder recorder = new Recorder(controller.getScratchDir().getFile(),
controller.getRecorderOutBufferBytes(),
controller.getRecorderInBufferBytes());
Expand Down Expand Up @@ -219,11 +272,21 @@ public int read() {
break;
}

// send it to the disposition chain to invoke the warc writer etc
Frontier frontier = controller.getFrontier(); // allowed to be null to simplify unit tests
Frontier frontier = controller.getFrontier();
curi.getOverlayNames(); // for side-effect of creating the overlayNames list

// inform the frontier we've already seen this uri so it won't schedule it
// we only do this for GETs so a POST doesn't prevent scheduling a GET of the same URI
if (request.getMethod().equals("GET")) {
frontier.considerIncluded(curi);
}

KeyedProperties.loadOverridesFrom(curi);
try {
// perform link extraction
extractorChain.process(curi, null);

// send the result to the disposition chain to dispatch outlinks and write warcs
frontier.beginDisposition(curi);
controller.getDispositionChain().process(curi,null);
} finally {
Expand Down Expand Up @@ -261,6 +324,20 @@ public void start() {
}
client = new ChromeClient(process.getDevtoolsUrl());
}

if (extractorChain == null) {
// The fetch chain normally includes some preprocessing, fetch and extractor processors, but we want just
// the extractors as we let the browser fetch subresources. So we construct a new chain consisting of the
// extractors only.
List<Processor> extractors = new ArrayList<>();
for (Processor processor : controller.getFetchChain().getProcessors()) {
if (processor instanceof Extractor) {
extractors.add(processor);
}
}
extractorChain = new ProcessorChain();
extractorChain.setProcessors(extractors);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ChromeRequest {
private JSONObject rawResponseHeaders;
private String responseHeadersText;
private final long beginTime = System.currentTimeMillis();
private boolean responseFulfilledByInterception;

public ChromeRequest(ChromeWindow window, String id) {
this.window = window;
Expand Down Expand Up @@ -151,4 +152,12 @@ public String getRemoteIPAddress() {
/** Stores the raw DevTools "request" JSON object describing this request. Package-private: set by the window's event handlers. */
void setRequestJson(JSONObject requestJson) {
    this.requestJson = requestJson;
}

/**
 * Marks this request's response as having been synthesized via Fetch.fulfillRequest
 * (see InterceptedRequest.fulfill) rather than fetched from the network.
 */
void setResponseFulfilledByInterception(boolean responseFulfilledByInterception) {
    this.responseFulfilledByInterception = responseFulfilledByInterception;
}

/**
 * True if the response was replayed to the browser by interception; consumers use this
 * to avoid re-recording a response Heritrix already holds.
 */
public boolean isResponseFulfilledByInterception() {
    return responseFulfilledByInterception;
}
}
33 changes: 21 additions & 12 deletions contrib/src/main/java/org/archive/net/chrome/ChromeWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import java.util.function.Consumer;
import java.util.logging.Logger;

import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Level.*;

/**
* A browser window or tab.
Expand All @@ -43,6 +42,7 @@ public class ChromeWindow implements Closeable {
private CompletableFuture<Void> loadEventFuture;
private final Map<String,ChromeRequest> requestMap = new ConcurrentHashMap<>();
private Consumer<ChromeRequest> requestConsumer;
private Consumer<InterceptedRequest> requestInterceptor;
private final ExecutorService eventExecutor;

public ChromeWindow(ChromeClient client, String targetId) {
Expand Down Expand Up @@ -136,6 +136,22 @@ private void handleEventOnEventThread(JSONObject message) {
}
}

/**
 * Handles a DevTools Fetch.requestPaused event: looks up (or creates) the ChromeRequest
 * for the paused request, hands it to the registered interceptor, and guarantees the
 * request is resumed even if the interceptor does nothing or throws — otherwise the
 * browser would hang forever waiting on the paused request.
 */
private void handlePausedRequest(JSONObject params) {
    // networkId is optional in the protocol (absent e.g. for requests with no Network
    // counterpart); fall back to the Fetch requestId so getString can't throw.
    String requestId = params.getString("requestId");
    String networkId = params.optString("networkId", requestId);
    ChromeRequest request = requestMap.computeIfAbsent(networkId, id -> new ChromeRequest(this, id));
    request.setRequestJson(params.getJSONObject("request"));
    InterceptedRequest interceptedRequest = new InterceptedRequest(this, requestId, request);
    try {
        requestInterceptor.accept(interceptedRequest);
    } catch (Exception e) {
        logger.log(SEVERE, "Request interceptor threw", e);
    }
    if (!interceptedRequest.isHandled()) {
        interceptedRequest.continueNormally();
    }
}

private void handleRequestWillBeSent(JSONObject params) {
String requestId = params.getString("requestId");
ChromeRequest request = requestMap.computeIfAbsent(requestId, id -> new ChromeRequest(this, id));
Expand Down Expand Up @@ -198,15 +214,8 @@ public void captureRequests(Consumer<ChromeRequest> requestConsumer) {
call("Network.enable");
}

private void handlePausedRequest(JSONObject params) {
if (params.has("responseStatusCode")) {
String stream = call("Fetch.takeResponseBodyAsStream", "requestId",
params.getString("requestId")).getString("stream");
System.out.println(call("IO.read", "handle", stream));
System.out.println("stream " + stream);
call("IO.close", "handle", stream);
} else {
call("Fetch.continueRequest", "requestId", params.getString("requestId"));
}
/**
 * Registers a handler to be invoked for every request the browser pauses, then enables
 * the DevTools Fetch domain so requests actually start being paused. The handler must
 * resume each request via fulfill() or continueNormally() (the window does the latter
 * automatically if the handler leaves a request unhandled).
 */
public void interceptRequests(Consumer<InterceptedRequest> requestInterceptor) {
    this.requestInterceptor = requestInterceptor;
    call("Fetch.enable");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.archive.net.chrome;

import org.json.JSONArray;
import org.json.JSONObject;

import java.util.Base64;
import java.util.Collection;
import java.util.Map;

/**
 * A browser request paused by the DevTools Fetch domain. Exactly one of
 * {@link #fulfill} or {@link #continueNormally} must be called to resume it;
 * calling either a second time is an error.
 */
public class InterceptedRequest {
    private final String id;
    private final ChromeRequest request;
    private final ChromeWindow window;
    private boolean handled;

    public InterceptedRequest(ChromeWindow window, String id, ChromeRequest request) {
        this.window = window;
        this.id = id;
        this.request = request;
    }

    /** The underlying request that was paused. */
    public ChromeRequest getRequest() {
        return request;
    }

    /**
     * Resumes the paused request by answering it with a synthesized response
     * instead of letting the browser fetch it from the network.
     *
     * @param status  HTTP status code for the synthesized response
     * @param headers response header name/value pairs
     * @param body    response body bytes (sent base64-encoded over the protocol)
     */
    public void fulfill(int status, Collection<Map.Entry<String,String>> headers, byte[] body) {
        setHandled();
        JSONArray headerArray = new JSONArray();
        for (Map.Entry<String,String> header : headers) {
            headerArray.put(new JSONObject()
                    .put("name", header.getKey())
                    .put("value", header.getValue()));
        }
        String encodedBody = Base64.getEncoder().encodeToString(body);
        request.setResponseFulfilledByInterception(true);
        window.call("Fetch.fulfillRequest",
                "requestId", id,
                "responseCode", status,
                "responseHeaders", headerArray,
                "body", encodedBody);
    }

    /** Resumes the paused request, letting the browser fetch it from the live web. */
    public void continueNormally() {
        setHandled();
        window.call("Fetch.continueRequest", "requestId", id);
    }

    /** Whether fulfill() or continueNormally() has already been invoked. */
    public boolean isHandled() {
        return handled;
    }

    // Flips the handled flag, refusing a second resolution of the same request.
    private void setHandled() {
        if (handled) {
            throw new IllegalStateException("intercepted request already handled");
        }
        handled = true;
    }
}
Loading