Skip to content

Commit

Permalink
remove reqchangeconsensus
Browse files Browse the repository at this point in the history
  • Loading branch information
cmcater committed Aug 15, 2023
1 parent 224981e commit 8f400d3
Show file tree
Hide file tree
Showing 21 changed files with 482 additions and 3,269 deletions.
134 changes: 2 additions & 132 deletions internal/pkg/chainsdk/core/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,6 @@ func (chain *Chain) GetTrxFactory() chaindef.TrxFactoryIface {
return chain.trxFactory
}

func (chain *Chain) ReqChangeConsensus(producers []string, agrmTickLen, agrmTickCnt, fromBlock, fromEpoch, epoch uint64) (reqId string, nonce uint64, err error) {
chain_log.Debugf("<%s> ReqChangeConsensus called", chain.groupItem.GroupId)

if chain.Consensus.ConsensusProposer() == nil {
return "", 0, fmt.Errorf("consensus proposer is nil")
}

return chain.Consensus.ConsensusProposer().ReqChangeConsensus(producers, agrmTickLen, agrmTickCnt, fromBlock, fromEpoch, epoch)
}

// PSConn msg handler
func (chain *Chain) HandlePsConnMessage(pkg *quorumpb.Package) error {
//chain_log.Debugf("<%s> HandlePsConnMessage called, <%s>", chain.groupItem.GroupId, pkg.Type.String())
Expand Down Expand Up @@ -191,14 +181,6 @@ func (chain *Chain) HandlePsConnMessage(pkg *quorumpb.Package) error {
} else {
err = chain.HandleBftMsgPsConn(bftMsg)
}
} else if pkg.Type == quorumpb.PackageType_CC_MSG {
ccMsg := &quorumpb.CCMsg{}
err = proto.Unmarshal(pkg.Data, ccMsg)
if err != nil {
chain_log.Warnf(err.Error())
} else {
err = chain.HandleCCMsgPsConn(ccMsg)
}
} else if pkg.Type == quorumpb.PackageType_BROADCAST_MSG {
broadcastMsg := &quorumpb.BroadcastMsg{}
err = proto.Unmarshal(pkg.Data, broadcastMsg)
Expand Down Expand Up @@ -354,15 +336,6 @@ func (chain *Chain) HandleSyncMsgRex(syncMsg *quorumpb.SyncMsg, s network.Stream
}
return nil
}
func (chain *Chain) HandleCCMsgPsConn(msg *quorumpb.CCMsg) error {
//chain_log.Debugf("<%s> HandleChangeConsensusReqPsConn called", chain.groupItem.GroupId)
if chain.Consensus.ConsensusProposer() == nil {
//chain_log.Warningf("<%s> Consensus ConsensusProposer is nil", chain.groupItem.GroupId)
return nil
}

return chain.Consensus.ConsensusProposer().HandleCCMsg(msg)
}

