Skip to content

Commit

Permalink
pegasus_server: refine write process (apache#110)
Browse files Browse the repository at this point in the history
Former-commit-id: 425e095d7242972e7b2caf4c26b01625bacd291a [formerly 8ee7e1d]
Former-commit-id: 2425bc92bd82dca8a053b80b3503dedde9ea70d1
  • Loading branch information
qinzuoyan authored Jul 9, 2018
1 parent 8b73e4c commit 109abc8
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/base/counter_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ struct perf_counter_info
DEFINE_JSON_SERIALIZATION(result, timestamp, timestamp_str, counters)
};

} // namespace
} // namespace pegasus
2 changes: 1 addition & 1 deletion src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_lev
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");

} // namespace
} // namespace pegasus
2 changes: 1 addition & 1 deletion src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;

} // namespace
} // namespace pegasus
2 changes: 1 addition & 1 deletion src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,4 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
}
}

} // namespace
} // namespace pegasus
5 changes: 3 additions & 2 deletions src/base/pegasus_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,6 @@ int c_unescape_string(const std::string &src, std::string &dest)
dest.resize(len);
return len;
}
}
} // namespace

} // namespace utils
} // namespace pegasus
4 changes: 2 additions & 2 deletions src/base/pegasus_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,5 @@ inline dsn::string_view to_string_view(rocksdb::Slice s) { return {s.data(), s.s

inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s.size()}; }

}
} // namespace
} // namespace utils
} // namespace pegasus
36 changes: 24 additions & 12 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,46 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
// rocksdb's `last_flushed_decree` (see rocksdb::DB::GetLastFlushedDecree())
// TODO(wutao1): remove it when shared log is removed.
if (count == 0) {
return _write_svc->empty_put(decree);
return _write_svc->empty_put(_decree);
}

dsn::task_code rpc_code(dsn_msg_task_code(requests[0]));
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
return on_multi_put(rpc);
return _write_svc->multi_put(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
return on_multi_remove(rpc);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}

return on_batched_writes(requests, count, decree);
return on_batched_writes(requests, count);
}

int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count, int64_t decree)
int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count)
{
int err = 0;
{
_write_svc->batch_prepare();
_write_svc->batch_prepare(_decree);

for (int i = 0; i < count; ++i) {
dassert(requests[i] != nullptr, "request[%d] is null", i);

// Make sure all writes are batched even if some of them fail,
// since we need to record the total qps and rpc latencies,
// and respond to all RPCs regardless of their result.

int local_err = 0;
dsn::task_code rpc_code(dsn_msg_task_code(requests[i]));
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
err = on_single_put_in_batch(rpc);
local_err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
err = on_single_remove_in_batch(rpc);
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
Expand All @@ -74,10 +79,17 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count,
dfatal("rpc code not handled: %s", rpc_code.to_string());
}
}
RETURN_NOT_ZERO(err);

if (!err && local_err) {
err = local_err;
}
}

err = _write_svc->batch_commit(decree);
if (err == 0) {
err = _write_svc->batch_commit(_decree);
} else {
_write_svc->batch_abort(_decree, err);
}
}

// reply the batched RPCs
Expand All @@ -101,8 +113,8 @@ void pegasus_server_write::request_key_check(int64_t decree, dsn_message_t m, co
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);

ddebug_rocksdb("write",
"decree={}, code={}, hash_key={}, sort_key={}",
ddebug_rocksdb("Write",
"decree: {}, code: {}, hash_key: {}, sort_key: {}",
decree,
msg->local_rpc_code.to_string(),
utils::c_escape_string(hash_key),
Expand Down
16 changes: 3 additions & 13 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,19 @@ class pegasus_server_write : public dsn::replication::replica_base
uint64_t timestamp);

private:
int on_multi_put(multi_put_rpc &rpc)
{
return _write_svc->multi_put(_decree, rpc.request(), rpc.response());
}

int on_multi_remove(multi_remove_rpc &rpc)
{
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}

/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn_message_t *requests, int count, int64_t decree);
int on_batched_writes(dsn_message_t *requests, int count);

int on_single_put_in_batch(put_rpc &rpc)
{
int err = _write_svc->batch_put(rpc.request(), rpc.response());
int err = _write_svc->batch_put(_decree, rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request().key);
return err;
}

int on_single_remove_in_batch(remove_rpc &rpc)
{
int err = _write_svc->batch_remove(rpc.request(), rpc.response());
int err = _write_svc->batch_remove(_decree, rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request());
return err;
}
Expand Down
74 changes: 44 additions & 30 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
"statistic the latency of MULTI_REMOVE request");
}

pegasus_write_service::~pegasus_write_service() = default;
pegasus_write_service::~pegasus_write_service() {}

int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); }

int pegasus_write_service::multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
Expand All @@ -82,55 +84,67 @@ int pegasus_write_service::multi_remove(int64_t decree,
return err;
}

