diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index d3b7ff10380..15432132607 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -339,13 +339,16 @@ private void endRequest(Stream s) { * @return whether the stream should be considered as closed */ private boolean reset(Stream stream) { - boolean isInflight; + boolean inflight; synchronized (this) { - isInflight = responses.remove(stream) || (requests.remove(stream) && stream.responseEnded); - close = isInflight; + inflight = responses.contains(stream) || stream.responseEnded; + if (!inflight) { + requests.remove(stream); + } + close = inflight; } checkLifecycle(); - return !isInflight; + return !inflight; } private void receiveBytes(int len) { @@ -433,18 +436,20 @@ private static class StreamImpl extends Stream implements HttpClientStream { this.conn = conn; this.queue = new InboundBuffer<>(context, 5) .handler(item -> { - if (item instanceof MultiMap) { - Handler handler = endHandler; - if (handler != null) { - handler.handle((MultiMap) item); - } - } else { - Buffer buffer = (Buffer) item; - int len = buffer.length(); - conn.ackBytes(len); - Handler handler = chunkHandler; - if (handler != null) { - handler.handle(buffer); + if (!reset) { + if (item instanceof MultiMap) { + Handler handler = endHandler; + if (handler != null) { + handler.handle((MultiMap) item); + } + } else { + Buffer buffer = (Buffer) item; + int len = buffer.length(); + conn.ackBytes(len); + Handler handler = chunkHandler; + if (handler != null) { + handler.handle(buffer); + } } } }) diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index b350bb16624..962353096b9 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -5833,6 +5833,34 @@ public void testResetClientRequestAwaitingResponse() throws Exception { await(); } + @Test + public void testResetClientRequestResponseInProgress() throws Exception { + server.requestHandler(req -> { + HttpServerResponse resp = req.response(); + resp.setChunked(true); + for (int i = 0;i < 16;i++) { + resp.write("chunk-" + i); + } + resp.end(); + }); + startServer(testAddress); + client.request(requestOptions).onComplete(onSuccess(req -> { + req.send(onSuccess(resp -> { + resp.handler(buff -> { + fail(); + }); + resp.endHandler(v -> { + fail(); + }); + req.connection().close(onSuccess(v -> { + testComplete(); + })); + req.reset(); + })); + })); + await(); + } + @Test public void testSimpleCookie() throws Exception { testCookies("foo=bar", req -> {