From bc858a3465e7d32678ccfb8b0442afc338657612 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 13 Jul 2020 10:42:05 +0800 Subject: [PATCH 1/4] feat: sync bulk load from remote storage --- .../meta_server/meta_bulk_load_service.cpp | 111 ++++-- .../meta_server/meta_bulk_load_service.h | 13 +- .../replication/meta_server/meta_service.cpp | 2 +- .../unit_test/meta_bulk_load_service_test.cpp | 322 ++++++++++++++++++ .../test/meta_test/unit_test/meta_test_base.h | 2 + .../replication/test/simple_kv/case-402.act | 2 +- 6 files changed, 415 insertions(+), 37 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 2d1187eec8..27af288263 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -16,18 +16,17 @@ bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string & _state = _meta_svc->get_server_state(); } -// ThreadPool: THREAD_POOL_META_STATE +// ThreadPool: THREAD_POOL_META_SERVER void bulk_load_service::initialize_bulk_load_service() { task_tracker tracker; - error_code err = ERR_OK; + _sync_bulk_load_storage = + make_unique(_meta_svc->get_remote_storage(), &tracker); - create_bulk_load_root_dir(err, tracker); + create_bulk_load_root_dir(); tracker.wait_outstanding_tasks(); - if (err == ERR_OK) { - try_to_continue_bulk_load(); - } + try_to_continue_bulk_load(); } // ThreadPool: THREAD_POOL_META_STATE @@ -1223,39 +1222,85 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc) } } -// ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker) +// ThreadPool: THREAD_POOL_META_SERVER +void bulk_load_service::create_bulk_load_root_dir() { blob value = blob(); - _meta_svc->get_remote_storage()->create_node( - _bulk_load_root, - LPC_META_CALLBACK, - [this, &err, &tracker](error_code ec) { - if (ERR_OK == ec || ERR_NODE_ALREADY_EXIST == ec) { - ddebug_f("create bulk load root({}) succeed", _bulk_load_root); - sync_apps_bulk_load_from_remote_stroage(err, tracker); - } else if (ERR_TIMEOUT == ec) { - dwarn_f("create bulk load root({}) failed, retry later", _bulk_load_root); - tasking::enqueue( - LPC_META_STATE_NORMAL, - _meta_svc->tracker(), - std::bind(&bulk_load_service::create_bulk_load_root_dir, this, err, tracker), - 0, - std::chrono::seconds(1)); - } else { - err = ec; - dfatal_f( - "create bulk load root({}) failed, error={}", _bulk_load_root, ec.to_string()); + std::string path = _bulk_load_root; + _sync_bulk_load_storage->create_node(std::move(path), std::move(value), [this]() { + ddebug_f("create bulk load root({}) succeed", _bulk_load_root); + sync_apps_bulk_load_from_remote_stroage(); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::sync_apps_bulk_load_from_remote_stroage() +{ + std::string path = _bulk_load_root; + _sync_bulk_load_storage->get_children( + std::move(path), [this](bool flag, const std::vector &children) { + if (flag && children.size() > 0) { + ddebug_f("There are {} apps need to sync bulk load status", children.size()); + for (auto &elem : children) { + uint32_t app_id = boost::lexical_cast(elem); + ddebug_f("start to sync app({}) bulk load status", app_id); + do_sync_app_bulk_load(app_id); + } } - }, - value, - &tracker); + }); } -void bulk_load_service::sync_apps_bulk_load_from_remote_stroage(error_code &err, - task_tracker &tracker) +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::do_sync_app_bulk_load(int32_t app_id) { - // TODO(heyuchen): TBD + std::string app_path = get_app_bulk_load_path(app_id); + _sync_bulk_load_storage->get_data( + std::move(app_path), [this, app_id, app_path](const blob &value) { + app_bulk_load_info ainfo; + dsn::json::json_forwarder::decode(value, ainfo); + { + zauto_write_lock l(_lock); + _bulk_load_app_id.insert(app_id); + _app_bulk_load_info[app_id] = ainfo; + } + sync_partitions_bulk_load_from_remote_stroage(ainfo.app_id, ainfo.app_name); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::sync_partitions_bulk_load_from_remote_stroage(int32_t app_id, + const std::string &app_name) +{ + std::string app_path = get_app_bulk_load_path(app_id); + _sync_bulk_load_storage->get_children( + std::move(app_path), + [this, app_path, app_id, app_name](bool flag, const std::vector &children) { + ddebug_f("app(name={},app_id={}) has {} partition bulk load info to be synced", + app_name, + app_id, + children.size()); + for (const auto &child_pidx : children) { + uint32_t pidx = boost::lexical_cast(child_pidx); + std::string partition_path = get_partition_bulk_load_path(app_path, pidx); + do_sync_partition_bulk_load(gpid(app_id, pidx), app_name, partition_path); + } + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::do_sync_partition_bulk_load(const gpid &pid, + const std::string &app_name, + std::string &partition_path) +{ + _sync_bulk_load_storage->get_data( + std::move(partition_path), [this, pid, app_name, partition_path](const blob &value) { + partition_bulk_load_info pinfo; + dsn::json::json_forwarder::decode(value, pinfo); + { + zauto_write_lock l(_lock); + _partition_bulk_load_info[pid] = pinfo; + } + }); } void bulk_load_service::try_to_continue_bulk_load() diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 55b3c17a4c..20eb041773 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -209,9 +209,17 @@ class bulk_load_service /// sync bulk load states from remote storage /// called when service initialized or meta server leader switch /// - void create_bulk_load_root_dir(error_code &err, task_tracker &tracker); + void create_bulk_load_root_dir(); - void sync_apps_bulk_load_from_remote_stroage(error_code &err, task_tracker &tracker); + void sync_apps_bulk_load_from_remote_stroage(); + + void do_sync_app_bulk_load(int32_t app_id); + + void sync_partitions_bulk_load_from_remote_stroage(int32_t app_id, const std::string &app_name); + + void do_sync_partition_bulk_load(const gpid &pid, + const std::string &app_name, + std::string &partition_path); /// /// try to continue bulk load according to states from remote stroage @@ -311,6 +319,7 @@ class bulk_load_service meta_service *_meta_svc; server_state *_state; + std::unique_ptr _sync_bulk_load_storage; zrwlock_nr &app_lock() const { return _state->_lock; } zrwlock_nr _lock; // bulk load states lock diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index b1c2a994a7..e216f918f0 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -290,7 +290,7 @@ void meta_service::start_service() if (_bulk_load_svc) { ddebug("start bulk load service"); - tasking::enqueue(LPC_META_STATE_NORMAL, tracker(), [this]() { + tasking::enqueue(LPC_META_CALLBACK, tracker(), [this]() { _bulk_load_svc->initialize_bulk_load_service(); }); } diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index 772120576b..bc89d3d3e9 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -130,6 +130,164 @@ class bulk_load_service_test : public meta_test_base return bulk_svc()._apps_in_progress_count[app_id]; } + /// Used for bulk_load_failover_test + + void initialize_meta_server_with_mock_bulk_load( + const std::unordered_set &app_id_set, + const std::unordered_map &app_bulk_load_info_map, + const std::unordered_map> + &partition_bulk_load_info_map, + const std::vector &app_list) + { + // initialize meta service + auto meta_svc = new fake_receiver_meta_service(); + meta_svc->remote_storage_initialize(); + + // initialize server_state + auto state = meta_svc->_state; + state->initialize(meta_svc, meta_svc->_cluster_root + "/apps"); + _app_root = state->_apps_root; + meta_svc->_started = true; + _ms.reset(meta_svc); + + // initialize bulk load service + _ms->_bulk_load_svc = make_unique( + _ms.get(), meta_options::concat_path_unix_style(_ms->_cluster_root, "bulk_load")); + mock_bulk_load_on_remote_storage( + app_id_set, app_bulk_load_info_map, partition_bulk_load_info_map); + + // mock app + for (auto &info : app_list) { + mock_app_on_remote_stroage(info); + } + state->initialize_data_structure(); + + _ms->set_function_level(meta_function_level::fl_steady); + _ms->_failure_detector.reset(new meta_server_failure_detector(_ms.get())); + _ss = _ms->_state; + } + + void mock_bulk_load_on_remote_storage( + const std::unordered_set &app_id_set, + const std::unordered_map &app_bulk_load_info_map, + const std::unordered_map> + &partition_bulk_load_info_map) + { + std::string path = bulk_svc()._bulk_load_root; + blob value = blob(); + // create bulk_load_root + _ms->get_meta_storage()->create_node( + std::move(path), + std::move(value), + [this, &app_id_set, &app_bulk_load_info_map, &partition_bulk_load_info_map]() { + for (const auto app_id : app_id_set) { + auto app_iter = app_bulk_load_info_map.find(app_id); + auto partition_iter = partition_bulk_load_info_map.find(app_id); + + if (app_iter != app_bulk_load_info_map.end() && + partition_iter != partition_bulk_load_info_map.end()) { + mock_app_bulk_load_info_on_remote_stroage(app_iter->second, + partition_iter->second); + } + } + }); + wait_all(); + } + + void mock_app_bulk_load_info_on_remote_stroage( + const app_bulk_load_info &ainfo, + const std::unordered_map &partition_bulk_load_info_map) + { + std::string app_path = bulk_svc().get_app_bulk_load_path(ainfo.app_id); + blob value = json::json_forwarder::encode(ainfo); + // create app_bulk_load_info + _ms->get_meta_storage()->create_node( + std::move(app_path), + std::move(value), + [this, app_path, &ainfo, &partition_bulk_load_info_map]() { + ddebug_f("create app({}) app_id={} bulk load dir({}), bulk_load_status={}", + ainfo.app_name, + ainfo.app_id, + app_path, + dsn::enum_to_string(ainfo.status)); + for (const auto kv : partition_bulk_load_info_map) { + mock_partition_bulk_load_info_on_remote_stroage(gpid(ainfo.app_id, kv.first), + kv.second); + } + }); + } + + void mock_partition_bulk_load_info_on_remote_stroage(const gpid &pid, + const partition_bulk_load_info &pinfo) + { + std::string partition_path = bulk_svc().get_partition_bulk_load_path(pid); + blob value = json::json_forwarder::encode(pinfo); + _ms->get_meta_storage()->create_node( + std::move(partition_path), std::move(value), [this, partition_path, pid, &pinfo]() { + ddebug_f("create partition[{}] bulk load dir({}), bulk_load_status={}", + pid, + partition_path, + dsn::enum_to_string(pinfo.status)); + }); + } + + void mock_app_on_remote_stroage(const app_info &info) + { + static const char *lock_state = "lock"; + static const char *unlock_state = "unlock"; + std::string path = _app_root; + + _ms->get_meta_storage()->create_node( + std::move(path), blob(lock_state, 0, strlen(lock_state)), [this]() { + ddebug_f("create app root {}", _app_root); + }); + wait_all(); + + blob value = json::json_forwarder::encode(info); + _ms->get_meta_storage()->create_node( + _app_root + "/" + boost::lexical_cast(info.app_id), + std::move(value), + [this, &info]() { + ddebug_f("create app({}) app_id={}, dir succeed", info.app_name, info.app_id); + for (int i = 0; i < info.partition_count; ++i) { + partition_configuration config; + config.max_replica_count = 3; + config.pid = gpid(info.app_id, i); + config.ballot = BALLOT; + blob v = json::json_forwarder::encode(config); + _ms->get_meta_storage()->create_node( + _app_root + "/" + boost::lexical_cast(info.app_id) + "/" + + boost::lexical_cast(i), + std::move(v), + [info, i, this]() { + ddebug_f("create app({}), partition({}.{}) dir succeed", + info.app_name, + info.app_id, + i); + }); + } + }); + wait_all(); + + std::string app_root = _app_root; + _ms->get_meta_storage()->set_data( + std::move(app_root), blob(unlock_state, 0, strlen(unlock_state)), []() {}); + wait_all(); + } + + int32_t get_app_id_set_size() { return bulk_svc()._bulk_load_app_id.size(); } + + int32_t get_partition_bulk_load_info_size(int32_t app_id) + { + int count = 0; + for (const auto kv : bulk_svc()._partition_bulk_load_info) { + if (kv.first.get_app_id() == app_id) { + ++count; + } + } + return count; + } + public: int32_t APP_ID = 1; std::string APP_NAME = "bulk_load_test"; @@ -541,5 +699,169 @@ TEST_F(bulk_load_process_test, ingest_succeed) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING); } +class bulk_load_failover_test : public bulk_load_service_test +{ +public: + bulk_load_failover_test() {} + + void SetUp() + { + fail::setup(); + fail::cfg("meta_bulk_load_partition_bulk_load", "return()"); + fail::cfg("meta_bulk_load_partition_ingestion", "return()"); + } + + void TearDown() + { + clean_up(); + fail::teardown(); + bulk_load_service_test::TearDown(); + } + + void try_to_continue_bulk_load(bulk_load_status::type app_status, bool is_bulk_loading = true) + { + prepare_bulk_load_structures(SYNC_APP_ID, + SYNC_PARTITION_COUNT, + SYNC_APP_NAME, + app_status, + _pstatus_map, + is_bulk_loading); + initialize_meta_server_with_mock_bulk_load( + _app_id_set, _app_bulk_load_info_map, _partition_bulk_load_info_map, _app_info_list); + bulk_svc().initialize_bulk_load_service(); + wait_all(); + } + + void + prepare_bulk_load_structures(int32_t app_id, + int32_t partition_count, + std::string &app_name, + bulk_load_status::type app_status, + std::unordered_map &pstatus_map, + bool is_bulk_loading) + { + _app_id_set.insert(app_id); + mock_app_bulk_load_info(app_id, partition_count, app_name, app_status); + mock_partition_bulk_load_info(app_id, pstatus_map); + add_to_app_info_list(app_id, partition_count, app_name, is_bulk_loading); + } + + void mock_app_bulk_load_info(int32_t app_id, + int32_t partition_count, + std::string &app_name, + bulk_load_status::type status) + { + app_bulk_load_info ainfo; + ainfo.app_id = app_id; + ainfo.app_name = app_name; + ainfo.cluster_name = CLUSTER; + ainfo.file_provider_type = PROVIDER; + ainfo.partition_count = partition_count; + ainfo.status = status; + _app_bulk_load_info_map[app_id] = ainfo; + } + + void + mock_partition_bulk_load_info(int32_t app_id, + std::unordered_map &pstatus_map) + { + if (pstatus_map.size() <= 0) { + return; + } + std::unordered_map pinfo_map; + for (auto iter = pstatus_map.begin(); iter != pstatus_map.end(); ++iter) { + partition_bulk_load_info pinfo; + pinfo.status = iter->second; + pinfo_map[iter->first] = pinfo; + } + _partition_bulk_load_info_map[app_id] = pinfo_map; + } + + void add_to_app_info_list(int32_t app_id, + int32_t partition_count, + std::string &app_name, + bool is_bulk_loading) + { + app_info ainfo; + ainfo.app_id = app_id; + ainfo.app_name = app_name; + ainfo.app_type = "pegasus"; + ainfo.is_stateful = true; + ainfo.is_bulk_loading = is_bulk_loading; + ainfo.max_replica_count = 3; + ainfo.partition_count = partition_count; + ainfo.status = app_status::AS_AVAILABLE; + _app_info_list.emplace_back(ainfo); + } + + void mock_pstatus_map(bulk_load_status::type status, int32_t end_index, int32_t start_index = 0) + { + for (auto i = start_index; i <= end_index; ++i) { + _pstatus_map[i] = status; + } + } + + void clean_up() + { + _app_info_list.clear(); + _app_bulk_load_info_map.clear(); + _partition_bulk_load_info_map.clear(); + _pstatus_map.clear(); + _app_id_set.clear(); + } + + std::string SYNC_APP_NAME = "bulk_load_failover_table"; + int32_t SYNC_APP_ID = 2; + int32_t SYNC_PARTITION_COUNT = 4; + + std::vector _app_info_list; + std::unordered_set _app_id_set; + std::unordered_map _app_bulk_load_info_map; + std::unordered_map> + _partition_bulk_load_info_map; + std::unordered_map _pstatus_map; +}; + +TEST_F(bulk_load_failover_test, sync_bulk_load) +{ + fail::cfg("meta_try_to_continue_bulk_load", "return()"); + + // mock app downloading with partition[0~1] downloading + std::unordered_map partition_bulk_load_status_map; + partition_bulk_load_status_map[0] = bulk_load_status::BLS_DOWNLOADING; + partition_bulk_load_status_map[1] = bulk_load_status::BLS_DOWNLOADING; + prepare_bulk_load_structures(SYNC_APP_ID, + SYNC_PARTITION_COUNT, + SYNC_APP_NAME, + bulk_load_status::BLS_DOWNLOADING, + partition_bulk_load_status_map, + true); + + // mock app failed with no partition existed + partition_bulk_load_status_map.clear(); + partition_bulk_load_status_map[0] = bulk_load_status::BLS_FAILED; + prepare_bulk_load_structures(APP_ID, + PARTITION_COUNT, + APP_NAME, + bulk_load_status::type::BLS_FAILED, + partition_bulk_load_status_map, + true); + + initialize_meta_server_with_mock_bulk_load( + _app_id_set, _app_bulk_load_info_map, _partition_bulk_load_info_map, _app_info_list); + bulk_svc().initialize_bulk_load_service(); + wait_all(); + + ASSERT_EQ(get_app_id_set_size(), 2); + + ASSERT_TRUE(app_is_bulk_loading(SYNC_APP_NAME)); + ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_DOWNLOADING); + ASSERT_EQ(get_partition_bulk_load_info_size(SYNC_APP_ID), 2); + + ASSERT_TRUE(app_is_bulk_loading(APP_NAME)); + ASSERT_EQ(get_app_bulk_load_status(APP_ID), bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_partition_bulk_load_info_size(APP_ID), 1); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h index 0be884bcc1..57d0f142b1 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h +++ b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h @@ -38,6 +38,7 @@ class meta_test_base : public testing::Test _ss = _ms->_state; _ss->initialize(_ms.get(), _ms->_cluster_root + "/apps"); + _app_root = _ss->_apps_root; _ms->_started = true; _ms->set_function_level(meta_function_level::fl_steady); @@ -140,6 +141,7 @@ class meta_test_base : public testing::Test std::shared_ptr _ss; std::unique_ptr _ms; + std::string _app_root; }; } // namespace replication diff --git a/src/dist/replication/test/simple_kv/case-402.act b/src/dist/replication/test/simple_kv/case-402.act index 39d6c106ea..85458a1b05 100644 --- a/src/dist/replication/test/simple_kv/case-402.act +++ b/src/dist/replication/test/simple_kv/case-402.act @@ -181,7 +181,7 @@ client:begin_write:id=169,key=k169,value=v169,timeout=0 inject:on_aio_call:node=r2,task_code=LPC_WRITE_REPLICATION_LOG_SHARED config:{4,r1,[r3]} -state:{{r1,pri,4,12},{r3,sec,4,12}} +state:{{r1,pri,4,20},{r3,sec,4,11}} set:disable_load_balance=0 From 62b02d651b11c0a0efc0ce4973906520ebe5f6ca Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 13 Jul 2020 11:00:12 +0800 Subject: [PATCH 2/4] fix spelling --- .../meta_server/meta_bulk_load_service.cpp | 12 ++++++------ .../replication/meta_server/meta_bulk_load_service.h | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 67a1dfb6c6..2682277402 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -486,7 +486,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons // if replica report metadata, update metadata on remote storage if (response.__isset.metadata && is_partition_metadata_not_updated(pid)) { - update_partition_metadata_on_remote_stroage(app_name, pid, response.metadata); + update_partition_metadata_on_remote_storage(app_name, pid, response.metadata); } // update download progress @@ -743,7 +743,7 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::update_partition_metadata_on_remote_stroage( +void bulk_load_service::update_partition_metadata_on_remote_storage( const std::string &app_name, const gpid &pid, const bulk_load_metadata &metadata) { zauto_read_lock l(_lock); @@ -1221,12 +1221,12 @@ void bulk_load_service::create_bulk_load_root_dir() std::string path = _bulk_load_root; _sync_bulk_load_storage->create_node(std::move(path), std::move(value), [this]() { ddebug_f("create bulk load root({}) succeed", _bulk_load_root); - sync_apps_bulk_load_from_remote_stroage(); + sync_apps_bulk_load_from_remote_storage(); }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::sync_apps_bulk_load_from_remote_stroage() +void bulk_load_service::sync_apps_bulk_load_from_remote_storage() { std::string path = _bulk_load_root; _sync_bulk_load_storage->get_children( @@ -1255,12 +1255,12 @@ void bulk_load_service::do_sync_app_bulk_load(int32_t app_id) _bulk_load_app_id.insert(app_id); _app_bulk_load_info[app_id] = ainfo; } - sync_partitions_bulk_load_from_remote_stroage(ainfo.app_id, ainfo.app_name); + sync_partitions_bulk_load_from_remote_storage(ainfo.app_id, ainfo.app_name); }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::sync_partitions_bulk_load_from_remote_stroage(int32_t app_id, +void bulk_load_service::sync_partitions_bulk_load_from_remote_storage(int32_t app_id, const std::string &app_name) { std::string app_path = get_app_bulk_load_path(app_id); diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 20eb041773..e4e3b450f9 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -164,7 +164,7 @@ class bulk_load_service // Called by `handle_app_downloading` // update partition bulk load metadata reported by replica server on remote storage - void update_partition_metadata_on_remote_stroage(const std::string &app_name, + void update_partition_metadata_on_remote_storage(const std::string &app_name, const gpid &pid, const bulk_load_metadata &metadata); @@ -211,18 +211,18 @@ class bulk_load_service /// void create_bulk_load_root_dir(); - void sync_apps_bulk_load_from_remote_stroage(); + void sync_apps_bulk_load_from_remote_storage(); void do_sync_app_bulk_load(int32_t app_id); - void sync_partitions_bulk_load_from_remote_stroage(int32_t app_id, const std::string &app_name); + void sync_partitions_bulk_load_from_remote_storage(int32_t app_id, const std::string &app_name); void do_sync_partition_bulk_load(const gpid &pid, const std::string &app_name, std::string &partition_path); /// - /// try to continue bulk load according to states from remote stroage + /// try to continue bulk load according to states from remote storage /// called when service initialized or meta server leader switch /// void try_to_continue_bulk_load(); @@ -241,7 +241,7 @@ class bulk_load_service return oss.str(); } - // get app_bulk_load_info path on remote stroage + // get app_bulk_load_info path on remote storage // <_bulk_load_root>/ inline std::string get_app_bulk_load_path(int32_t app_id) const { @@ -250,7 +250,7 @@ class bulk_load_service return oss.str(); } - // get partition_bulk_load_info path on remote stroage + // get partition_bulk_load_info path on remote storage // <_bulk_load_root>// inline std::string get_partition_bulk_load_path(const std::string &app_bulk_load_path, int partition_id) const From f6b3b9aba6f7d9ddd5b245534dce8340ddf6f504 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 13 Jul 2020 17:05:15 +0800 Subject: [PATCH 3/4] fix by review --- .../replication/meta_server/meta_bulk_load_service.cpp | 7 +++---- src/dist/replication/meta_server/meta_bulk_load_service.h | 2 ++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 2682277402..6d7e7f5571 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -19,12 +19,11 @@ bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string & // ThreadPool: THREAD_POOL_META_SERVER void bulk_load_service::initialize_bulk_load_service() { - task_tracker tracker; _sync_bulk_load_storage = - make_unique(_meta_svc->get_remote_storage(), &tracker); + make_unique(_meta_svc->get_remote_storage(), &_sync_tracker); create_bulk_load_root_dir(); - tracker.wait_outstanding_tasks(); + _sync_tracker.wait_outstanding_tasks(); try_to_continue_bulk_load(); } @@ -1233,7 +1232,7 @@ void bulk_load_service::sync_apps_bulk_load_from_remote_storage() std::move(path), [this](bool flag, const std::vector &children) { if (flag && children.size() > 0) { ddebug_f("There are {} apps need to sync bulk load status", children.size()); - for (auto &elem : children) { + for (const auto &elem : children) { uint32_t app_id = boost::lexical_cast(elem); ddebug_f("start to sync app({}) bulk load status", app_id); do_sync_app_bulk_load(app_id); diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index e4e3b450f9..0549dc5eab 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -319,7 +319,9 @@ class bulk_load_service meta_service *_meta_svc; server_state *_state; + std::unique_ptr _sync_bulk_load_storage; + task_tracker _sync_tracker; zrwlock_nr &app_lock() const { return _state->_lock; } zrwlock_nr _lock; // bulk load states lock From c669a98b7b1ebfefb1367f02c1866cecedbc90cd Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 14 Jul 2020 09:30:05 +0800 Subject: [PATCH 4/4] update by review --- src/meta/meta_bulk_load_service.cpp | 60 ++++++++++++++--------------- src/meta/meta_bulk_load_service.h | 10 ++--- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 6d7e7f5571..54e3ee1353 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -1220,12 +1220,12 @@ void bulk_load_service::create_bulk_load_root_dir() std::string path = _bulk_load_root; _sync_bulk_load_storage->create_node(std::move(path), std::move(value), [this]() { ddebug_f("create bulk load root({}) succeed", _bulk_load_root); - sync_apps_bulk_load_from_remote_storage(); + sync_apps_from_remote_storage(); }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::sync_apps_bulk_load_from_remote_storage() +void bulk_load_service::sync_apps_from_remote_storage() { std::string path = _bulk_load_root; _sync_bulk_load_storage->get_children( @@ -1233,34 +1233,33 @@ void bulk_load_service::sync_apps_bulk_load_from_remote_storage() if (flag && children.size() > 0) { ddebug_f("There are {} apps need to sync bulk load status", children.size()); for (const auto &elem : children) { - uint32_t app_id = boost::lexical_cast(elem); + int32_t app_id = boost::lexical_cast(elem); ddebug_f("start to sync app({}) bulk load status", app_id); - do_sync_app_bulk_load(app_id); + do_sync_app(app_id); } } }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::do_sync_app_bulk_load(int32_t app_id) +void bulk_load_service::do_sync_app(int32_t app_id) { std::string app_path = get_app_bulk_load_path(app_id); - _sync_bulk_load_storage->get_data( - std::move(app_path), [this, app_id, app_path](const blob &value) { - app_bulk_load_info ainfo; - dsn::json::json_forwarder::decode(value, ainfo); - { - zauto_write_lock l(_lock); - _bulk_load_app_id.insert(app_id); - _app_bulk_load_info[app_id] = ainfo; - } - sync_partitions_bulk_load_from_remote_storage(ainfo.app_id, ainfo.app_name); - }); + _sync_bulk_load_storage->get_data(std::move(app_path), [this, app_id](const blob &value) { + app_bulk_load_info ainfo; + dsn::json::json_forwarder::decode(value, ainfo); + { + zauto_write_lock l(_lock); + _bulk_load_app_id.insert(app_id); + _app_bulk_load_info[app_id] = ainfo; + } + sync_partitions_from_remote_storage(ainfo.app_id, ainfo.app_name); + }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::sync_partitions_bulk_load_from_remote_storage(int32_t app_id, - const std::string &app_name) +void bulk_load_service::sync_partitions_from_remote_storage(int32_t app_id, + const std::string &app_name) { std::string app_path = get_app_bulk_load_path(app_id); _sync_bulk_load_storage->get_children( @@ -1271,27 +1270,24 @@ void bulk_load_service::sync_partitions_bulk_load_from_remote_storage(int32_t ap app_id, children.size()); for (const auto &child_pidx : children) { - uint32_t pidx = boost::lexical_cast(child_pidx); + int32_t pidx = boost::lexical_cast(child_pidx); std::string partition_path = get_partition_bulk_load_path(app_path, pidx); - do_sync_partition_bulk_load(gpid(app_id, pidx), app_name, partition_path); + do_sync_partition(gpid(app_id, pidx), partition_path); } }); } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::do_sync_partition_bulk_load(const gpid &pid, - const std::string &app_name, - std::string &partition_path) +void bulk_load_service::do_sync_partition(const gpid &pid, std::string &partition_path) { - _sync_bulk_load_storage->get_data( - std::move(partition_path), [this, pid, app_name, partition_path](const blob &value) { - partition_bulk_load_info pinfo; - dsn::json::json_forwarder::decode(value, pinfo); - { - zauto_write_lock l(_lock); - _partition_bulk_load_info[pid] = pinfo; - } - }); + _sync_bulk_load_storage->get_data(std::move(partition_path), [this, pid](const blob &value) { + partition_bulk_load_info pinfo; + dsn::json::json_forwarder::decode(value, pinfo); + { + zauto_write_lock l(_lock); + _partition_bulk_load_info[pid] = pinfo; + } + }); } void bulk_load_service::try_to_continue_bulk_load() diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index 1a471b4cbe..23fcfdac2f 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -211,15 +211,13 @@ class bulk_load_service /// void create_bulk_load_root_dir(); - void sync_apps_bulk_load_from_remote_storage(); + void sync_apps_from_remote_storage(); - void do_sync_app_bulk_load(int32_t app_id); + void do_sync_app(int32_t app_id); - void sync_partitions_bulk_load_from_remote_storage(int32_t app_id, const std::string &app_name); + void sync_partitions_from_remote_storage(int32_t app_id, const std::string &app_name); - void do_sync_partition_bulk_load(const gpid &pid, - const std::string &app_name, - std::string &partition_path); + void do_sync_partition(const gpid &pid, std::string &partition_path); /// /// try to continue bulk load according to states from remote storage