int pegasus_write_service::batch_put(const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
void pegasus_write_service::batch_prepare(int64_t decree)
{
_pfc_put_qps->increment();
_batch_perfcounters.push_back(_pfc_put_latency.get());
dassert(_batch_start_time == 0,
"batch_prepare and batch_commit/batch_abort must be called in pair");

return _impl->batch_put(update, resp);
_batch_start_time = dsn_now_ns();
}

int pegasus_write_service::batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
int pegasus_write_service::batch_put(int64_t decree,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
_pfc_remove_qps->increment();
_batch_perfcounters.push_back(_pfc_remove_latency.get());
dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");

_batch_qps_perfcounters.push_back(_pfc_put_qps.get());
_batch_latency_perfcounters.push_back(_pfc_put_latency.get());

return _impl->batch_remove(key, resp);
return _impl->batch_put(decree, update, resp);
}

int pegasus_write_service::batch_commit(int64_t decree)
int pegasus_write_service::batch_remove(int64_t decree,
const dsn::blob &key,
dsn::apps::update_response &resp)
{
dassert(_batch_start_time != 0, "batch_commit and batch_prepare must be called in pair");
dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare");

int ret = _impl->batch_commit(decree);
_batch_qps_perfcounters.push_back(_pfc_remove_qps.get());
_batch_latency_perfcounters.push_back(_pfc_remove_latency.get());

uint64_t latency = dsn_now_ns() - _batch_start_time;
for (dsn::perf_counter *pfc : _batch_perfcounters) {
pfc->set(latency);
}
return _impl->batch_remove(decree, key, resp);
}

_batch_perfcounters.clear();
_batch_start_time = 0;
int pegasus_write_service::batch_commit(int64_t decree)
{
dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare");

return ret;
int err = _impl->batch_commit(decree);
clear_up_batch_states();
return err;
}

void pegasus_write_service::batch_prepare()
void pegasus_write_service::batch_abort(int64_t decree, int err)
{
dassert(_batch_start_time == 0, "batch_commit and batch_prepare must be called in pair");
dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare");
dassert(err, "must abort on non-zero err");

_batch_start_time = dsn_now_ns();
_impl->batch_abort(decree, err);
clear_up_batch_states();
}

int pegasus_write_service::empty_put(int64_t decree)
void pegasus_write_service::clear_up_batch_states()
{
std::string empty_key, empty_value;
int err = _impl->db_write_batch_put(empty_key, empty_value, 0);
if (!err) {
err = _impl->db_write(decree);
}
return err;
uint64_t latency = dsn_now_ns() - _batch_start_time;
for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
pfc->increment();
for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
pfc->set(latency);

_batch_qps_perfcounters.clear();
_batch_latency_perfcounters.clear();
_batch_start_time = 0;
}

} // namespace server
Expand Down
58 changes: 37 additions & 21 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ namespace server {

class pegasus_server_impl;

#define RETURN_NOT_ZERO(err) \
if (dsn_unlikely(err)) \
return err;

/// Handle the write requests.
/// As the signatures imply, this class is not responsible for replying to the rpc;
/// the caller (pegasus_server_write) should do that.
Expand All @@ -31,36 +27,52 @@ class pegasus_write_service

~pegasus_write_service();

// Write empty record.
// See this document (https://github.com/XiaoMi/pegasus/wiki/last_flushed_decree)
// to learn why an empty write is required.
int empty_put(int64_t decree);

// Write MULTI_PUT record.
int multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp);

// Write MULTI_REMOVE record.
int multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);

/// Prepare for batch write.
void batch_prepare();
/// For batch write.
/// NOTE: A batch write may incur a database read for consistency check of timetag.
/// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag)
/// To disable the consistency check, unset `verify_timetag` under `pegasus.server` section
/// in configuration.

// NOTE: A batch write may incur a database read for consistency check of timetag.
// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag)
// To disable the consistency check, unset `verify_timetag` under `pegasus.server` section
// in configuration.
// Prepare batch write.
void batch_prepare(int64_t decree);

/// NOTE that `resp` should not be moved or freed while
/// the batch is not committed.
int batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp);
// Add PUT record in batch write.
// \returns 0 if success, non-0 if failure.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_put(int64_t decree,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);

int batch_remove(const dsn::blob &key, dsn::apps::update_response &resp);
// Add REMOVE record in batch write.
// \returns 0 if success, non-0 if failure.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);

/// \returns 0 if success, non-0 if failure.
/// If the batch contains no updates, 0 is returned.
// Commit batch write.
// \returns 0 if success, non-0 if failure.
// NOTE that if the batch contains no updates, 0 is returned.
int batch_commit(int64_t decree);

/// Write empty record.
/// See this document (https://github.com/XiaoMi/pegasus/wiki/last_flushed_decree)
/// to know why we must have empty write.
int empty_put(int64_t decree);
// Abort batch write.
void batch_abort(int64_t decree, int err);

private:
void clear_up_batch_states();

private:
friend class pegasus_write_service_test;
Expand All @@ -80,7 +92,11 @@ class pegasus_write_service
::dsn::perf_counter_wrapper _pfc_remove_latency;
::dsn::perf_counter_wrapper _pfc_multi_remove_latency;

std::vector<::dsn::perf_counter *> _batch_perfcounters;
// Records all requests.
std::vector<::dsn::perf_counter *> _batch_qps_perfcounters;
std::vector<::dsn::perf_counter *> _batch_latency_perfcounters;

// TODO(wutao1): add perf counters for failed rpc.
};

} // namespace server
Expand Down
Loading

0 comments on commit 109abc8

Please sign in to comment.