Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

core: remove io-per-queue's io mode & fastrun's running mode #93

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 63 additions & 63 deletions include/dsn/tool-api/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,103 +206,103 @@ class rpc_session : public ref_counter
@addtogroup tool-api-hooks
@{
*/
DSN_API static join_point<void, rpc_session *> on_rpc_session_connected;
DSN_API static join_point<void, rpc_session *> on_rpc_session_disconnected;
static join_point<void, rpc_session *> on_rpc_session_connected;
static join_point<void, rpc_session *> on_rpc_session_disconnected;
/*@}*/
public:
DSN_API rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
bool is_client);
DSN_API virtual ~rpc_session();
rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
bool is_client);
virtual ~rpc_session();

virtual void close_on_fault_injection() = 0;
virtual void connect() = 0;
virtual void close() = 0;

DSN_API bool has_pending_out_msgs();
bool is_client() const { return _is_client; }
::dsn::rpc_address remote_address() const { return _remote_addr; }
dsn::rpc_address remote_address() const { return _remote_addr; }
connection_oriented_network &net() const { return _net; }
message_parser_ptr parser() const { return _parser; }
DSN_API void send_message(message_ex *msg);
DSN_API bool cancel(message_ex *request);

void send_message(message_ex *msg);
bool cancel(message_ex *request);
void delay_recv(int delay_ms);
DSN_API bool on_recv_message(message_ex *msg, int delay_ms);
bool on_recv_message(message_ex *msg, int delay_ms);

// for client session
public:
// return true if the socket should be closed
DSN_API bool on_disconnected(bool is_write);
protected:
///
/// for send message
///
bool unlink_message_for_send();
virtual void send(uint64_t signature) = 0;
void on_send_completed(uint64_t signature = 0);

virtual void connect() = 0;
enum session_state
{
SS_CONNECTING,
SS_CONNECTED,
SS_DISCONNECTED
};
::dsn::utils::ex_lock_nr _lock; // [
volatile session_state _connect_state;

// for server session
public:
DSN_API void start_read_next(int read_next = 256);
// messages are sent in batch, firstly all messages are linked together
// in a doubly-linked list "_messages".
// if no messages are on-the-flying, a batch of messages are fetch from the "_messages"
// and put them to _sending_msgs; meanwhile, buffers of these messages are put
// in _sending_buffers
dlink _messages;
int _message_count; // count of _messages

bool _is_sending_next;

// should be called in do_read() before using _parser when it is nullptr.
// returns:
// -1 : prepare failed, maybe because of invalid message header type
// 0 : prepare succeed, _parser is not nullptr now.
// >0 : need read more data, returns read_next.
DSN_API int prepare_parser();
std::vector<message_ex *> _sending_msgs;
std::vector<message_parser::send_buf> _sending_buffers;

uint64_t _message_sent;
// ]

// shared
protected:
//
// sending messages are put in _sending_msgs
// buffer is prepared well in _sending_buffers
// always call on_send_completed later
//
virtual void send(uint64_t signature) = 0;
///
/// for receive message
///
void start_read_next(int read_next = 256);
int prepare_parser();
virtual void do_read(int read_next) = 0;

protected:
DSN_API bool try_connecting(); // return true when it is permitted
DSN_API void set_connected();
DSN_API bool set_disconnected(); // return true when it is permitted
///
/// change status and check status
///
// return true when it is permitted
bool set_connecting();
// return true when it is permitted
bool set_disconnected();
void set_connected();

bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; }
bool is_connecting() const { return _connect_state == SS_CONNECTING; }
bool is_connected() const { return _connect_state == SS_CONNECTED; }
DSN_API void on_send_completed(uint64_t signature = 0); // default value for nothing is sent

private:
// return whether there are messages for sending; should always be called in lock
DSN_API bool unlink_message_for_send();
DSN_API void clear_send_queue(bool resend_msgs);
// return whether there are messages for sending;
// should always be called in lock
void clear_send_queue(bool resend_msgs);

bool on_disconnected(bool is_write);

protected:
// constant info
connection_oriented_network &_net;
::dsn::rpc_address _remote_addr;
dsn::rpc_address _remote_addr;
int _max_buffer_block_count_per_send;
message_reader _reader;
message_parser_ptr _parser;

// messages are currently being sent
// also locked by _lock later
std::vector<message_parser::send_buf> _sending_buffers;
std::vector<message_ex *> _sending_msgs;

private:
const bool _is_client;
rpc_client_matcher *_matcher;

enum session_state
{
SS_CONNECTING,
SS_CONNECTED,
SS_DISCONNECTED
};

// TODO: expose the queue to be customizable
::dsn::utils::ex_lock_nr _lock; // [
volatile bool _is_sending_next;
int _message_count; // count of _messages
dlink _messages;
volatile session_state _connect_state;
uint64_t _message_sent;
// ]

std::atomic_int _delay_server_receive_ms;
};

Expand Down
10 changes: 2 additions & 8 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rpc_session::~rpc_session()
}
}

bool rpc_session::try_connecting()
bool rpc_session::set_connecting()
{
dassert(is_client(), "must be client session");

Expand Down Expand Up @@ -338,12 +338,6 @@ void rpc_session::on_send_completed(uint64_t signature)
this->send(sig);
}

bool rpc_session::has_pending_out_msgs()
{
utils::auto_lock<utils::ex_lock_nr> l(_lock);
return !_messages.is_alone();
}

rpc_session::rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
Expand Down Expand Up @@ -546,7 +540,7 @@ void connection_oriented_network::inject_drop_message(message_ex *msg, bool is_s
}

if (s != nullptr) {
s->close_on_fault_injection();
s->close();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void asio_rpc_session::safe_close()

void asio_rpc_session::connect()
{
if (try_connecting()) {
if (set_connecting()) {
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4(_remote_addr.ip()),
_remote_addr.port());

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class asio_rpc_session : public rpc_session
bool is_client);
virtual ~asio_rpc_session();
virtual void send(uint64_t signature) override { return write(signature); }
virtual void close_on_fault_injection() override { safe_close(); }
virtual void close() override { safe_close(); }

public:
virtual void connect() override;
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ sim_client_session::sim_client_session(sim_network_provider &net,

void sim_client_session::connect()
{
if (try_connecting())
if (set_connecting())
set_connected();
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/network.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class sim_client_session : public rpc_session

virtual void do_read(int sz) override {}

virtual void close_on_fault_injection() override {}
virtual void close() override {}
};

class sim_server_session : public rpc_session
Expand All @@ -71,7 +71,7 @@ class sim_server_session : public rpc_session

virtual void do_read(int sz) override {}

virtual void close_on_fault_injection() override {}
virtual void close() override {}

private:
rpc_session_ptr _client;
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.bsd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

dassert(_socket != -1, "invalid given socket handle");
Expand Down
7 changes: 1 addition & 6 deletions src/core/tools/hpc/hpc_network_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,10 @@ class hpc_network_provider : public connection_oriented_network

class hpc_rpc_session : public rpc_session
{
// client
public:
virtual void connect() override;
virtual void close() override;

// server
public:
// shared
public:
hpc_rpc_session(socket_t sock,
message_parser_ptr &parser,
Expand All @@ -114,8 +111,6 @@ class hpc_rpc_session : public rpc_session
#endif
}

virtual void close_on_fault_injection() override { close(); }

void bind_looper(io_looper *looper, bool delay = false);
virtual void do_read(int read_next) override;

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

dassert(_socket != -1, "invalid given socket handle");
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

_connect_event.callback = [this](int err, uint32_t io_size, uintptr_t lpolp) {
Expand Down