Skip to content

Commit

Permalink
Added a Response progress event interface (#2164)
Browse files Browse the repository at this point in the history
Added a Response progress event interface

Fixes #656
  • Loading branch information
jhy authored Jul 5, 2024
1 parent 97e6211 commit 8c8f3f9
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 4 deletions.
11 changes: 11 additions & 0 deletions src/main/java/org/jsoup/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,17 @@ default Connection auth(@Nullable RequestAuthenticator authenticator) {
*/
Connection response(Response response);

/**
Set the response progress handler, which will be called as the response body is downloaded. As documents are parsed
as they are downloaded, this is also a good proxy for the parse progress.
<p>The Response is supplied as the progress context and may be read from to obtain headers etc.</p>
@param handler the progress handler
@return this Connection, for chaining
*/
default Connection onResponseProgress(Progress<Response> handler) {
throw new UnsupportedOperationException();
}

/**
* Common methods for Requests and Responses
* @param <T> Type of Base, either Request or Response
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/jsoup/Progress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.jsoup;

@FunctionalInterface
public interface Progress<ProgressContext> {
/**
Called to report progress. Note that this will be executed by the same thread that is doing the work, so either
don't take to long, or hand it off to another thread.
@param processed the number of bytes processed so far.
@param total the total number of expected bytes, or -1 if unknown.
@param percent the percentage of completion, 0.0..100.0. If the expected total is unknown, % will remain at zero
until complete.
@param context the object that progress was made on.
*/
void onProgress(int processed, int total, float percent, ProgressContext context);
}
13 changes: 12 additions & 1 deletion src/main/java/org/jsoup/helper/HttpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.jsoup.Connection;
import org.jsoup.HttpStatusException;
import org.jsoup.Progress;
import org.jsoup.UncheckedIOException;
import org.jsoup.UnsupportedMimeTypeException;
import org.jsoup.internal.ControllableInputStream;
Expand Down Expand Up @@ -43,7 +44,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.Inflater;
Expand Down Expand Up @@ -388,6 +388,11 @@ public Connection postDataCharset(String charset) {
return this;
}

@Override public Connection onResponseProgress(Progress<Connection.Response> handler) {
req.responseProgress = handler;
return this;
}

@SuppressWarnings("unchecked")
private static abstract class Base<T extends Connection.Base<T>> implements Connection.Base<T> {
private static final URL UnsetUrl; // only used if you created a new Request()
Expand Down Expand Up @@ -607,6 +612,8 @@ public static class Request extends HttpConnection.Base<Connection.Request> impl
private @Nullable SSLSocketFactory sslSocketFactory;
private CookieManager cookieManager;
private @Nullable RequestAuthenticator authenticator;
private @Nullable Progress<Connection.Response> responseProgress;

private volatile boolean executing = false;

Request() {
Expand Down Expand Up @@ -638,6 +645,7 @@ public static class Request extends HttpConnection.Base<Connection.Request> impl
sslSocketFactory = copy.sslSocketFactory; // these are all synchronized so safe to share
cookieManager = copy.cookieManager;
authenticator = copy.authenticator;
responseProgress = copy.responseProgress;
executing = false;
}

Expand Down Expand Up @@ -909,6 +917,9 @@ else if (res.hasHeaderWithValue(CONTENT_ENCODING, "deflate"))
res.bodyStream = ControllableInputStream.wrap(
stream, SharedConstants.DefaultBufferSize, req.maxBodySize())
.timeout(startTime, req.timeout());

if (req.responseProgress != null) // set response progress listener
res.bodyStream.onProgress(conn.getContentLength(), req.responseProgress, res);
} else {
res.byteData = DataUtil.emptyByteBuffer();
}
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/org/jsoup/internal/ControllableInputStream.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.jsoup.internal;

import org.jsoup.helper.DataUtil;
import org.jsoup.Progress;
import org.jsoup.helper.Validate;
import org.jspecify.annotations.Nullable;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -28,6 +29,12 @@ public class ControllableInputStream extends FilterInputStream {
private int markPos;
private boolean interrupted;

// if we are tracking progress, will have the expected content length, progress callback, connection
private @Nullable Progress<?> progress;
private @Nullable Object progressContext;
private int contentLength = -1;
private int readPos = 0; // amount read; can be reset()

private ControllableInputStream(BufferedInputStream in, int maxSize) {
super(in);
Validate.isTrue(maxSize >= 0);
Expand Down Expand Up @@ -57,6 +64,8 @@ else if (in instanceof BufferedInputStream)

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (readPos == 0) emitProgress(); // emits a progress

if (interrupted || capped && remaining <= 0)
return -1;
if (Thread.currentThread().isInterrupted()) {
Expand All @@ -72,7 +81,14 @@ public int read(byte[] b, int off, int len) throws IOException {

try {
final int read = super.read(b, off, len);
remaining -= read;
if (read == -1) { // completed
contentLength = readPos;
} else {
remaining -= read;
readPos += read;
}
emitProgress();

return read;
} catch (SocketTimeoutException e) {
if (expired())
Expand Down Expand Up @@ -114,6 +130,7 @@ public static ByteBuffer readToByteBuffer(InputStream in, int max) throws IOExce
@Override public void reset() throws IOException {
super.reset();
remaining = maxSize - markPos;
readPos = markPos; // readPos is used for progress emits
}

@SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") // not synchronized in later JDKs
Expand All @@ -128,6 +145,24 @@ public ControllableInputStream timeout(long startTimeNanos, long timeoutMillis)
return this;
}

private void emitProgress() {
if (progress == null) return;
// calculate percent complete if contentLength > 0 (and cap to 100.0 if totalRead > contentLength):
float percent = contentLength > 0 ? Math.min(100f, readPos * 100f / contentLength) : 0;
//noinspection unchecked
((Progress<Object>) progress).onProgress(readPos, contentLength, percent, progressContext); // (not actually unchecked - verified when set)
if (percent == 100.0f) progress = null; // detach once we reach 100%, so that any subsequent buffer hits don't report 100 again
}

public <ProgressContext> ControllableInputStream onProgress(int contentLength, Progress<ProgressContext> callback, ProgressContext context) {
Validate.notNull(callback);
Validate.notNull(context);
this.contentLength = contentLength;
this.progress = callback;
this.progressContext = context;
return this;
}

private boolean expired() {
if (timeout == 0)
return false;
Expand Down
61 changes: 60 additions & 1 deletion src/test/java/org/jsoup/integration/ConnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import javax.servlet.http.HttpServletResponse;
import java.io.File;
Expand All @@ -34,6 +35,7 @@
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

Expand All @@ -47,6 +49,8 @@
* Tests Jsoup.connect against a local server.
*/
public class ConnectTest {
private static final int LargeDocFileLen = 280735;
private static final int LargeDocTextLen = 269535;
private static String echoUrl;

@BeforeAll
Expand Down Expand Up @@ -746,7 +750,7 @@ public void maxBodySize() throws IOException {
Connection.Response largeRes = Jsoup.connect(url).maxBodySize(300 * 1024).execute(); // does not crop
Connection.Response unlimitedRes = Jsoup.connect(url).maxBodySize(0).execute();

int actualDocText = 269535;
int actualDocText = LargeDocTextLen;
assertEquals(actualDocText, defaultRes.parse().text().length());
assertEquals(49165, smallRes.parse().text().length());
assertEquals(196577, mediumRes.parse().text().length());
Expand Down Expand Up @@ -944,4 +948,59 @@ void incorrectAuth(String url) throws IOException {
}

// proxy connection tests are in ProxyTest

@ParameterizedTest
@ValueSource(strings = {
"/htmltests/large.html",
"/htmltests/large.html?" + FileServlet.SuppressContentLength
})
void progressListener(String path) throws IOException {
String url = FileServlet.urlTo(path);
boolean knownContentLength = !url.contains(FileServlet.SuppressContentLength);

AtomicBoolean seenProgress = new AtomicBoolean(false);
AtomicBoolean completed = new AtomicBoolean(false);
AtomicInteger numProgress = new AtomicInteger();

Connection con = Jsoup.connect(url).onResponseProgress((processed, total, percent, response) -> {
//System.out.println("Processed: " + processed + " of " + total + " (" + percent + "%)");
if (!seenProgress.get()) {
seenProgress.set(true);
assertEquals(0, processed);
assertEquals(knownContentLength ? LargeDocFileLen : -1, total);
assertEquals(0.0f, percent);

assertEquals(200, response.statusCode());
String contentLength = response.header("Content-Length");
if (knownContentLength) {
assertNotNull(contentLength);
assertEquals(String.valueOf(LargeDocFileLen), contentLength);
} else {
assertNull(contentLength);
}
assertEquals(url, response.url().toExternalForm());
}
numProgress.getAndIncrement();

if (percent == 100.0f) {
// even if the content-length is not set, we get 100% when the read is completed
completed.set(true);
assertEquals(LargeDocFileLen, processed);
}

});
Document document = con.get();

assertTrue(seenProgress.get());
assertTrue(completed.get());

// should expect to see events relative to how large the buffer is.
int expected = LargeDocFileLen / 8192;
assertTrue(numProgress.get() > expected * 0.75);
assertTrue(numProgress.get() < expected * 1.25);

// check the document works
assertEquals(LargeDocTextLen, document.text().length());
}
}

53 changes: 53 additions & 0 deletions src/test/java/org/jsoup/integration/SessionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -105,6 +107,57 @@ public void multiThreadWithoutNewRequestBlowsUp() throws InterruptedException {
assertEquals(numThreads - 1, catcher.exceptionCount.get());
}

@Test
public void multiThreadWithProgressListener() throws InterruptedException {
// tests that we can use one progress listener for multiple URLs and threads.
int numThreads = 10;
String[] urls = {
FileServlet.urlTo("/htmltests/medium.html"),
FileServlet.urlTo("/htmltests/upload-form.html"),
FileServlet.urlTo("/htmltests/comments.html"),
FileServlet.urlTo("/htmltests/large.html"),
};
Set<String> seenUrls = ConcurrentHashMap.newKeySet();
AtomicInteger completedCount = new AtomicInteger(0);
ThreadCatcher catcher = new ThreadCatcher();

Connection session = Jsoup.newSession()
.onResponseProgress((processed, total, percent, response) -> {
if (percent == 100.0f) {
//System.out.println("Completed " + Thread.currentThread().getName() + "- " + response.url());
seenUrls.add(response.url().toExternalForm());
completedCount.incrementAndGet();
}
});

Thread[] threads = new Thread[numThreads];
for (int threadNum = 0; threadNum < numThreads; threadNum++) {
Thread thread = new Thread(() -> {
for (String url : urls) {
try {
Connection con = session.newRequest().url(url);
con.get();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
});
thread.setName("Runner-" + threadNum);
thread.start();
thread.setUncaughtExceptionHandler(catcher);
threads[threadNum] = thread;
}

// now join them all
for (Thread thread : threads) {
thread.join();
}

assertEquals(0, catcher.exceptionCount.get());
assertEquals(urls.length, seenUrls.size());
assertEquals(urls.length * numThreads, completedCount.get());
}


static class ThreadCatcher implements Thread.UncaughtExceptionHandler {
AtomicInteger exceptionCount = new AtomicInteger();
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/org/jsoup/integration/servlets/FileServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class FileServlet extends BaseServlet {
}
public static final String ContentTypeParam = "contentType";
public static final String DefaultType = "text/html";
public static final String SuppressContentLength = "surpriseMe";

@Override
protected void doIt(HttpServletRequest req, HttpServletResponse res) throws IOException {
Expand All @@ -33,6 +34,8 @@ protected void doIt(HttpServletRequest req, HttpServletResponse res) throws IOEx
res.setContentType(contentType);
if (file.getName().endsWith("gz"))
res.addHeader("Content-Encoding", "gzip");
if (req.getParameter(SuppressContentLength) == null)
res.setContentLength((int) file.length());
res.setStatus(HttpServletResponse.SC_OK);

ServletOutputStream out = res.getOutputStream();
Expand Down

0 comments on commit 8c8f3f9

Please sign in to comment.