Skip to content

Commit

Permalink
add msg exec migrator
Browse files Browse the repository at this point in the history
  • Loading branch information
MonikaCat committed Jan 20, 2024
1 parent eb03537 commit f7501ff
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 8 deletions.
5 changes: 3 additions & 2 deletions cmd/migrate/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"os"

"github.com/forbole/juno/v5/cmd/migrate/msgexec"
parsecmdtypes "github.com/forbole/juno/v5/cmd/parse/types"

"github.com/spf13/cobra"

v4 "github.com/forbole/juno/v5/cmd/migrate/v4"
Expand All @@ -15,7 +15,8 @@ type Migrator func(parseCfg *parsecmdtypes.Config) error

var (
migrations = map[string]Migrator{
"v4": v4.RunMigration,
"v4": v4.RunMigration,
"msgexec": msgexec.RunMigration,
}
)

Expand Down
44 changes: 44 additions & 0 deletions cmd/migrate/msgexec/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package msgexec

import (
"fmt"

utils "github.com/forbole/juno/v5/cmd/migrate/utils"

parse "github.com/forbole/juno/v5/cmd/parse/types"
"github.com/forbole/juno/v5/database"
msgexecdb "github.com/forbole/juno/v5/database/legacy/msgexec"
"github.com/forbole/juno/v5/database/postgresql"
)

// RunMigration runs the migrations from v4 to v5
func RunMigration(parseConfig *parse.Config) error {
cfg, err := GetConfig()
if err != nil {
return fmt.Errorf("error while reading config: %s", err)
}

// Migrate the database
err = migrateDb(cfg, parseConfig)
if err != nil {
return fmt.Errorf("error while migrating database: %s", err)
}

return nil
}

func migrateDb(cfg utils.Config, parseConfig *parse.Config) error {
// Build the codec
encodingConfig := parseConfig.GetEncodingConfigBuilder()()

// Get the db
databaseCtx := database.NewContext(cfg.Database, &encodingConfig, parseConfig.GetLogger())
db, err := postgresql.Builder(databaseCtx)
if err != nil {
return fmt.Errorf("error while building the db: %s", err)
}

// Build the migrator and perform the migrations
migrator := msgexecdb.NewMigrator(db.(*postgresql.Database))
return migrator.Migrate()
}
31 changes: 31 additions & 0 deletions cmd/migrate/msgexec/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package msgexec

import (
"fmt"
"os"
"path"

utils "github.com/forbole/juno/v5/cmd/migrate/utils"
"gopkg.in/yaml.v3"

"github.com/forbole/juno/v5/types/config"
)

// GetConfig returns the configuration reading it from the config.yaml file present inside the home directory
func GetConfig() (utils.Config, error) {
file := path.Join(config.HomePath, "config.yaml")

// Make sure the path exists
if _, err := os.Stat(file); os.IsNotExist(err) {
return utils.Config{}, fmt.Errorf("config file does not exist")
}

bz, err := os.ReadFile(file)
if err != nil {
return utils.Config{}, fmt.Errorf("error while reading config files: %s", err)
}

var cfg utils.Config
err = yaml.Unmarshal(bz, &cfg)
return cfg, err
}
File renamed without changes.
8 changes: 4 additions & 4 deletions cmd/migrate/v4/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"os"

utils "github.com/forbole/juno/v5/cmd/migrate/utils"
parsecmdtypes "github.com/forbole/juno/v5/cmd/parse/types"

"gopkg.in/yaml.v3"

v3 "github.com/forbole/juno/v5/cmd/migrate/v3"
Expand Down Expand Up @@ -40,10 +40,10 @@ func RunMigration(parseConfig *parsecmdtypes.Config) error {
return nil
}

