Skip to content

Commit

Permalink
feat: overload dump_write_request of replication_app_base.h (#533)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored May 19, 2020
1 parent 21c9c54 commit f8ef62e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 38 deletions.
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 29 files
+6 −5 include/dsn/cpp/rpc_holder.h
+2 −0 include/dsn/dist/replication/replication.codes.h
+3 −0 include/dsn/dist/replication/replication_app_base.h
+624 −0 include/dsn/dist/replication/replication_types.h
+163 −53 src/dist/block_service/fds/fds_service.cpp
+24 −5 src/dist/block_service/fds/fds_service.h
+1 −1 src/dist/block_service/local/local_service.cpp
+1 −1 src/dist/replication/common/replication_common.cpp
+3 −0 src/dist/replication/common/replication_common.h
+1,458 −0 src/dist/replication/common/replication_types.cpp
+26 −0 src/dist/replication/lib/replica.h
+4 −1 src/dist/replication/lib/replica_2pc.cpp
+212 −0 src/dist/replication/lib/replica_bulk_load.cpp
+7 −25 src/dist/replication/lib/replica_context.cpp
+39 −0 src/dist/replication/lib/replica_context.h
+11 −14 src/dist/replication/lib/replica_restore.cpp
+34 −0 src/dist/replication/lib/replica_stub.cpp
+4 −0 src/dist/replication/lib/replica_stub.h
+293 −0 src/dist/replication/meta_server/meta_bulk_load_service.cpp
+103 −1 src/dist/replication/meta_server/meta_bulk_load_service.h
+18 −0 src/dist/replication/meta_server/meta_service.cpp
+4 −0 src/dist/replication/meta_server/meta_service.h
+1 −0 src/dist/replication/meta_server/server_state.h
+99 −0 src/dist/replication/replication.thrift
+1 −1 src/dist/replication/test/meta_test/unit_test/CMakeLists.txt
+106 −0 src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
+148 −0 src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp
+2 −2 thirdparty/build-thirdparty.sh
+3 −3 thirdparty/download-thirdparty.sh
52 changes: 52 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2618,5 +2618,57 @@ void pegasus_server_impl::release_db()
_db = nullptr;
}

std::string pegasus_server_impl::dump_write_request(dsn::message_ex *request)
{
dsn::task_code rpc_code(request->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto put = put_rpc::auto_reply(request).request();
::dsn::blob hash_key, sort_key;
pegasus_restore_key(put.key, hash_key, sort_key);
std::string request("put:");
request.append("hash_key=")
.append(pegasus::utils::c_escape_string(hash_key))
.append(",sort_key=")
.append(pegasus::utils::c_escape_string(sort_key));
return request;
}

if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
auto multi_put = multi_put_rpc::auto_reply(request).request();
std::string request("multi_put:");
request.append("hash_key=")
.append(pegasus::utils::c_escape_string((multi_put.hash_key))
.append(",multi_put_count=")
.append(std::to_string(multi_put.kvs.size())));
return request;
}

if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
auto check_and_set = check_and_set_rpc::auto_reply(request).request();
std::string request("check_and_set:");
request.append("hash_key=")
.append(pegasus::utils::c_escape_string(check_and_set.hash_key))
.append(",check_sort_key=")
.append(pegasus::utils::c_escape_string(check_and_set.check_sort_key))
.append(",set_sort_key=")
.append(pegasus::utils::c_escape_string(check_and_set.set_sort_key));
return request;
}

if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
auto check_and_mutate = check_and_mutate_rpc::auto_reply(request).request();
std::string request("check_and_mutate:");
request.append("hash_key=")
.append(pegasus::utils::c_escape_string(check_and_mutate.hash_key))
.append(",check_sort_key=")
.append(pegasus::utils::c_escape_string(check_and_mutate.check_sort_key))
.append(",set_value_count=")
.append(std::to_string(check_and_mutate.mutate_list.size()));
return request;
}

return "default";
}

} // namespace server
} // namespace pegasus
75 changes: 38 additions & 37 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
}
explicit pegasus_server_impl(dsn::replication::replica *r);

virtual ~pegasus_server_impl() override;
~pegasus_server_impl() override;

