Skip to content

Commit

Permalink
Improve error handling in on_accept (#750)
Browse files Browse the repository at this point in the history
* Improve error handling in on_accept

* Move lock to the top of the function

* Lock shared data at the right locations.

* [http_listener] improve refcount and lifetime management by using RAII.
  • Loading branch information
sridmad authored and ras0219-msft committed Aug 3, 2018
1 parent c214c81 commit c8c9227
Showing 1 changed file with 75 additions and 23 deletions.
98 changes: 75 additions & 23 deletions Release/src/http/listener/http_server_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ namespace
}

private:
void on_accept(boost::asio::ip::tcp::socket* socket, const boost::system::error_code& ec);
void on_accept(std::unique_ptr<boost::asio::ip::tcp::socket> socket, const boost::system::error_code& ec);

};

Expand Down Expand Up @@ -333,20 +333,41 @@ class asio_server_connection
std::unique_ptr<boost::asio::ssl::context> m_ssl_context;
std::unique_ptr<ssl_stream> m_ssl_stream;

public:
asio_server_connection(std::unique_ptr<boost::asio::ip::tcp::socket> socket, http_linux_server* server, hostport_listener* parent)
: m_socket(std::move(socket))
, m_request_buf()
, m_response_buf()
, m_p_server(server)
, m_p_parent(parent)
, m_close(false)
, m_chunked(false)
, m_refs(1)
{
}

will_deref_and_erase_t start(bool is_https, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback)
struct Dereferencer
{
void operator()(asio_server_connection* conn) const { conn->deref(); }
};

public:
using refcount_ptr = std::unique_ptr<asio_server_connection, Dereferencer>;

static refcount_ptr create(std::unique_ptr<boost::asio::ip::tcp::socket> socket, http_linux_server* server, hostport_listener* parent)
{
return refcount_ptr(new asio_server_connection(std::move(socket), server, parent));
}

refcount_ptr get_reference()
{
++m_refs;
return refcount_ptr(this);
}

will_erase_from_parent_t start_connection(bool is_https, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback)
{
auto unique_reference = this->get_reference();

if (is_https)
{
m_ssl_context = make_unique<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23);
Expand All @@ -360,11 +381,14 @@ class asio_server_connection
{
(will_deref_and_erase_t)this->start_request_response();
});
return will_deref_and_erase_t{};
unique_reference.release();
return will_erase_from_parent_t{};
}
else
{
return start_request_response();
(will_deref_and_erase_t)start_request_response();
unique_reference.release();
return will_erase_from_parent_t{};
}
}

Expand All @@ -385,7 +409,7 @@ class asio_server_connection
will_deref_and_erase_t dispatch_request_to_listener();
will_erase_from_parent_t do_response()
{
++m_refs;
auto unique_reference = this->get_reference();
m_request.get_response().then([=](pplx::task<http_response> r_task)
{
http_response response;
Expand All @@ -406,11 +430,12 @@ class asio_server_connection
(will_deref_and_erase_t)this->async_write(&asio_server_connection::handle_headers_written, response);
});
});
unique_reference.release();
return will_erase_from_parent_t{};
}
will_erase_from_parent_t do_bad_response()
{
++m_refs;
auto unique_reference = this->get_reference();
m_request.get_response().then([=](pplx::task<http_response> r_task)
{
http_response response;
Expand All @@ -428,6 +453,7 @@ class asio_server_connection

(will_deref_and_erase_t)async_write(&asio_server_connection::handle_headers_written, response);
});
unique_reference.release();
return will_erase_from_parent_t{};
}

Expand Down Expand Up @@ -495,10 +521,13 @@ void hostport_listener::start()
m_acceptor->listen(0 != m_backlog ? m_backlog : socket_base::max_connections);

auto socket = new ip::tcp::socket(service);
std::unique_ptr<ip::tcp::socket> usocket(socket);
m_acceptor->async_accept(*socket, [this, socket](const boost::system::error_code& ec)
{
this->on_accept(socket, ec);
std::unique_ptr<ip::tcp::socket> usocket(socket);
this->on_accept(std::move(usocket), ec);
});
usocket.release();
}

void asio_server_connection::close()
Expand Down Expand Up @@ -538,30 +567,53 @@ will_deref_and_erase_t asio_server_connection::start_request_response()
return will_deref_and_erase_t{};
}

void hostport_listener::on_accept(ip::tcp::socket* socket, const boost::system::error_code& ec)
void hostport_listener::on_accept(std::unique_ptr<ip::tcp::socket> socket, const boost::system::error_code& ec)
{
std::unique_ptr<ip::tcp::socket> usocket(std::move(socket));
// Listener closed
if (ec == boost::asio::error::operation_aborted)
{
return;
}

std::lock_guard<std::mutex> lock(m_connections_lock);

// Handle successful accept
if (!ec)
{
auto conn = new asio_server_connection(std::move(usocket), m_p_server, this);
auto conn = asio_server_connection::create(std::move(socket), m_p_server, this);

std::lock_guard<std::mutex> lock(m_connections_lock);
m_connections.insert(conn);
conn->start(m_is_https, m_ssl_context_callback);
if (m_connections.size() == 1)
m_all_connections_complete.reset();
m_connections.insert(conn.get());
try
{
(will_erase_from_parent_t)conn->start_connection(m_is_https, m_ssl_context_callback);
// at this point an asynchronous task has been launched which will call
// m_connections.erase(conn.get()) eventually

if (m_acceptor)
// the following cannot throw
if (m_connections.size() == 1)
m_all_connections_complete.reset();
}
catch (boost::system::system_error&)
{
// spin off another async accept
auto newSocket = new ip::tcp::socket(crossplat::threadpool::shared_instance().service());
m_acceptor->async_accept(*newSocket, [this, newSocket](const boost::system::error_code& ec)
{
this->on_accept(newSocket, ec);
});
// boost ssl apis throw boost::system::system_error.
// Exception indicates something went wrong setting ssl context.
// Drop connection and continue handling other connections.
m_connections.erase(conn.get());
}
}

if (m_acceptor)
{
// spin off another async accept
auto newSocket = new ip::tcp::socket(crossplat::threadpool::shared_instance().service());
std::unique_ptr<ip::tcp::socket> usocket(newSocket);
m_acceptor->async_accept(*newSocket, [this, newSocket](const boost::system::error_code& ec)
{
std::unique_ptr<ip::tcp::socket> usocket(newSocket);
this->on_accept(std::move(usocket), ec);
});
usocket.release();
}
}

will_deref_and_erase_t asio_server_connection::handle_http_line(const boost::system::error_code& ec)
Expand Down

0 comments on commit c8c9227

Please sign in to comment.