func (chain *Chain) HandleBroadcastMsgPsConn(brd *quorumpb.BroadcastMsg) error {
chain_log.Debugf("<%s> HandleGroupBroadcastPsConn called", chain.groupItem.GroupId)
Expand Down Expand Up @@ -607,22 +580,18 @@ func (chain *Chain) CreateConsensus() error {

var user def.User
var producer def.Producer
var consensusProposer def.ConsensusProposer

var shouldCreateUser, shouldCreateProducer, shouldCreateConsensusProposer bool
var shouldCreateUser, shouldCreateProducer bool

if nodectx.GetNodeCtx().NodeType == nodectx.PRODUCER_NODE {
shouldCreateProducer = true
shouldCreateUser = false
shouldCreateConsensusProposer = true
} else if nodectx.GetNodeCtx().NodeType == nodectx.FULL_NODE {
//check if I am owner of the Group
if chain.groupItem.UserSignPubkey == chain.groupItem.OwnerPubKey {
shouldCreateProducer = true
shouldCreateConsensusProposer = true
} else {
shouldCreateProducer = false
shouldCreateConsensusProposer = false
}
shouldCreateUser = true
} else {
Expand All @@ -641,57 +610,12 @@ func (chain *Chain) CreateConsensus() error {
user.NewUser(chain.groupItem, chain.nodename, chain)
}

if shouldCreateConsensusProposer {
chain_log.Infof("<%s> Create and initial molasses consensusproposer", chain.groupItem.GroupId)
consensusProposer = &consensus.MolassesConsensusProposer{}
consensusProposer.NewConsensusProposer(chain.ChainCtx, chain.groupItem, chain.nodename, chain)
}

chain.Consensus = consensus.NewMolasses(producer, user, consensusProposer)
chain.Consensus = consensus.NewMolasses(producer, user)
chain.Consensus.StartProposeTrx()

return nil
}

// update change consensus result
func (chain *Chain) ReqConsensusChangeDone(bundle *quorumpb.ChangeConsensusResultBundle) {
chain_log.Debugf("<%s> ReqConsensusChangeDone called", chain.groupItem.GroupId)

//save change consensus result
nodectx.GetNodeCtx().GetChainStorage().UpdateChangeConsensusResult(chain.groupItem.GroupId, bundle, chain.nodename)

//stop all consensus tasks
chain.Consensus.ConsensusProposer().StopAllTasks()

switch bundle.Result {
case quorumpb.ChangeConsensusResult_SUCCESS:
chain_log.Debugf("<%s> ReqChangeConsensus SUCCESSFUL", chain.groupItem.GroupId)
/*
TBD
fix implement later
//stop current propose
chain.Consensus.Producer().StopPropose()
//update producer list
chain.updChainConsensus(trxId, bundle)
chain.Consensus.Producer().StartPropose()
//owner create the fork block and broadcast to all nodes
if chain.IsOwner() {
trx, err := chain.trxFactory.GetForkTrx("", bundle)
if err != nil {
chain_log.Warningf("<%s> GetChangeConsensusResultTrx failed with err <%s>", chain.groupItem.GroupId, err.Error())
return
}
chain_log.Debugf("<%s> ReqChangeConsensus SUCCESSFUL, trx created %x", chain.groupItem.GroupId, trx)
//TBD create fork block and broadcast
}
*/
case quorumpb.ChangeConsensusResult_FAIL:
chain_log.Debug("<%s> ReqChangeConsensus FAIL", chain.groupItem.GroupId)
}
}

func (chain *Chain) IsProducer() bool {
_, ok := chain.producerPool[chain.groupItem.UserSignPubkey]
return ok
Expand Down Expand Up @@ -900,60 +824,6 @@ func (chain *Chain) ApplyTrxsProducerNode(trxs []*quorumpb.Trx, nodename string)
return nil
}

/*
func (chain *Chain) applyConseususTrx(trx *quorumpb.Trx, decodeData []byte, nodename string) error {
chain_log.Debugf("<%s> applyConseususTrx called", chain.groupItem.GroupId)
//decode change consensus result
resultBundle := &quorumpb.ChangeConsensusResultBundle{}
err := proto.Unmarshal(decodeData, resultBundle)
if err != nil {
return err
}
//check if change consensus result is valid
if resultBundle.Result != quorumpb.ChangeConsensusResult_SUCCESS {
chain_log.Warningf("<%s> change consensus result is not success, skip", chain.groupItem.GroupId)
return nil
}
history, err := nodectx.GetNodeCtx().GetChainStorage().GetAllChangeConsensusResult(chain.groupItem.GroupId, nodename)
if err != nil {
return err
}
shouldAccept := true
for _, item := range history {
if item.Req.ReqId == resultBundle.Req.ReqId {
chain_log.Debugf("<%s> change consensus result with reqId <%s> already exist, skip", chain.groupItem.GroupId, resultBundle.Req.ReqId)
shouldAccept = false
break
}
if item.Req.Nonce > resultBundle.Req.Nonce {
chain_log.Debugf("<%s> change consensus result with reqId <%d> nonce <%d> is smaller than current nonce <%d>, skip", chain.groupItem.GroupId, resultBundle.Req.ReqId, resultBundle.Req.Nonce, item.Req.Nonce)
shouldAccept = false
break
}
}
if shouldAccept {
//save change consensus result
nodectx.GetNodeCtx().GetChainStorage().UpdateChangeConsensusResult(chain.groupItem.GroupId, resultBundle, nodename)
//update consensus
chain.updChainConsensus(trx.TrxId, resultBundle)
//stop current propose
if chain.Consensus.Producer() != nil {
chain.Consensus.Producer().StopPropose()
//update producer list
chain.Consensus.Producer().StartPropose()
}
}
return nil
}
*/

func (chain *Chain) VerifySign(hash, signature []byte, pubkey string) (bool, error) {
//check signature
bytespubkey, err := base64.RawURLEncoding.DecodeString(pubkey)
Expand Down
45 changes: 0 additions & 45 deletions internal/pkg/chainsdk/core/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,6 @@ func (grp *Group) JoinGroup(item *quorumpb.GroupItem) error {
item.UserSignPubkey,
grp.ChainCtx)

//commented by cuicat
//update producer list for ConnMgr just created
//grp.ChainCtx.UpdConnMgrProducer()

//create group consensus
grp.ChainCtx.CreateConsensus()

Expand Down Expand Up @@ -435,46 +431,10 @@ func (grp *Group) GetAppConfigItem(keyName string) (*quorumpb.AppConfigItem, err
return nodectx.GetNodeCtx().GetChainStorage().GetAppConfigItem(keyName, grp.Item.GroupId, grp.Nodename)
}

func (grp *Group) GetAllChangeConsensusResultBundle() ([]*quorumpb.ChangeConsensusResultBundle, error) {
group_log.Debugf("<%s> GetAllChangeConsensusResultBundle called", grp.Item.GroupId)
return nodectx.GetNodeCtx().GetChainStorage().GetAllChangeConsensusResult(grp.Item.GroupId, grp.Nodename)
}

func (grp *Group) GetCurrentTrxProposeInterval() (uint64, error) {
return nodectx.GetNodeCtx().GetChainStorage().GetProducerConsensusConfInterval(grp.Item.GroupId, grp.Nodename)
}

func (grp *Group) GetLastChangeConsensusResult(isSuccess bool) (*quorumpb.ChangeConsensusResultBundle, error) {
group_log.Debugf("<%s> GetLastSuccessChangeConsensusResult called", grp.Item.GroupId)
results, err := nodectx.GetNodeCtx().GetChainStorage().GetAllChangeConsensusResult(grp.Item.GroupId, grp.Nodename)
if err != nil {
return nil, err
}

//if there is only 1 proof and nonce is 0, return it (added by owner when create group)
if len(results) == 1 && results[0].Req.Nonce == 0 {
return results[0], nil
}

nonce := uint64(0)
last := &quorumpb.ChangeConsensusResultBundle{}
for _, result := range results {
if isSuccess && result.Result != quorumpb.ChangeConsensusResult_SUCCESS {
continue
}
if result.Req.Nonce > nonce {
last = result
nonce = result.Req.Nonce
}
}
return last, nil
}

func (grp *Group) GetChangeConsensusResultById(id string) (*quorumpb.ChangeConsensusResultBundle, error) {
group_log.Debugf("<%s> GetChangeConsensusResultById called", grp.Item.GroupId)
return nodectx.GetNodeCtx().GetChainStorage().GetChangeConsensusResultByReqId(grp.Item.GroupId, id, grp.Nodename)
}

// send update announce trx
func (grp *Group) UpdAnnounce(item *quorumpb.AnnounceItem) (string, error) {
group_log.Debugf("<%s> UpdAnnounce called", grp.Item.GroupId)
Expand Down Expand Up @@ -513,11 +473,6 @@ func (grp *Group) GetInitForkTrx(trxId string, item *quorumpb.ForkItem) (*quorum
return grp.ChainCtx.GetTrxFactory().GetForkTrx("", item)
}

func (grp *Group) ReqChangeConsensus(producers []string, agrmTickLength, agrmTickCount, fromBlock uint64, fromEpoch uint64, epoch uint64) (string, uint64, error) {
group_log.Debugf("<%s> ReqChangeConsensus called", grp.Item.GroupId)
return grp.ChainCtx.ReqChangeConsensus(producers, agrmTickLength, agrmTickCount, fromBlock, fromEpoch, epoch)
}

func (grp *Group) UpdGroupUser(item *quorumpb.UpdGroupUserItem) (string, error) {
group_log.Debugf("<%s> UpdUser called", grp.Item.GroupId)
trx, err := grp.ChainCtx.GetTrxFactory().GetUpdGroupUserTrx("", item)
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/chainsdk/def/chaindataciface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ type ChainDataSyncIface interface {
HandleTrxPsConn(trx *quorumpb.Trx) error
HandleBlockPsConn(block *quorumpb.Block) error
HandleBftMsgPsConn(hb *quorumpb.BftMsg) error
HandleCCMsgPsConn(req *quorumpb.CCMsg) error
HandleBroadcastMsgPsConn(c *quorumpb.BroadcastMsg) error
HandleSyncMsgRex(syncMsg *quorumpb.SyncMsg, fromstream network.Stream) error
StartSync() error
Expand Down
20 changes: 0 additions & 20 deletions internal/pkg/conn/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,6 @@ func (connMgr *ConnMgr) BroadcastBlock(blk *quorumpb.Block) error {
return psconn.Publish(pkgBytes)
}

func (connMgr *ConnMgr) BroadcastCCMsg(msg *quorumpb.CCMsg) error {
pkg := &quorumpb.Package{}

pbBytes, err := proto.Marshal(msg)
if err != nil {
return err
}

pkg.Type = quorumpb.PackageType_CC_MSG
pkg.Data = pbBytes

pkgBytes, err := proto.Marshal(pkg)
if err != nil {
return err
}

psconn := connMgr.getUserConn()
return psconn.Publish(pkgBytes)
}

func (connMgr *ConnMgr) BroadcastBftMsg(msg *quorumpb.BftMsg) error {
pkg := &quorumpb.Package{}

Expand Down
46 changes: 0 additions & 46 deletions internal/pkg/storage/chain/chaindb.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,49 +189,3 @@ func (cs *Storage) GetGroupConsensusInfo(groupId string, prefix ...string) (info
}
return info, nil
}

func (cs *Storage) UpdateChangeConsensusResult(groupId string, result *quorumpb.ChangeConsensusResultBundle, prefix ...string) error {
key := s.GetChangeConsensusResultKey(groupId, result.Req.ReqId, prefix...)
chaindb_log.Debugf("UpdateChangeConsensusResult key %s", key)
data, err := proto.Marshal(result)
if err != nil {
return err
}
return cs.dbmgr.Db.Set([]byte(key), data)
}

func (cs *Storage) GetAllChangeConsensusResult(groupId string, prefix ...string) ([]*quorumpb.ChangeConsensusResultBundle, error) {
var rList []*quorumpb.ChangeConsensusResultBundle
//chaindb_log.Debugf("GetAllChangeConsensusResult called")

key := s.GetChangeConsensusResultPrefix(groupId, prefix...)
err := cs.dbmgr.Db.PrefixForeach([]byte(key), func(k []byte, v []byte, err error) error {
if err != nil {
return err
}
item := quorumpb.ChangeConsensusResultBundle{}
perr := proto.Unmarshal(v, &item)
if perr != nil {
return perr
}
rList = append(rList, &item)
return nil
})

return rList, err
}

func (cs *Storage) GetChangeConsensusResultByReqId(groupId, reqId string, prefix ...string) (*quorumpb.ChangeConsensusResultBundle, error) {
key := s.GetChangeConsensusResultKey(groupId, reqId, prefix...)
chaindb_log.Debugf("GetChangeConsensusResultByReqId key %s", key)
data, err := cs.dbmgr.Db.Get([]byte(key))
if err != nil {
return nil, err
}
item := quorumpb.ChangeConsensusResultBundle{}
perr := proto.Unmarshal(data, &item)
if perr != nil {
return nil, perr
}
return &item, nil
}
2 changes: 0 additions & 2 deletions internal/pkg/storage/def/chainstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,4 @@ type APIHandlerIface interface {
GetProducers(groupId string, prefix ...string) ([]*quorumpb.ProducerItem, error)
GetUser(groupId, userSignPubkey string, prefix ...string) (*quorumpb.UserItem, error)
GetProducer(groupId, producerSignPubkey string, prefix ...string) (*quorumpb.ProducerItem, error)
GetAllChangeConsensusResult(groupId string, prefix ...string) ([]*quorumpb.ChangeConsensusResultBundle, error)
GetChangeConsensusResultByReqId(groupId, reqId string, prefix ...string) (*quorumpb.ChangeConsensusResultBundle, error)
}
Loading

0 comments on commit 8f400d3

Please sign in to comment.