diff --git a/models/modules.go b/models/modules.go index 7ad4b995..5fefabc9 100644 --- a/models/modules.go +++ b/models/modules.go @@ -1,15 +1,24 @@ package models import ( + "context" + "errors" "fmt" + "time" + shared "github.com/filecoin-project/venus/venus-shared/types" + types "github.com/filecoin-project/venus/venus-shared/types/messager" "github.com/ipfs-force-community/sophon-messager/filestore" "github.com/ipfs-force-community/sophon-messager/models/mysql" "github.com/ipfs-force-community/sophon-messager/models/repo" "github.com/ipfs-force-community/sophon-messager/models/sqlite" + logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" + "gorm.io/gorm" ) +var log = logging.Logger("db") + func SetDataBase(fsRepo filestore.FSRepo) (repo.Repo, error) { switch fsRepo.Config().DB.Type { case "sqlite": @@ -22,7 +31,10 @@ func SetDataBase(fsRepo filestore.FSRepo) (repo.Repo, error) { } func AutoMigrate(repo repo.Repo) error { - return repo.AutoMigrate() + if err := repo.AutoMigrate(); err != nil { + return fmt.Errorf("migrate: %w", err) + } + return MigrateAddress(repo) } func Options() fx.Option { @@ -34,3 +46,55 @@ func Options() fx.Option { fx.Provide(repo.NewINodeProvider), ) } + +func MigrateAddress(r repo.Repo) error { + ctx := context.Background() + list, err := r.AddressRepo().ListAddress(ctx) + if err != nil { + return err + } + + return r.Transaction(func(txRepo repo.TxRepo) error { + + for _, addrInfo := range list { + fAddr := addrInfo.Addr.String() + tAddr := "t" + fAddr[1:] + _, err := txRepo.AddressRepo().GetOneRecord(ctx, fAddr) + if err == nil { + if _, err := txRepo.AddressRepo().GetOneRecord(ctx, tAddr); err == nil { + if err := txRepo.AddressRepo().DelAddress(ctx, tAddr); err != nil { + return err + } + log.Infof("delete address %s success", tAddr) + } + continue + } + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + + log.Infof("migrate address %s to %s", tAddr, fAddr) + now := time.Now() + newAddrInfo := &types.Address{ + ID: shared.NewUUID(), + Addr: addrInfo.Addr, + Nonce: addrInfo.Nonce, + SelMsgNum: addrInfo.SelMsgNum, + State: addrInfo.State, + IsDeleted: repo.NotDeleted, + FeeSpec: addrInfo.FeeSpec, + CreatedAt: now, + UpdatedAt: now, + } + if err := txRepo.AddressRepo().SaveAddress(context.Background(), newAddrInfo); err != nil { + return err + } + log.Infof("migrate address %s to %s success", tAddr, fAddr) + if err := txRepo.AddressRepo().DelAddress(context.Background(), tAddr); err != nil { + return err + } + log.Infof("delete address %s success", tAddr) + } + return nil + }) +} diff --git a/models/mysql/address.go b/models/mysql/address.go index 61f4b870..f6d833ee 100644 --- a/models/mysql/address.go +++ b/models/mysql/address.go @@ -111,9 +111,9 @@ func (s mysqlAddressRepo) GetAddressByID(ctx context.Context, id shared.UUID) (* return a.Address() } -func (s mysqlAddressRepo) GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) { +func (s mysqlAddressRepo) GetOneRecord(ctx context.Context, addr string) (*types.Address, error) { var a mysqlAddress - if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr.String()).Error; err != nil { + if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr).Error; err != nil { return nil, err } @@ -165,14 +165,14 @@ func (s mysqlAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addre return result, nil } -func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr address.Address) error { - return s.DB.WithContext(ctx).Model((*mysqlAddress)(nil)).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). - UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error +func (s mysqlAddressRepo) DelAddress(ctx context.Context, addr string) error { + return s.DB.WithContext(ctx).Where("addr = ?", addr).Delete(&mysqlAddress{}).Error } -func (s mysqlAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). - UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error +func (s mysqlAddressRepo) UpdateNonce(addr address.Address, nonce uint64) (int64, error) { + query := s.DB.Model(&mysqlAddress{}).Where("addr = ? and is_deleted = ?", addr.String(), repo.NotDeleted). + UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}) + return query.RowsAffected, query.Error } func (s mysqlAddressRepo) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error { diff --git a/models/mysql/address_test.go b/models/mysql/address_test.go index e8d88958..dd4e6cb9 100644 --- a/models/mysql/address_test.go +++ b/models/mysql/address_test.go @@ -102,7 +102,7 @@ func testGetOneRecord(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WithArgs(addr.String()). WillReturnRows(sqlmock.NewRows([]string{"addr"}).AddRow(addr.String())) - res, err := r.AddressRepo().GetOneRecord(ctx, addr) + res, err := r.AddressRepo().GetOneRecord(ctx, addr.String()) assert.NoError(t, err) assert.Equal(t, addr, res.Addr) } @@ -153,17 +153,16 @@ func testDelAddress(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta( - "UPDATE `addresses` SET `is_deleted`=?,`state`=?,`updated_at`=? WHERE addr = ? and is_deleted = ?")). - WithArgs(repo.Deleted, types.AddressStateRemoved, anyTime{}, addr.String(), repo.NotDeleted). + "DELETE FROM `addresses` WHERE addr = ?")). + WithArgs(addr.String()). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().DelAddress(ctx, addr) + err := r.AddressRepo().DelAddress(ctx, addr.String()) assert.NoError(t, err) } func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { - ctx := context.Background() addr := testutil.AddressProvider()(t) nonce := uint64(10) @@ -174,7 +173,7 @@ func testUpdateNonce(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - err := r.AddressRepo().UpdateNonce(ctx, addr, nonce) + _, err := r.AddressRepo().UpdateNonce(addr, nonce) assert.NoError(t, err) } diff --git a/models/repo/address_repo.go b/models/repo/address_repo.go index 8110b93a..5b915b30 100644 --- a/models/repo/address_repo.go +++ b/models/repo/address_repo.go @@ -14,12 +14,12 @@ type AddressRepo interface { SaveAddress(ctx context.Context, address *types.Address) error GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) GetAddressByID(ctx context.Context, id shared.UUID) (*types.Address, error) - GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) + GetOneRecord(ctx context.Context, addr string) (*types.Address, error) HasAddress(ctx context.Context, addr address.Address) (bool, error) ListAddress(ctx context.Context) ([]*types.Address, error) ListActiveAddress(ctx context.Context) ([]*types.Address, error) - DelAddress(ctx context.Context, addr address.Address) error - UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error + DelAddress(ctx context.Context, addr string) error + UpdateNonce(addr address.Address, nonce uint64) (int64, error) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error UpdateSelectMsgNum(ctx context.Context, addr address.Address, num uint64) error UpdateFeeParams(ctx context.Context, addr address.Address, gasOverEstimation, gasOverPremium float64, maxFee, gasFeeCap, baseFee big.Int) error diff --git a/models/sqlite/address.go b/models/sqlite/address.go index b05b8840..94223faf 100644 --- a/models/sqlite/address.go +++ b/models/sqlite/address.go @@ -110,9 +110,9 @@ func (s sqliteAddressRepo) GetAddressByID(ctx context.Context, id shared.UUID) ( return a.Address() } -func (s sqliteAddressRepo) GetOneRecord(ctx context.Context, addr address.Address) (*types.Address, error) { +func (s sqliteAddressRepo) GetOneRecord(ctx context.Context, addr string) (*types.Address, error) { var a sqliteAddress - if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr.String()).Error; err != nil { + if err := s.DB.WithContext(ctx).Take(&a, "addr = ?", addr).Error; err != nil { return nil, err } @@ -164,9 +164,10 @@ func (s sqliteAddressRepo) ListActiveAddress(ctx context.Context) ([]*types.Addr return result, nil } -func (s sqliteAddressRepo) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). - UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}).Error +func (s sqliteAddressRepo) UpdateNonce(addr address.Address, nonce uint64) (int64, error) { + query := s.DB.Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). + UpdateColumns(map[string]interface{}{"nonce": nonce, "updated_at": time.Now()}) + return query.RowsAffected, query.Error } func (s sqliteAddressRepo) UpdateState(ctx context.Context, addr address.Address, state types.AddressState) error { @@ -205,9 +206,8 @@ func (s sqliteAddressRepo) UpdateFeeParams(ctx context.Context, addr address.Add return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()).UpdateColumns(updateColumns).Error } -func (s sqliteAddressRepo) DelAddress(ctx context.Context, addr address.Address) error { - return s.DB.WithContext(ctx).Model((*sqliteAddress)(nil)).Where("addr = ? and is_deleted = -1", addr.String()). - UpdateColumns(map[string]interface{}{"is_deleted": repo.Deleted, "state": types.AddressStateRemoved, "updated_at": time.Now()}).Error +func (s sqliteAddressRepo) DelAddress(ctx context.Context, addr string) error { + return s.DB.WithContext(ctx).Where("addr = ?", addr).Delete(&sqliteAddress{}).Error } var _ repo.AddressRepo = &sqliteAddressRepo{} diff --git a/models/sqlite/address_test.go b/models/sqlite/address_test.go index c59aacdd..1629bacc 100644 --- a/models/sqlite/address_test.go +++ b/models/sqlite/address_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "gorm.io/gorm" - "github.com/ipfs-force-community/sophon-messager/models/repo" "github.com/ipfs-force-community/sophon-messager/testhelper" ) @@ -116,13 +115,14 @@ func TestAddress(t *testing.T) { t.Run("UpdateNonce", func(t *testing.T) { nonce := uint64(5) - assert.NoError(t, addressRepo.UpdateNonce(ctx, addrInfo.Addr, nonce)) + _, err := addressRepo.UpdateNonce(addrInfo.Addr, nonce) + assert.NoError(t, err) r, err := addressRepo.GetAddress(ctx, addrInfo.Addr) assert.NoError(t, err) assert.Equal(t, nonce, r.Nonce) // set nonce for a not exist address - err = addressRepo.UpdateNonce(ctx, randAddr, nonce) + _, err = addressRepo.UpdateNonce(randAddr, nonce) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) @@ -191,19 +191,18 @@ func TestAddress(t *testing.T) { }) t.Run("DelAddress", func(t *testing.T) { - assert.NoError(t, addressRepo.DelAddress(ctx, addrInfo2.Addr)) + assert.NoError(t, addressRepo.DelAddress(ctx, addrInfo2.Addr.String())) r, err := addressRepo.GetAddress(ctx, addrInfo2.Addr) assert.Error(t, err) assert.Nil(t, r) - r, err = addressRepo.GetOneRecord(ctx, addrInfo2.Addr) - assert.NoError(t, err) - assert.Equal(t, types.AddressStateRemoved, r.State) - assert.Equal(t, repo.Deleted, r.IsDeleted) + r, err = addressRepo.GetOneRecord(ctx, addrInfo2.Addr.String()) + assert.Error(t, err) + assert.Nil(t, r) // delete a not exist address - err = addressRepo.DelAddress(ctx, randAddr) + err = addressRepo.DelAddress(ctx, randAddr.String()) assert.NoError(t, err) _, err = addressRepo.GetAddress(ctx, randAddr) assert.Contains(t, err.Error(), gorm.ErrRecordNotFound.Error()) diff --git a/service/address_service.go b/service/address_service.go index 911a7d12..7b825ee3 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -75,8 +75,9 @@ func (addressService *AddressService) SaveAddress(ctx context.Context, address * return address.ID, err } -func (addressService *AddressService) UpdateNonce(ctx context.Context, addr address.Address, nonce uint64) error { - return addressService.repo.AddressRepo().UpdateNonce(ctx, addr, nonce) +func (addressService *AddressService) UpdateNonce(_ context.Context, addr address.Address, nonce uint64) error { + _, err := addressService.repo.AddressRepo().UpdateNonce(addr, nonce) + return err } func (addressService *AddressService) GetAddress(ctx context.Context, addr address.Address) (*types.Address, error) { @@ -120,7 +121,7 @@ func (addressService *AddressService) ListActiveAddress(ctx context.Context) ([] } func (addressService *AddressService) DeleteAddress(ctx context.Context, addr address.Address) error { - return addressService.repo.AddressRepo().DelAddress(ctx, addr) + return addressService.repo.AddressRepo().DelAddress(ctx, addr.String()) } func (addressService *AddressService) ForbiddenAddress(ctx context.Context, addr address.Address) error { diff --git a/service/message_selector.go b/service/message_selector.go index 09e33d30..6584e974 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -308,14 +308,19 @@ func (w *work) startSelectMessage( w.log.Errorf("select message failed: %v", err) return } - w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), - len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - recordMetric(ctx, w.addr, selectResult) + if len(selectResult.SelectMsg) != 0 || len(selectResult.ToPushMsg) != 0 || len(selectResult.ErrMsg) != 0 { + w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg), + len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start)) - if err := w.saveSelectedMessages(ctx, selectResult); err != nil { - w.log.Errorf("failed to save selected messages to db %v", err) - return + recordMetric(ctx, w.addr, selectResult) + + if len(selectResult.SelectMsg) > 0 || len(selectResult.ErrMsg) > 0 { + if err := w.saveSelectedMessages(selectResult); err != nil { + w.log.Errorf("failed to save selected messages to db %v", err) + return + } + } } for _, msg := range selectResult.SelectMsg { @@ -351,7 +356,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs) addrInfo.Nonce = nonceInLatestTs addrInfo.UpdatedAt = time.Now() - err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce) + maxMsgNonce, err := w.getMaxMessageNonceFromDB(addrInfo.Addr) + if err == nil { + if maxMsgNonce > addrInfo.Nonce { + addrInfo.Nonce = maxMsgNonce + 1 + w.log.Warnf("max message nonce in db %d", maxMsgNonce) + } + } + err = w.repo.AddressRepo().SaveAddress(ctx, addrInfo) if err != nil { return nil, fmt.Errorf("update nonce failed: %v", err) } @@ -362,14 +374,13 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, // calc the message needed nonceGap := addrInfo.Nonce - nonceInLatestTs if nonceGap >= maxAllowPendingMessage { - w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) + w.log.Warnf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap) return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } wantCount := maxAllowPendingMessage - nonceGap - w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) // get unfill message selectCount := mathutil.MinUint64(wantCount, 100) @@ -379,12 +390,14 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, } if len(messages) == 0 { - w.log.Infof("have no unfill message") + w.log.Debugf("have no unfill message") return &MsgSelectResult{ ToPushMsg: toPushMessage, Address: addrInfo, }, nil } + w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, + nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount) var errMsg []msgErrInfo count := uint64(0) @@ -477,13 +490,50 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce nonceInLatestTs := actor.Nonce // todo actor nonce maybe the latest ts. not need appliedNonce if nonceInTs, ok := appliedNonce.Get(w.addr); ok { - w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) + w.log.Debugf("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs) nonceInLatestTs = nonceInTs } return nonceInLatestTs, actor.Nonce, nil } +func (w *work) getMaxMessageNonceFromDB(addr address.Address) (uint64, error) { + var maxNonce uint64 + queryParams := []*types.MsgQueryParams{ + { + State: []types.MessageState{ + types.FillMsg, + }, + From: []address.Address{ + addr, + }, + }, + { + State: []types.MessageState{ + types.OnChainMsg, + }, + From: []address.Address{ + addr, + }, + Limit: 50, + }, + } + + for _, param := range queryParams { + msgs, err := w.repo.MessageRepo().ListMessageByParams(param) + if err == nil && len(msgs) > 0 { + for _, msg := range msgs { + if maxNonce < msg.Nonce { + maxNonce = msg.Nonce + } + } + return maxNonce, nil + } + } + + return maxNonce, nil +} + func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage { filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr) if err != nil { @@ -580,7 +630,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s return sigI.(*crypto.Signature), nil } -func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error { +func (w *work) saveSelectedMessages(selectResult *MsgSelectResult) error { startSaveDB := time.Now() w.log.Infof("start save messages to database") err := w.repo.Transaction(func(txRepo repo.TxRepo) error { @@ -590,9 +640,11 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect } addrInfo := selectResult.Address - if err := txRepo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce); err != nil { + row, err := txRepo.AddressRepo().UpdateNonce(addrInfo.Addr, addrInfo.Nonce) + if err != nil { return err } + w.log.Infof("update nonce to %v, row affected %v", addrInfo.Nonce, row) } for _, m := range selectResult.ErrMsg { diff --git a/service/message_selector_test.go b/service/message_selector_test.go index e87dc6f9..e5544fa5 100644 --- a/service/message_selector_test.go +++ b/service/message_selector_test.go @@ -718,7 +718,7 @@ func selectMsgWithAddress(ctx context.Context, } allSelectRes.ErrMsg = append(allSelectRes.ErrMsg, selectResult.ErrMsg...) - assert.NoError(t, work.saveSelectedMessages(ctx, selectResult)) + assert.NoError(t, work.saveSelectedMessages(selectResult)) } return allSelectRes