func migrateConfig() (Config, error) {
func migrateConfig() (utils.Config, error) {
cfg, err := v3.GetConfig()
if err != nil {
return Config{}, fmt.Errorf("error while reading v3 config: %s", err)
return utils.Config{}, fmt.Errorf("error while reading v3 config: %s", err)
}

sslMode := cfg.Database.SSLMode
Expand All @@ -56,7 +56,7 @@ func migrateConfig() (Config, error) {
schema = "public"
}

return Config{
return utils.Config{
Node: cfg.Node,
Chain: cfg.Chain,
Database: databaseconfig.Config{
Expand Down
129 changes: 129 additions & 0 deletions database/legacy/msgexec/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package msgexec

import (
"encoding/json"
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lib/pq"
"github.com/rs/zerolog/log"

dbtypes "github.com/forbole/juno/v5/database/migrate/utils"
msgmodule "github.com/forbole/juno/v5/modules/messages"
"github.com/forbole/juno/v5/types"
)

// Migrate implements database.Migrator
func (db *Migrator) Migrate() error {
msgTypes, err := db.getAllMsgExecStoredInDatabase()
if err != nil {
return fmt.Errorf("error while getting message types rows: %s", err)
}

var skipped = 0
// Migrate the transactions
log.Info().Msg("** migrating transactions **")
log.Debug().Int("tx count", len(msgTypes)).Msg("processing total transactions")

for _, msgType := range msgTypes {
log.Debug().Str("tx hash", msgType.TransactionHash).Msg("getting transaction....")

tx, err := db.getMsgExecTransactionsFromDatabase(msgType.TransactionHash)
if err != nil {
return fmt.Errorf("error while getting transaction %s: %s", msgType.TransactionHash, err)
}

if tx.Success == "true" {
var msgs sdk.ABCIMessageLogs
err = json.Unmarshal([]byte(tx.Logs), &msgs)
if err != nil {
return fmt.Errorf("error while unmarshaling tx logs: %s", err)
}

var addresses []string

for _, msg := range msgs {
for _, event := range msg.Events {
for _, attribute := range event.Attributes {
// Try parsing the address as a validator address
validatorAddress, _ := sdk.ValAddressFromBech32(attribute.Value)
if validatorAddress != nil {
addresses = append(addresses, validatorAddress.String())
}

// Try parsing the address as an account address
accountAddress, err := sdk.AccAddressFromBech32(attribute.Value)
if err != nil {
// Skip if the address is not an account address
continue
}

addresses = append(addresses, accountAddress.String())
}
}
}
involvedAddresses := msgmodule.RemoveDuplicates(addresses)

fmt.Printf("\n ADDRESSES BEFORE %s", msgType.InvolvedAccountsAddresses)
fmt.Printf("\n ADDRESSES AFTER %s \n", involvedAddresses)

err = db.updateMessage(types.NewMessage(msgType.TransactionHash,
int(msgType.Index),
msgType.Type,
msgType.Value,
involvedAddresses,
msgType.Height), msgType.PartitionID)

if err != nil {
return fmt.Errorf("error while storing updated message: %s", err)
}
} else {
skipped++
}

}

log.Debug().Int("*** Total Skipped ***", skipped)

return nil

}

// getMsgTypesFromMessageTable retrieves messages types stored in database inside message table
func (db *Migrator) getAllMsgExecStoredInDatabase() ([]dbtypes.MessageRow, error) {
const msgType = "cosmos.authz.v1beta1.MsgExec"
var rows []dbtypes.MessageRow
err := db.SQL.Select(&rows, `SELECT * FROM message WHERE type = $1`, msgType)
if err != nil {
return nil, err
}

return rows, nil
}

// getMsgTypesFromMessageTable retrieves messages types stored in database inside message table
func (db *Migrator) getMsgExecTransactionsFromDatabase(txHash string) (dbtypes.TransactionRow, error) {
var rows []dbtypes.TransactionRow
err := db.SQL.Select(&rows, `SELECT * FROM transaction WHERE hash = $1`, txHash)
if err != nil {
return dbtypes.TransactionRow{}, err
}

return rows[0], nil
}

// updateMessage stores updated message inside the database
func (db *Migrator) updateMessage(msg *types.Message, partitionID int64) error {
stmt := `
INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height, partition_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (transaction_hash, index, partition_id) DO UPDATE
SET height = excluded.height,
type = excluded.type,
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses`

_, err := db.SQL.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID)
return err

}
21 changes: 21 additions & 0 deletions database/legacy/msgexec/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package msgexec

import (
"github.com/jmoiron/sqlx"

"github.com/forbole/juno/v5/database"
"github.com/forbole/juno/v5/database/postgresql"
)

var _ database.Migrator = &Migrator{}

// Migrator represents the database migrator that should be used to migrate from v2 of the database to v3
type Migrator struct {
SQL *sqlx.DB
}

func NewMigrator(db *postgresql.Database) *Migrator {
return &Migrator{
SQL: db.SQL,
}
}
10 changes: 10 additions & 0 deletions database/migrate/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ type TransactionRow struct {
RawLog string `db:"raw_log"`
Logs string `db:"logs"`
}

type MessageRow struct {
TransactionHash string `db:"transaction_hash"`
Index int64 `db:"index"`
Type string `db:"type"`
Value string `db:"value"`
InvolvedAccountsAddresses string `db:"involved_accounts_addresses"`
Height int64 `db:"height"`
PartitionID int64 `db:"partition_id"`
}
4 changes: 2 additions & 2 deletions modules/messages/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func DefaultMessagesParser(tx *types.Tx) ([]string, error) {
}

// function to remove duplicate values
func removeDuplicates(s []string) []string {
func RemoveDuplicates(s []string) []string {
bucket := make(map[string]bool)
var result []string
for _, str := range s {
Expand Down Expand Up @@ -54,5 +54,5 @@ func parseAddressesFromEvents(tx *types.Tx) []string {

}

return removeDuplicates(addresses)
return RemoveDuplicates(addresses)
}

0 comments on commit f7501ff

Please sign in to comment.