Skip to content

Commit

Permalink
[SKV-636] fix: Fix the corruption RocksDB instance will be reused bug (
Browse files Browse the repository at this point in the history
…apache#1422)

对应社区commit: https://github.com/apache/incubator-pegasus/pull/1422/files

其中,单测 integration_test.cpp 未添加,原因是整个function test的变更过大不便添加,等
最后都合入后再单独补充

apache#1383

This patch deal with the error `kCorruption` returned from storage engine of
write requests. After replica server got such an error, it will trash the
replica to a trash path `<app_id>.<pid>.pegasus.<timestamp>.err`.

Note that the replica server may crash because the corrupted replica has been
trashed and closed, it is left to be completed by another patches.
  • Loading branch information
acelyc111 authored and 王聃 committed May 5, 2023
1 parent b09d12f commit 44cc352
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 16 deletions.
4 changes: 3 additions & 1 deletion src/rdsn/include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@ class replication_app_base : public replica_base

// Return code:
// - ERR_OK: everything is OK.
// - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors, i.e. kCorruption from
// storage engine.
// - ERR_LOCAL_APP_FAILURE: other type of errors.
error_code apply_mutation(const mutation *mu);
error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT;

// methods need to implement on storage engine side
virtual error_code start(int argc, char **argv) = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/rdsn/include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,6 @@ DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED)
DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED)
DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)

DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
} // namespace dsn
1 change: 1 addition & 0 deletions src/rdsn/src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
} // dsn
7 changes: 7 additions & 0 deletions src/rdsn/src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
// which is binded to this replication partition
//

#include <gtest/gtest_prod.h>

#include <dsn/tool-api/rpc_message.h>
#include <dsn/tool-api/uniq_timestamp_us.h>
#include <dsn/tool-api/task.h>
Expand Down Expand Up @@ -494,6 +496,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// use Apache Ranger for replica access control
bool access_controller_allowed(message_ex *msg, const ranger::access_type &ac_type) const;

bool is_data_corrupted() const { return _data_corrupted; }

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand All @@ -514,6 +518,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_disk_migrate_test;
friend class open_replica_test;
friend class replica_follower;
FRIEND_TEST(replica_test, test_auto_trash);

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -635,6 +640,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
disk_status::type _disk_status{disk_status::NORMAL};

bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
bool _data_corrupted{false};
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
Expand Down
4 changes: 4 additions & 0 deletions src/rdsn/src/replica/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ void replica::handle_local_failure(error_code error)
error.to_string(),
enum_to_string(status()));

if (error == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

if (status() == partition_status::PS_PRIMARY) {
_stub->remove_replica_on_meta_server(_app_info, _primary_states.membership);
}
Expand Down
10 changes: 9 additions & 1 deletion src/rdsn/src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,10 @@ void replica::handle_learning_error(error_code err, bool is_local_error)
err.to_string(),
is_local_error ? "local_error" : "remote error");

if (is_local_error && err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

_stub->_counter_replicas_learning_recent_learn_fail_count->increment();

update_local_configuration_with_no_ballot_change(
Expand Down Expand Up @@ -1570,7 +1574,11 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}

// TODO: assign the returned error_code to err and check it
_app->apply_mutation(mu);
auto ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
handle_local_failure(ec);
return;
}

// appends logs-in-cache into plog to ensure them can be duplicated.
// if current case is step back, it means the logs has been reserved
Expand Down
6 changes: 6 additions & 0 deletions src/rdsn/src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2360,6 +2360,12 @@ void replica_stub::close_replica(replica_ptr r)
_counter_replicas_closing_count->decrement();
}

if (r->is_data_corrupted()) {
_fs_manager.remove_replica(id);
move_to_err_path(r->dir(), "trash replica");
_counter_replicas_recent_replica_move_error_count->increment();
}

ddebug("%s: finish to close replica", name.c_str());
}

Expand Down
1 change: 1 addition & 0 deletions src/rdsn/src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_follower;
friend class replica_follower_test;
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(replica_test, test_auto_trash);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down
9 changes: 8 additions & 1 deletion src/rdsn/src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,14 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
// because the external sst files may not exist, in this case, we won't consider it as
// an error.
if (!has_ingestion_request) {
return ERR_LOCAL_APP_FAILURE;
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
case rocksdb::Status::kCorruption:
return ERR_RDB_CORRUPTION;
default:
return ERR_LOCAL_APP_FAILURE;
}
}
}

Expand Down
26 changes: 15 additions & 11 deletions src/rdsn/src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,21 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(_replica,
_replica->_app->last_committed_decree(),
_replica->_options->max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree !=
_replica->_app->last_committed_decree() + 1) {
return;
}

_replica->_app->apply_mutation(mu);
});
prepare_list plist(
_replica,
_replica->_app->last_committed_decree(),
_replica->_options->max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree != _replica->_app->last_committed_decree() + 1) {
return;
}

auto e = _replica->_app->apply_mutation(mu);
if (e != ERR_OK) {
derror_replica("got an error({}) in commit stage of prepare_list", e);
return;
}
});

// replay private log
ec = mutation_log::replay(plog_files,
Expand Down
2 changes: 1 addition & 1 deletion src/rdsn/src/replica/test/clear.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.

rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/
39 changes: 39 additions & 0 deletions src/rdsn/src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
#include <dsn/utility/defer.h>
#include <gtest/gtest.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include "runtime/rpc/network.sim.h"

#include "common/backup_common.h"
#include "replica_test_base.h"
#include "replica/replica.h"
#include "replica/replica_http_service.h"
#include "replica/disk_cleaner.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -449,6 +451,43 @@ TEST_F(replica_test, test_clear_on_failure)
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, test_auto_trash)
{
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;

replica *rep =
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
dsn::utils::filesystem::create_directory(path);
ASSERT_TRUE(has_gpid(pid));

rep->handle_local_failure(ERR_RDB_CORRUPTION);
stub->wait_closing_replicas_finished();

ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
ASSERT_NE(dn, nullptr);
std::vector<std::string> subs;
ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
bool found = false;
const int ts_length = 16;
size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
for (const auto &sub : subs) {
if (sub.size() <= path.size()) {
continue;
}
uint64_t ts = 0;
if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
found = true;
break;
}
}
ASSERT_TRUE(found);
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, update_deny_client_test)
{
struct update_deny_client_test
Expand Down
12 changes: 11 additions & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
#include "rocksdb_wrapper.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/flags.h>
#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"
#include "base/pegasus_value_schema.h"

namespace pegasus {
namespace server {

DSN_DEFINE_int32("pegasus.server",
inject_write_error_for_test,
0,
"Which error code to inject in write path, 0 means no error. Only for test.");
DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
Expand Down Expand Up @@ -140,6 +146,10 @@ int rocksdb_wrapper::write(int64_t decree)
{
dassert(_write_batch->Count() != 0, "the number of updates in the batch is 0");

if (dsn_unlikely(FLAGS_inject_write_error_for_test != rocksdb::Status::kOk)) {
return FLAGS_inject_write_error_for_test;
}

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

rocksdb::Status status =
Expand Down

0 comments on commit 44cc352

Please sign in to comment.