// the following methods may set physical error if internal error occurs
virtual void on_get(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::read_response> &reply) override;
virtual void on_multi_get(const ::dsn::apps::multi_get_request &args,
::dsn::rpc_replier<::dsn::apps::multi_get_response> &reply) override;
virtual void on_sortkey_count(const ::dsn::blob &args,
::dsn::rpc_replier<::dsn::apps::count_response> &reply) override;
virtual void on_ttl(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::ttl_response> &reply) override;
virtual void on_get_scanner(const ::dsn::apps::get_scanner_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
virtual void on_scan(const ::dsn::apps::scan_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
virtual void on_clear_scanner(const int64_t &args) override;
void on_get(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::read_response> &reply) override;
void on_multi_get(const ::dsn::apps::multi_get_request &args,
::dsn::rpc_replier<::dsn::apps::multi_get_response> &reply) override;
void on_sortkey_count(const ::dsn::blob &args,
::dsn::rpc_replier<::dsn::apps::count_response> &reply) override;
void on_ttl(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::ttl_response> &reply) override;
void on_get_scanner(const ::dsn::apps::get_scanner_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
void on_scan(const ::dsn::apps::scan_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
void on_clear_scanner(const int64_t &args) override;

// input:
// - argc = 0 : re-open the db
Expand All @@ -63,14 +63,14 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// - ERR_OK
// - ERR_FILE_OPERATION_FAILED
// - ERR_LOCAL_APP_FAILURE
virtual ::dsn::error_code start(int argc, char **argv) override;
::dsn::error_code start(int argc, char **argv) override;

virtual void cancel_background_work(bool wait) override;
void cancel_background_work(bool wait) override;

// returns:
// - ERR_OK
// - ERR_FILE_OPERATION_FAILED
virtual ::dsn::error_code stop(bool clear_state) override;
::dsn::error_code stop(bool clear_state) override;

/// Each of the write request (specifically, the rpc that's configured as write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will first be
Expand All @@ -79,12 +79,12 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
///
/// \see dsn::replication::replication_app_base::apply_mutation
/// \inherit dsn::replication::replication_app_base
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
int count) override;
int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
int count) override;

virtual ::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
{
return ::dsn::ERR_OK;
}
Expand All @@ -95,15 +95,15 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
// ATTENTION: make sure that no other threads is writing into the replica.
virtual ::dsn::error_code sync_checkpoint() override;
::dsn::error_code sync_checkpoint() override;

// returns:
// - ERR_OK: checkpoint succeed
// - ERR_WRONG_TIMING: another checkpoint is running now
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
// - ERR_TRY_AGAIN: flush memtable triggered, need try again later
virtual ::dsn::error_code async_checkpoint(bool flush_memtable) override;
::dsn::error_code async_checkpoint(bool flush_memtable) override;

//
// copy the latest checkpoint to checkpoint_dir, and the decree of the checkpoint
Expand All @@ -112,8 +112,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
//
// must be thread safe
// this method will not trigger flush(), just copy even if the app is empty.
virtual ::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override;
::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override;

//
// help function, just copy checkpoint to specified dir and ignore _is_checkpointing.
Expand All @@ -130,9 +130,9 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// - ERR_OK
// - ERR_OBJECT_NOT_FOUND
// - ERR_FILE_OPERATION_FAILED
virtual ::dsn::error_code get_checkpoint(int64_t learn_start,
const dsn::blob &learn_request,
dsn::replication::learn_state &state) override;
::dsn::error_code get_checkpoint(int64_t learn_start,
const dsn::blob &learn_request,
dsn::replication::learn_state &state) override;

// apply checkpoint, this will clear and recreate the db
// if succeed:
Expand All @@ -143,19 +143,20 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// - error code of close()
// - error code of open()
// - error code of checkpoint()
virtual ::dsn::error_code
storage_apply_checkpoint(chkpt_apply_mode mode,
const dsn::replication::learn_state &state) override;
::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode,
const dsn::replication::learn_state &state) override;

virtual int64_t last_durable_decree() const override { return _last_durable_decree.load(); }
int64_t last_durable_decree() const override { return _last_durable_decree.load(); }

virtual int64_t last_flushed_decree() const override;
int64_t last_flushed_decree() const override;

virtual void update_app_envs(const std::map<std::string, std::string> &envs) override;
void update_app_envs(const std::map<std::string, std::string> &envs) override;

virtual void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) override;
void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) override;

virtual void set_partition_version(int32_t partition_version) override;
void set_partition_version(int32_t partition_version) override;

std::string dump_write_request(dsn::message_ex *request) override;

private:
friend class manual_compact_service_test;
Expand Down

0 comments on commit f8ef62e

Please sign in to comment.