Skip to content

Commit

Permalink
Fixes #4904 - WebsocketClient creates more connections than needed.
Browse files Browse the repository at this point in the history
Improved comments.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed May 29, 2020
1 parent 6e2429f commit b0ffaa6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// Association may fail for example if the application
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
Expand All @@ -242,6 +244,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ private void send(boolean create)

private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection;
Expand Down Expand Up @@ -363,33 +367,36 @@ public ProcessResult process(Connection connection)
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}

SendFailure result = send(connection, exchange);
if (result == null)
SendFailure failure = send(connection, exchange);
if (failure == null)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}

if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(result.failure);
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}
Expand Down

0 comments on commit b0ffaa6

Please sign in to comment.