Skip to content

Commit

Permalink
Clean up buffer when MaterializedInputStream is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Feb 26, 2025
1 parent 49f99fe commit c9aa74a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,21 @@ public static <T> JsonResponse<T> execute(TrinoJsonCodec<T> codec, Call.Factory
try (Response response = client.newCall(request).execute()) {
ResponseBody responseBody = requireNonNull(response.body());
if (isJson(responseBody.contentType())) {
T value = null;
IllegalArgumentException exception = null;
MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 32 * 1024);
try (InputStream ignored = stream) {
// Parse from input stream, response is either of unknown size or too large to materialize. Raw response body
// will not be available if parsing fails
value = codec.fromJson(stream);
T value = codec.fromJson(stream);
return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, null);
}
catch (JsonProcessingException e) {
exception = new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), stream.getHeadString()), e);
return new JsonResponse<>(
response.code(),
response.headers(),
stream.getHeadString(),
null,
new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), stream.getHeadString()), e));
}
return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, exception);
}
return new JsonResponse<>(response.code(), response.headers(), responseBody.string());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
class MaterializingInputStream
extends FilterInputStream
{
private final byte[] head;
private byte[] head;
private int remaining;
private int currentOffset;

Expand Down Expand Up @@ -77,6 +77,9 @@ public int read(byte[] buffer)

public String getHeadString()
{
if (head == null) {
return "<empty>";
}
return new String(head, 0, currentOffset) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : "");

Check failure on line 83 in client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Implicit use of the platform default charset, which can result in differing behaviour between JVM executions or incorrect behavior if the encoding of the data source doesn't match expectations.

Check failure on line 83 in client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Implicit use of the platform default charset, which can result in differing behaviour between JVM executions or incorrect behavior if the encoding of the data source doesn't match expectations.
}

Expand All @@ -87,4 +90,12 @@ private String bytesOmitted(long bytes)
}
return format("%d more bytes", bytes);
}

@Override
public void close()
throws IOException
{
super.close();
head = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class TrinoJsonCodec<T>
.disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS)
.disable(MapperFeature.INFER_PROPERTY_MUTATORS)
.disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS)
.disable(StreamReadFeature.AUTO_CLOSE_SOURCE)
.addModule(new Jdk8Module())
.addModule(new QueryDataClientJacksonModule())
.build();
Expand Down

0 comments on commit c9aa74a

Please sign in to comment.