Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/dpos pbft bos upgrade #91

Merged
merged 12 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 77 additions & 62 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ struct controller_impl {
chainbase::database reversible_blocks; ///< a special database to persist blocks that have successfully been applied but are still reversible
block_log blog;
optional<pending_state> pending;
bool pbft_enabled = false;
bool pbft_upgrading = false;
optional<block_id_type> pending_pbft_lib;
optional<block_id_type> pending_pbft_checkpoint;
vector<block_num_type> proposed_schedule_blocks;
Expand Down Expand Up @@ -351,13 +353,17 @@ struct controller_impl {

void init(std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot) {


bool report_integrity_hash = !!snapshot;
if (snapshot) {
EOS_ASSERT( !head, fork_database_exception, "" );
snapshot->validate();

read_from_snapshot( snapshot );

//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test

auto end = blog.read_head();
if( !end ) {
blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 );
Expand All @@ -368,6 +374,8 @@ struct controller_impl {
"Block log is provided with snapshot but does not contain the head block from the snapshot" );
}
} else {
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
if( !head ) {
initialize_fork_db(); // set head to genesis state
}
Expand All @@ -381,9 +389,6 @@ struct controller_impl {
}
}

//do upgrade migration if necessary;
migrate_upgrade();

if( shutdown() ) return;

const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
Expand Down Expand Up @@ -881,41 +886,21 @@ struct controller_impl {
}
}

bool is_new_version() {
auto ucb = upgrade_complete_block();
//new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb) return head->block_num > *ucb;
return false;
}

bool is_upgrading() {
auto utb = upgrade_target_block();
auto ucb = upgrade_complete_block();
auto is_upgrading = false;
if (utb) is_upgrading = head->block_num >= *utb;
if (ucb) is_upgrading = is_upgrading && head->block_num <= *ucb;
return is_upgrading;
}

/**
* @post regardless of the success of commit block there is no active pending block
*/
void commit_block( bool add_to_fork_db ) {
auto reset_pending_on_exit = fc::make_scoped_exit([this]{
pending.reset();
set_pbft_lib();
set_pbft_lscb();

});

try {
set_pbft_lib();
set_pbft_lscb();

if (add_to_fork_db) {
pending->_pending_block_state->validated = true;

auto new_version = is_new_version();

auto new_bsp = fork_db.add(pending->_pending_block_state, true, new_version);
auto new_bsp = fork_db.add(pending->_pending_block_state, true, pbft_enabled);
emit(self.accepted_block_header, pending->_pending_block_state);

head = fork_db.head();
Expand Down Expand Up @@ -1337,6 +1322,9 @@ struct controller_impl {
{
EOS_ASSERT( !pending, block_validate_exception, "pending block already exists" );

set_pbft_lib();
set_pbft_lscb();

auto guard_pending = fc::make_scoped_exit([this](){
pending.reset();
});
Expand All @@ -1350,31 +1338,41 @@ struct controller_impl {
pending.emplace(maybe_session());
}

auto utb = upgrade_target_block();
auto ucb = upgrade_complete_block();
if (utb && !ucb) {
if (head->dpos_irreversible_blocknum >= *utb) {
const auto& upo = db.get<upgrade_property_object>();
auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
wlog("system is going to be new version after the block ${b}", ("b", head->block_num));
}
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

auto new_version = is_new_version();
auto upgrading = is_upgrading();
if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}

pending->_block_status = s;
pending->_producer_block_id = producer_block_id;
pending->_signer = signer;
pending->_pending_block_state = std::make_shared<block_state>( *head, when, new_version); // promotes pending schedule (if any) to active
pending->_pending_block_state = std::make_shared<block_state>( *head, when, pbft_enabled); // promotes pending schedule (if any) to active
pending->_pending_block_state->in_current_chain = true;

pending->_pending_block_state->set_confirmed(confirm_block_count, new_version);
pending->_pending_block_state->set_confirmed(confirm_block_count, pbft_enabled);


auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(new_version);
auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(pbft_enabled);

//modify state in speculative block only if we are speculative reads mode (other wise we need clean state for head or irreversible reads)
if ( read_mode == db_read_mode::SPECULATIVE || pending->_block_status != controller::block_status::incomplete ) {
Expand All @@ -1384,7 +1382,7 @@ struct controller_impl {
auto lib_num = std::max(pending->_pending_block_state->dpos_irreversible_blocknum, pending->_pending_block_state->bft_irreversible_blocknum);
auto lscb_num = pending->_pending_block_state->pbft_stable_checkpoint_blocknum;

if (new_version && gpo.proposed_schedule_block_num) {
if (pbft_enabled && gpo.proposed_schedule_block_num) {
proposed_schedule_blocks.emplace_back(*gpo.proposed_schedule_block_num);
for ( auto itr = proposed_schedule_blocks.begin(); itr != proposed_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
Expand All @@ -1401,19 +1399,19 @@ struct controller_impl {
&& pending->_pending_block_state->pending_schedule.producers.size() == 0 // ... and there is room for a new pending schedule ...
&& !was_pending_promoted; // ... and not just because it was promoted to active at the start of this block, then:

if (new_version) {
if (pbft_enabled) {
should_promote_pending_schedule = should_promote_pending_schedule
&& pending->_pending_block_state->block_num > *gpo.proposed_schedule_block_num;
} else {
should_promote_pending_schedule = should_promote_pending_schedule
&& ( *gpo.proposed_schedule_block_num <= pending->_pending_block_state->dpos_irreversible_blocknum );
}

if ( upgrading && !replaying) wlog("system is upgrading, no producer schedule promotion will happen until fully upgraded.");
if ( pbft_upgrading && !replaying) wlog("system is upgrading, no producer schedule promotion will happen until fully upgraded.");

if ( should_promote_pending_schedule )
{
if (!upgrading) {
if (!pbft_upgrading) {
// Promote proposed schedule to pending schedule.
if (!replaying) {
ilog("promoting proposed schedule (set in block ${proposed_num}) to pending; current block: ${n} lib: ${lib} schedule: ${schedule} ",
Expand All @@ -1423,7 +1421,7 @@ struct controller_impl {
}
pending->_pending_block_state->set_new_producers(gpo.proposed_schedule);

if (new_version) {
if (pbft_enabled) {
promoted_schedule_blocks.emplace_back(pending->_pending_block_state->block_num);
for ( auto itr = promoted_schedule_blocks.begin(); itr != promoted_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
Expand Down Expand Up @@ -1576,11 +1574,11 @@ struct controller_impl {
auto prev = fork_db.get_block( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

auto new_version = is_new_version();
auto pbft = pbft_enabled;

return async_thread_pool( thread_pool, [b, prev, new_version]() {
return async_thread_pool( thread_pool, [b, prev, pbft]() {
const bool skip_validate_signee = false;
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee, new_version);
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee, pbft);
} );
}

Expand All @@ -1595,8 +1593,7 @@ struct controller_impl {
auto& b = new_header_state->block;
emit( self.pre_accepted_block, b );

auto new_version = is_new_version();
fork_db.add( new_header_state, false, new_version);
fork_db.add( new_header_state, false, pbft_enabled);

if (conf.trusted_producers.count(b->producer)) {
trusted_producer_light_validation = true;
Expand Down Expand Up @@ -1626,9 +1623,7 @@ struct controller_impl {
emit( self.pre_accepted_block, b );
const bool skip_validate_signee = !conf.force_all_checks;

auto new_version = is_new_version();

auto new_header_state = fork_db.add( b, skip_validate_signee, new_version);
auto new_header_state = fork_db.add( b, skip_validate_signee, pbft_enabled);

emit( self.accepted_block_header, new_header_state );

Expand Down Expand Up @@ -1661,11 +1656,13 @@ struct controller_impl {

void set_pbft_lib() {

if ((!pending || pending->_block_status != controller::block_status::incomplete) && pending_pbft_lib ) {
if (!pbft_enabled) return;

if ( pending_pbft_lib ) {
fork_db.set_bft_irreversible(*pending_pbft_lib);
pending_pbft_lib.reset();

if (read_mode != db_read_mode::IRREVERSIBLE) {
if (!pending && read_mode != db_read_mode::IRREVERSIBLE) {
maybe_switch_forks(controller::block_status::complete);
}
}
Expand All @@ -1677,7 +1674,10 @@ struct controller_impl {
}

void set_pbft_lscb() {
if ((!pending || pending->_block_status != controller::block_status::incomplete) && pending_pbft_checkpoint ) {

if (!pbft_enabled) return;

if ( pending_pbft_checkpoint ) {

auto checkpoint_block_state = fork_db.get_block(*pending_pbft_checkpoint);
if (checkpoint_block_state) {
Expand Down Expand Up @@ -2131,7 +2131,7 @@ chainbase::database& controller::mutable_db()const { return my->db; }

const fork_database& controller::fork_db()const { return my->fork_db; }

std::map<chain::public_key_type, signature_provider_type> controller::my_signature_providers()const{
std::map<chain::public_key_type, signature_provider_type> controller:: my_signature_providers()const{
return my->conf.my_signature_providers;
}

Expand Down Expand Up @@ -2520,21 +2520,23 @@ chain_id_type controller::get_chain_id()const {
return my->chain_id;
}

void controller::set_pbft_prepared(const block_id_type& id) const {
void controller::set_pbft_prepared(const block_id_type& id) {
my->pbft_prepared.reset();
auto bs = fetch_block_state_by_id(id);
if (bs) {
my->pbft_prepared = bs;
my->fork_db.mark_pbft_prepared_fork(bs);
maybe_switch_forks();
}
}

void controller::set_pbft_my_prepare(const block_id_type& id) const {
void controller::set_pbft_my_prepare(const block_id_type& id) {
my->my_prepare.reset();
auto bs = fetch_block_state_by_id(id);
if (bs) {
my->my_prepare = bs;
my->fork_db.mark_pbft_my_prepare_fork(bs);
maybe_switch_forks();
}
}

Expand All @@ -2543,11 +2545,18 @@ block_id_type controller::get_pbft_my_prepare() const {
return block_id_type{};
}

void controller::reset_pbft_my_prepare() const {
void controller::reset_pbft_my_prepare() {
my->fork_db.remove_pbft_my_prepare_fork();
maybe_switch_forks();
if (my->my_prepare) my->my_prepare.reset();
}

void controller::reset_pbft_prepared() {
my->fork_db.remove_pbft_prepared_fork();
maybe_switch_forks();
if (my->pbft_prepared) my->pbft_prepared.reset();
}

db_read_mode controller::get_read_mode()const {
return my->read_mode;
}
Expand Down Expand Up @@ -2703,12 +2712,18 @@ const upgrade_property_object& controller::get_upgrade_properties()const {
return my->db.get<upgrade_property_object>();
}

bool controller::is_upgraded() const {
return my->is_new_version();
bool controller::is_pbft_enabled() const {
return my->pbft_enabled;
}

bool controller::under_upgrade() const {
return my->is_upgrading();
bool controller::under_maintenance() const {
return my->pbft_upgrading;
}

void controller::maybe_switch_forks() {
if (!pending_block_state() && my->read_mode != db_read_mode::IRREVERSIBLE) {
my->maybe_switch_forks(controller::block_status::complete);
}
}

// this will be used in unit_test only, should not be called anywhere else.
Expand Down
33 changes: 33 additions & 0 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,38 @@ namespace eosio { namespace chain {
my->head = *my->index.get<by_lib_block_num>().begin();
}

void fork_database::remove_pbft_prepared_fork() {
auto oldest = *my->index.get<by_block_num>().begin();

auto& by_id_idx = my->index.get<by_block_id>();
auto itr = by_id_idx.find( oldest->id );
by_id_idx.modify( itr, [&]( auto& bsp ) { bsp->pbft_prepared = false; });

auto update = [&]( const vector<block_id_type>& in ) {
vector<block_id_type> updated;

for( const auto& i : in ) {
auto& pidx = my->index.get<by_prev>();
auto pitr = pidx.lower_bound( i );
auto epitr = pidx.upper_bound( i );
while( pitr != epitr ) {
pidx.modify( pitr, [&]( auto& bsp ) {
bsp->pbft_prepared = false;
updated.push_back( bsp->id );
});
++pitr;
}
}
return updated;
};

vector<block_id_type> queue{ oldest->id };
while(!queue.empty()) {
queue = update( queue );
}
my->head = *my->index.get<by_lib_block_num>().begin();
}

block_state_ptr fork_database::get_block_in_current_chain_by_num( uint32_t n )const {
const auto& numidx = my->index.get<by_block_num>();
auto nitr = numidx.lower_bound( n );
Expand Down Expand Up @@ -515,6 +547,7 @@ namespace eosio { namespace chain {
while( queue.size() ) {
queue = update( queue );
}
my->head = *my->index.get<by_lib_block_num>().begin();
}

void fork_database::set_latest_checkpoint( block_id_type id) {
Expand Down
Loading