From 109abc8b93eeae0b1f14518c1d47b82e4ade10b1 Mon Sep 17 00:00:00 2001 From: QinZuoyan Date: Mon, 9 Jul 2018 15:36:59 +0800 Subject: [PATCH] pegasus_server: refine write process (#110) Former-commit-id: 425e095d7242972e7b2caf4c26b01625bacd291a [formerly 8ee7e1da8eac309843c7e925201c1004fa6a64f8] Former-commit-id: 2425bc92bd82dca8a053b80b3503dedde9ea70d1 --- src/base/counter_utils.h | 2 +- src/base/pegasus_const.cpp | 2 +- src/base/pegasus_const.h | 2 +- src/base/pegasus_key_schema.h | 2 +- src/base/pegasus_utils.cpp | 5 +- src/base/pegasus_utils.h | 4 +- src/server/pegasus_server_write.cpp | 36 +++-- src/server/pegasus_server_write.h | 16 +-- src/server/pegasus_write_service.cpp | 74 ++++++---- src/server/pegasus_write_service.h | 58 +++++--- src/server/pegasus_write_service_impl.h | 128 ++++++++++++------ .../test/pegasus_write_service_test.cpp | 12 +- 12 files changed, 210 insertions(+), 131 deletions(-) diff --git a/src/base/counter_utils.h b/src/base/counter_utils.h index badc12a5cc..89a9fc116e 100644 --- a/src/base/counter_utils.h +++ b/src/base/counter_utils.h @@ -49,4 +49,4 @@ struct perf_counter_info DEFINE_JSON_SERIALIZATION(result, timestamp, timestamp_str, counters) }; -} // namespace +} // namespace pegasus diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index 4ce74a2c1a..0b7e22458b 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -53,4 +53,4 @@ const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_lev const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force"); const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip"); -} // namespace +} // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h index 835468edd7..ca61668a11 100644 --- a/src/base/pegasus_const.h +++ b/src/base/pegasus_const.h @@ -36,4 +36,4 @@ extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY; extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE; extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP; -} // namespace +} // namespace pegasus diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h index c1bfe7e807..3222fcd0de 100644 --- a/src/base/pegasus_key_schema.h +++ b/src/base/pegasus_key_schema.h @@ -150,4 +150,4 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key) } } -} // namespace +} // namespace pegasus diff --git a/src/base/pegasus_utils.cpp b/src/base/pegasus_utils.cpp index 394d86ad27..17a1141590 100644 --- a/src/base/pegasus_utils.cpp +++ b/src/base/pegasus_utils.cpp @@ -214,5 +214,6 @@ int c_unescape_string(const std::string &src, std::string &dest) dest.resize(len); return len; } -} -} // namespace + +} // namespace utils +} // namespace pegasus diff --git a/src/base/pegasus_utils.h b/src/base/pegasus_utils.h index 22ca84d3ea..87e83ac5f9 100644 --- a/src/base/pegasus_utils.h +++ b/src/base/pegasus_utils.h @@ -83,5 +83,5 @@ inline dsn::string_view to_string_view(rocksdb::Slice s) { return {s.data(), s.s inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s.size()}; } -} -} // namespace +} // namespace utils +} // namespace pegasus diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index f70aaace23..a7405fe741 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -30,41 +30,46 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests, // rocksdb's `last_flushed_decree` (see rocksdb::DB::GetLastFlushedDecree()) // TODO(wutao1): remove it when shared log is removed. if (count == 0) { - return _write_svc->empty_put(decree); + return _write_svc->empty_put(_decree); } dsn::task_code rpc_code(dsn_msg_task_code(requests[0])); if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { dassert(count == 1, "count = %d", count); auto rpc = multi_put_rpc::auto_reply(requests[0]); - return on_multi_put(rpc); + return _write_svc->multi_put(_decree, rpc.request(), rpc.response()); } if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { dassert(count == 1, "count = %d", count); auto rpc = multi_remove_rpc::auto_reply(requests[0]); - return on_multi_remove(rpc); + return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); } - return on_batched_writes(requests, count, decree); + return on_batched_writes(requests, count); } -int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count, int64_t decree) +int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count) { int err = 0; { - _write_svc->batch_prepare(); + _write_svc->batch_prepare(_decree); for (int i = 0; i < count; ++i) { dassert(requests[i] != nullptr, "request[%d] is null", i); + // Make sure all writes are batched even if they are failed, + // since we need to record the total qps and rpc latencies, + // and respond for all RPCs regardless of their result. + + int local_err = 0; dsn::task_code rpc_code(dsn_msg_task_code(requests[i])); if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) { auto rpc = put_rpc::auto_reply(requests[i]); - err = on_single_put_in_batch(rpc); + local_err = on_single_put_in_batch(rpc); _put_rpc_batch.emplace_back(std::move(rpc)); } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { auto rpc = remove_rpc::auto_reply(requests[i]); - err = on_single_remove_in_batch(rpc); + local_err = on_single_remove_in_batch(rpc); _remove_rpc_batch.emplace_back(std::move(rpc)); } else { if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT || @@ -74,10 +79,17 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count, dfatal("rpc code not handled: %s", rpc_code.to_string()); } } - RETURN_NOT_ZERO(err); + + if (!err && local_err) { + err = local_err; + } } - err = _write_svc->batch_commit(decree); + if (err == 0) { + err = _write_svc->batch_commit(_decree); + } else { + _write_svc->batch_abort(_decree, err); + } } // reply the batched RPCs @@ -101,8 +113,8 @@ void pegasus_server_write::request_key_check(int64_t decree, dsn_message_t m, co ::dsn::blob hash_key, sort_key; pegasus_restore_key(key, hash_key, sort_key); - ddebug_rocksdb("write", - "decree={}, code={}, hash_key={}, sort_key={}", + ddebug_rocksdb("Write", + "decree: {}, code: {}, hash_key: {}, sort_key: {}", decree, msg->local_rpc_code.to_string(), utils::c_escape_string(hash_key), diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 65908ca4d3..0648031d98 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -29,29 +29,19 @@ class pegasus_server_write : public dsn::replication::replica_base uint64_t timestamp); private: - int on_multi_put(multi_put_rpc &rpc) - { - return _write_svc->multi_put(_decree, rpc.request(), rpc.response()); - } - - int on_multi_remove(multi_remove_rpc &rpc) - { - return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); - } - /// Delay replying for the batched requests until all of them complete. - int on_batched_writes(dsn_message_t *requests, int count, int64_t decree); + int on_batched_writes(dsn_message_t *requests, int count); int on_single_put_in_batch(put_rpc &rpc) { - int err = _write_svc->batch_put(rpc.request(), rpc.response()); + int err = _write_svc->batch_put(_decree, rpc.request(), rpc.response()); request_key_check(_decree, rpc.dsn_request(), rpc.request().key); return err; } int on_single_remove_in_batch(remove_rpc &rpc) { - int err = _write_svc->batch_remove(rpc.request(), rpc.response()); + int err = _write_svc->batch_remove(_decree, rpc.request(), rpc.response()); request_key_check(_decree, rpc.dsn_request(), rpc.request()); return err; } diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index a59e27ff0d..ae7887e55e 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -58,7 +58,9 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) "statistic the latency of MULTI_REMOVE request"); } -pegasus_write_service::~pegasus_write_service() = default; +pegasus_write_service::~pegasus_write_service() {} + +int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); } int pegasus_write_service::multi_put(int64_t decree, const dsn::apps::multi_put_request &update, @@ -82,55 +84,67 @@ int pegasus_write_service::multi_remove(int64_t decree, return err; } -int pegasus_write_service::batch_put(const dsn::apps::update_request &update, - dsn::apps::update_response &resp) +void pegasus_write_service::batch_prepare(int64_t decree) { - _pfc_put_qps->increment(); - _batch_perfcounters.push_back(_pfc_put_latency.get()); + dassert(_batch_start_time == 0, + "batch_prepare and batch_commit/batch_abort must be called in pair"); - return _impl->batch_put(update, resp); + _batch_start_time = dsn_now_ns(); } -int pegasus_write_service::batch_remove(const dsn::blob &key, dsn::apps::update_response &resp) +int pegasus_write_service::batch_put(int64_t decree, + const dsn::apps::update_request &update, + dsn::apps::update_response &resp) { - _pfc_remove_qps->increment(); - _batch_perfcounters.push_back(_pfc_remove_latency.get()); + dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare"); + + _batch_qps_perfcounters.push_back(_pfc_put_qps.get()); + _batch_latency_perfcounters.push_back(_pfc_put_latency.get()); - return _impl->batch_remove(key, resp); + return _impl->batch_put(decree, update, resp); } -int pegasus_write_service::batch_commit(int64_t decree) +int pegasus_write_service::batch_remove(int64_t decree, + const dsn::blob &key, + dsn::apps::update_response &resp) { - dassert(_batch_start_time != 0, "batch_commit and batch_prepare must be called in pair"); + dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare"); - int ret = _impl->batch_commit(decree); + _batch_qps_perfcounters.push_back(_pfc_remove_qps.get()); + _batch_latency_perfcounters.push_back(_pfc_remove_latency.get()); - uint64_t latency = dsn_now_ns() - _batch_start_time; - for (dsn::perf_counter *pfc : _batch_perfcounters) { - pfc->set(latency); - } + return _impl->batch_remove(decree, key, resp); +} - _batch_perfcounters.clear(); - _batch_start_time = 0; +int pegasus_write_service::batch_commit(int64_t decree) +{ + dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare"); - return ret; + int err = _impl->batch_commit(decree); + clear_up_batch_states(); + return err; } -void pegasus_write_service::batch_prepare() +void pegasus_write_service::batch_abort(int64_t decree, int err) { - dassert(_batch_start_time == 0, "batch_commit and batch_prepare must be called in pair"); + dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare"); + dassert(err, "must abort on non-zero err"); - _batch_start_time = dsn_now_ns(); + _impl->batch_abort(decree, err); + clear_up_batch_states(); } -int pegasus_write_service::empty_put(int64_t decree) +void pegasus_write_service::clear_up_batch_states() { - std::string empty_key, empty_value; - int err = _impl->db_write_batch_put(empty_key, empty_value, 0); - if (!err) { - err = _impl->db_write(decree); - } - return err; + uint64_t latency = dsn_now_ns() - _batch_start_time; + for (dsn::perf_counter *pfc : _batch_qps_perfcounters) + pfc->increment(); + for (dsn::perf_counter *pfc : _batch_latency_perfcounters) + pfc->set(latency); + + _batch_qps_perfcounters.clear(); + _batch_latency_perfcounters.clear(); + _batch_start_time = 0; } } // namespace server diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index d50c76f328..48c4c7e642 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -16,10 +16,6 @@ namespace server { class pegasus_server_impl; -#define RETURN_NOT_ZERO(err) \ - if (dsn_unlikely(err)) \ - return err; - /// Handle the write requests. /// As the signatures imply, this class is not responsible for replying the rpc, /// the caller(pegasus_server_write) should do. @@ -31,36 +27,52 @@ class pegasus_write_service ~pegasus_write_service(); + // Write empty record. + // See this document (https://github.com/XiaoMi/pegasus/wiki/last_flushed_decree) + // to know why we must have empty write. + int empty_put(int64_t decree); + + // Write MULTI_PUT record. int multi_put(int64_t decree, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp); + // Write MULTI_REMOVE record. int multi_remove(int64_t decree, const dsn::apps::multi_remove_request &update, dsn::apps::multi_remove_response &resp); - /// Prepare for batch write. - void batch_prepare(); + /// For batch write. + /// NOTE: A batch write may incur a database read for consistency check of timetag. + /// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag) + /// To disable the consistency check, unset `verify_timetag` under `pegasus.server` section + /// in configuration. - // NOTE: A batch write may incur a database read for consistency check of timetag. - // (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag) - // To disable the consistency check, unset `verify_timetag` under `pegasus.server` section - // in configuration. + // Prepare batch write. + void batch_prepare(int64_t decree); - /// NOTE that `resp` should not be moved or freed while - /// the batch is not committed. - int batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp); + // Add PUT record in batch write. + // \returns 0 if success, non-0 if failure. + // NOTE that `resp` should not be moved or freed while the batch is not committed. + int batch_put(int64_t decree, + const dsn::apps::update_request &update, + dsn::apps::update_response &resp); - int batch_remove(const dsn::blob &key, dsn::apps::update_response &resp); + // Add REMOVE record in batch write. + // \returns 0 if success, non-0 if failure. + // NOTE that `resp` should not be moved or freed while the batch is not committed. + int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp); - /// \returns 0 if success, non-0 if failure. - /// If the batch contains no updates, 0 is returned. + // Commit batch write. + // \returns 0 if success, non-0 if failure. + // NOTE that if the batch contains no updates, 0 is returned. int batch_commit(int64_t decree); - /// Write empty record. - /// See this document (https://github.com/XiaoMi/pegasus/wiki/last_flushed_decree) - /// to know why we must have empty write. - int empty_put(int64_t decree); + // Abort batch write. + void batch_abort(int64_t decree, int err); + +private: + void clear_up_batch_states(); private: friend class pegasus_write_service_test; @@ -80,7 +92,11 @@ class pegasus_write_service ::dsn::perf_counter_wrapper _pfc_remove_latency; ::dsn::perf_counter_wrapper _pfc_multi_remove_latency; - std::vector<::dsn::perf_counter *> _batch_perfcounters; + // Records all requests. + std::vector<::dsn::perf_counter *> _batch_qps_perfcounters; + std::vector<::dsn::perf_counter *> _batch_latency_perfcounters; + + // TODO(wutao1): add perf counters for failed rpc. }; } // namespace server diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index befe20b6df..29c0fb3aad 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -25,6 +25,20 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { } + int empty_put(int64_t decree) + { + int err = db_write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0); + if (err) { + clear_up_batch_states(decree, err); + return err; + } + + err = db_write(decree); + + clear_up_batch_states(decree, err); + return err; + } + int multi_put(int64_t decree, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp) @@ -35,23 +49,27 @@ class pegasus_write_service::impl : public dsn::replication::replica_base resp.server = _primary_address; if (update.kvs.empty()) { - // invalid argument - derror_replica("invalid argument for multi_put: decree = {}, error = empty kvs", - decree); - - // an invalid operation shouldn't be added to latency calculation + derror_replica("invalid argument for multi_put: decree = {}, error = {}", + decree, + "request.kvs is empty"); resp.error = rocksdb::Status::kInvalidArgument; return 0; } for (auto &kv : update.kvs) { - resp.error = db_write_batch_put(composite_raw_key(update.hash_key, kv.key), + resp.error = db_write_batch_put(decree, + composite_raw_key(update.hash_key, kv.key), kv.value, static_cast(update.expire_ts_seconds)); - RETURN_NOT_ZERO(resp.error); + if (resp.error) { + clear_up_batch_states(decree, resp.error); + return resp.error; + } } resp.error = db_write(decree); + + clear_up_batch_states(decree, resp.error); return resp.error; } @@ -65,42 +83,46 @@ class pegasus_write_service::impl : public dsn::replication::replica_base resp.server = _primary_address; if (update.sort_keys.empty()) { - // invalid argument - derror_replica( - "invalid argument for multi_remove: decree = {}, error = empty sort keys", decree); - - // an invalid operation shouldn't be added to latency calculation + derror_replica("invalid argument for multi_remove: decree = {}, error = {}", + decree, + "request.sort_keys is empty"); resp.error = rocksdb::Status::kInvalidArgument; - resp.count = 0; return 0; } for (auto &sort_key : update.sort_keys) { - // TODO(wutao1): check returned error - resp.error = db_write_batch_delete(composite_raw_key(update.hash_key, sort_key)); - RETURN_NOT_ZERO(resp.error); + resp.error = + db_write_batch_delete(decree, composite_raw_key(update.hash_key, sort_key)); + if (resp.error) { + clear_up_batch_states(decree, resp.error); + return resp.error; + } } resp.error = db_write(decree); - if (resp.error != 0) { - resp.count = 0; - } else { + if (resp.error == 0) { resp.count = update.sort_keys.size(); } + + clear_up_batch_states(decree, resp.error); return resp.error; } - int batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp) + /// For batch write. + + int batch_put(int64_t decree, + const dsn::apps::update_request &update, + dsn::apps::update_response &resp) { resp.error = db_write_batch_put( - update.key, update.value, static_cast(update.expire_ts_seconds)); + decree, update.key, update.value, static_cast(update.expire_ts_seconds)); _update_responses.emplace_back(&resp); return resp.error; } - int batch_remove(const dsn::blob &key, dsn::apps::update_response &resp) + int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp) { - resp.error = db_write_batch_delete(key); + resp.error = db_write_batch_delete(decree, key); _update_responses.emplace_back(&resp); return resp.error; } @@ -108,21 +130,17 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int batch_commit(int64_t decree) { int err = db_write(decree); - - dsn::apps::update_response resp; - resp.app_id = get_gpid().get_app_id(); - resp.partition_index = get_gpid().get_partition_index(); - resp.decree = decree; - resp.server = _primary_address; - - for (dsn::apps::update_response *uresp : _update_responses) { - *uresp = resp; - } - _update_responses.clear(); + clear_up_batch_states(decree, err); return err; } - int db_write_batch_put(dsn::string_view raw_key, dsn::string_view value, uint32_t expire_sec) + void batch_abort(int64_t decree, int err) { clear_up_batch_states(decree, err); } + +private: + int db_write_batch_put(int64_t decree, + dsn::string_view raw_key, + dsn::string_view value, + uint32_t expire_sec) { rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key); rocksdb::SliceParts skey_parts(&skey, 1); @@ -130,21 +148,31 @@ class pegasus_write_service::impl : public dsn::replication::replica_base _value_generator.generate_value(_value_schema_version, value, expire_sec); rocksdb::Status s = _batch.Put(skey_parts, svalue); if (dsn_unlikely(!s.ok())) { + ::dsn::blob hash_key, sort_key; + pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); derror_rocksdb("WriteBatchPut", s.ToString(), - "raw_key: {}, expire_sec: {}", - utils::c_escape_string(raw_key), + "decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}", + decree, + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key), expire_sec); } return s.code(); } - int db_write_batch_delete(dsn::string_view raw_key) + int db_write_batch_delete(int64_t decree, dsn::string_view raw_key) { rocksdb::Status s = _batch.Delete(utils::to_rocksdb_slice(raw_key)); if (dsn_unlikely(!s.ok())) { - derror_rocksdb( - "WriteBatchDelete", s.ToString(), "raw_key: {}", utils::c_escape_string(raw_key)); + ::dsn::blob hash_key, sort_key; + pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("WriteBatchDelete", + s.ToString(), + "decree: {}, hash_key: {}, sort_key: {}", + decree, + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); } return s.code(); } @@ -161,11 +189,27 @@ class pegasus_write_service::impl : public dsn::replication::replica_base if (!status.ok()) { derror_rocksdb("Write", status.ToString(), "decree: {}", decree); } - _batch.Clear(); return status.code(); } -private: + void clear_up_batch_states(int64_t decree, int err) + { + if (!_update_responses.empty()) { + dsn::apps::update_response resp; + resp.error = err; + resp.app_id = get_gpid().get_app_id(); + resp.partition_index = get_gpid().get_partition_index(); + resp.decree = decree; + resp.server = _primary_address; + for (dsn::apps::update_response *uresp : _update_responses) { + *uresp = resp; + } + _update_responses.clear(); + } + + _batch.Clear(); + } + dsn::blob composite_raw_key(dsn::string_view hash_key, dsn::string_view sort_key) { dsn::blob raw_key; diff --git a/src/server/test/pegasus_write_service_test.cpp b/src/server/test/pegasus_write_service_test.cpp index 0d04756c20..54e6cc0097 100644 --- a/src/server/test/pegasus_write_service_test.cpp +++ b/src/server/test/pegasus_write_service_test.cpp @@ -33,8 +33,9 @@ class pegasus_write_service_test : public pegasus_server_test_base // alarm for empty request request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size()); - _write_svc->multi_put(decree, request, response); + int err = _write_svc->multi_put(decree, request, response); ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument); + ASSERT_EQ(err, 0); constexpr int kv_num = 100; std::string sort_key[kv_num]; @@ -68,8 +69,9 @@ class pegasus_write_service_test : public pegasus_server_test_base // alarm for empty request request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size()); - _write_svc->multi_remove(decree, request, response); + int err = _write_svc->multi_remove(decree, request, response); ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument); + ASSERT_EQ(err, 0); constexpr int kv_num = 100; std::string sort_key[kv_num]; @@ -110,14 +112,14 @@ class pegasus_write_service_test : public pegasus_server_test_base // of response may be changed due to capacity increase. std::array responses; { - _write_svc->batch_prepare(); + _write_svc->batch_prepare(decree); for (int i = 0; i < kv_num; i++) { dsn::apps::update_request req; req.key = key[i]; - _write_svc->batch_put(req, responses[i]); + _write_svc->batch_put(decree, req, responses[i]); } for (int i = 0; i < kv_num; i++) { - _write_svc->batch_remove(key[i], responses[i]); + _write_svc->batch_remove(decree, key[i], responses[i]); } _write_svc->batch_commit(decree); }