Skip to content

Commit

Permalink
Monitor v2 (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
danil-lashin authored Oct 26, 2022
1 parent 254408a commit 5d60c77
Show file tree
Hide file tree
Showing 5 changed files with 1,651 additions and 74 deletions.
275 changes: 232 additions & 43 deletions module/cmd/mhub2/cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@ package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
"runtime"
"strconv"
"sort"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"

"github.com/ethereum/go-ethereum/common"

"github.com/ethereum/go-ethereum/ethclient"

stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"

tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"

"github.com/MinterTeam/mhub2/module/solidity"
"github.com/MinterTeam/mhub2/module/x/mhub2/types"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -22,52 +32,93 @@ import (
"github.com/cosmos/cosmos-sdk/client/flags"
)

type MonitorConfig struct {
OurAddress string `json:"our_address"`
TelegramToken string `json:"telegram_token"`
ChatID int64 `json:"chat_id"`
EthereumRPC []string `json:"ethereum_rpc"`
BNBChainRPC []string `json:"bnb_chain_rpc"`
}

const blockDelay = 6

// AddMonitorCmd returns monitor cobra Command.
func AddMonitorCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "monitor [our_address] [telegram_bot_token] [chat_id]",
Use: "monitor [config]",
Short: "",
Long: ``,
Args: cobra.ExactArgs(3),
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
bot, err := tgbotapi.NewBotAPI(args[1])
config, err := readConfig(args[0])
if err != nil {
panic(err)
return err
}

chatId, err := strconv.Atoi(args[2])
bot, err := tgbotapi.NewBotAPI(config.TelegramToken)
if err != nil {
panic(err)
}

newText := func(t string) string {
return fmt.Sprintf("Watching...\n%s%s", time.Now().Format(time.Stamp), t)
return fmt.Sprintf("%s\n%s", time.Now().Format(time.Stamp), t)
}

startMsg, err := bot.Send(tgbotapi.NewMessage(int64(chatId), newText("")))
chat, err := bot.GetChat(tgbotapi.ChatInfoConfig{
ChatConfig: tgbotapi.ChatConfig{
ChatID: config.ChatID,
},
})
if err != nil {
panic(err)
}

ourAddress := args[0]
nonceErrorCounter := 0
hasNonceError := false
var startMsg tgbotapi.Message

if chat.PinnedMessage != nil {
startMsg = *chat.PinnedMessage
} else {
msg := tgbotapi.NewMessage(config.ChatID, newText(""))
msg.DisableNotification = true
startMsg, err = bot.Send(msg)
if err != nil {
panic(err)
}

bot.Request(tgbotapi.PinChatMessageConfig{
ChatID: startMsg.Chat.ID,
MessageID: startMsg.MessageID,
DisableNotification: true,
})
}

nonceErrorCounter := make(map[types.ChainID]int)
handleErr := func(err error) {
pc, filename, line, _ := runtime.Caller(1)

str := fmt.Sprintf("[error] in %s[%s:%d] %v", runtime.FuncForPC(pc).Name(), filename, line, err)
if _, err := bot.Send(tgbotapi.NewMessage(int64(chatId), str)); err != nil {
if _, err := bot.Send(tgbotapi.NewMessage(config.ChatID, str)); err != nil {
println(err.Error())
}

startMsg, _ = bot.Send(tgbotapi.NewMessage(int64(chatId), newText("")))
msg := tgbotapi.NewMessage(config.ChatID, newText(""))
msg.DisableNotification = true
startMsg, _ = bot.Send(msg)
bot.Request(tgbotapi.PinChatMessageConfig{
ChatID: startMsg.Chat.ID,
MessageID: startMsg.MessageID,
DisableNotification: true,
})
}

i := 0
initialized := false

for {
time.Sleep(time.Second * 5)
if initialized {
time.Sleep(time.Minute)
}

initialized = true
t := ""

clientCtx, err := client.GetClientQueryContext(cmd)
Expand Down Expand Up @@ -97,10 +148,18 @@ func AddMonitorCmd() *cobra.Command {
stakingQueryClient := stakingtypes.NewQueryClient(clientCtx)
vals, _ := stakingQueryClient.Validators(context.Background(), &stakingtypes.QueryValidatorsRequest{})

valHasFailure := map[string]bool{}
valHasNonceFailure := map[string]map[types.ChainID]uint64{}
failuresLog := ""

for _, val := range vals.GetValidators() {
valHasNonceFailure[val.OperatorAddress] = map[types.ChainID]uint64{}
}

actualNonces := map[types.ChainID]uint64{}

chains := []types.ChainID{"ethereum", "minter", "bsc"}
for _, chain := range chains {
t = fmt.Sprintf("%s\n\n<b>%s</b>", t, chain.String())

delegatedKeys, err := queryClient.DelegateKeys(context.Background(), &types.DelegateKeysRequest{
ChainId: chain.String(),
})
Expand All @@ -109,17 +168,32 @@ func AddMonitorCmd() *cobra.Command {
continue
}

response, err := queryClient.LastSubmittedExternalEvent(context.Background(), &types.LastSubmittedExternalEventRequest{
Address: ourAddress,
ChainId: chain.String(),
})

actualNonce, err := getActualNonce(chain, config, delegatedKeys.GetDelegateKeys(), queryClient)
if err != nil {
handleErr(err)
continue
}

nonce := response.EventNonce
actualNonces[chain] = actualNonce

if config.OurAddress != "" {
response, err := queryClient.LastSubmittedExternalEvent(context.Background(), &types.LastSubmittedExternalEventRequest{
Address: config.OurAddress,
ChainId: chain.String(),
})

if err != nil {
handleErr(err)
continue
}

ourNonce := response.EventNonce
if ourNonce < actualNonce {
nonceErrorCounter[chain]++
} else {
nonceErrorCounter[chain] = 0
}
}

for _, k := range delegatedKeys.GetDelegateKeys() {
response, err := queryClient.LastSubmittedExternalEvent(context.Background(), &types.LastSubmittedExternalEventRequest{
Expand All @@ -135,39 +209,58 @@ func AddMonitorCmd() *cobra.Command {

for _, v := range vals.GetValidators() {
if v.OperatorAddress == k.ValidatorAddress {
t = fmt.Sprintf("%s\n%d %s", t, response.GetEventNonce(), v.GetMoniker())
nonce := response.GetEventNonce()
if nonce < actualNonce {
valHasNonceFailure[v.OperatorAddress][chain] = nonce
valHasFailure[v.OperatorAddress] = true
}
}
}
}

if nonce < response.GetEventNonce() {
hasNonceError = true
}
}

sortedVals := vals.GetValidators()
sort.Slice(sortedVals, func(i, j int) bool {
return sortedVals[i].BondedTokens().GT(sortedVals[j].BondedTokens())
})

for _, v := range sortedVals {
alert := "🟢"
if valHasFailure[v.OperatorAddress] {
alert = fmt.Sprintf("🔴️")
failuresLog = fmt.Sprintf("%s⚠️️ <b>%s</b> ", failuresLog, v.GetMoniker())
}
t = fmt.Sprintf("%s\n%s <b>%s</b> %d HUB", t, alert, v.GetMoniker(), v.BondedTokens().QuoRaw(1e18).Int64())

if !hasNonceError {
nonceErrorCounter = 0
if valHasFailure[v.OperatorAddress] {
var nonceErrs []string
for _, chain := range chains {
if nonce, ok := valHasNonceFailure[v.OperatorAddress][chain]; ok {
nonceErrs = append(nonceErrs, fmt.Sprintf("nonce <b>%d</b> of <b>%d</b> on <b>%s</b>", nonce, actualNonces[chain], chain.String()))
}
}

failuresLog += strings.Join(nonceErrs, ", ") + "\n"
}
}

if hasNonceError {
nonceErrorCounter += 1
for _, chain := range chains {
if nonceErrorCounter[chain] > 5 {
handleErr(errors.New("event nonce on " + chain.String() + " was not updated for too long"))
continue
}
}
hasNonceError = false

if nonceErrorCounter > 5 {
handleErr(errors.New("event nonce on some external network was not updated for too long. Check your orchestrators and minter-connector"))
continue
if failuresLog != "" {
t = t + "\n\n" + failuresLog
}

if i%12 == 0 {
msg := tgbotapi.NewEditMessageText(startMsg.Chat.ID, startMsg.MessageID, newText(t))
msg.ParseMode = "html"
_, err := bot.Send(msg)
if err != nil {
println(err.Error())
}
msg := tgbotapi.NewEditMessageText(startMsg.Chat.ID, startMsg.MessageID, newText(t))
msg.ParseMode = "html"
if _, err := bot.Send(msg); err != nil {
println(err.Error())
}
i++
}

return nil
Expand All @@ -178,3 +271,99 @@ func AddMonitorCmd() *cobra.Command {

return cmd
}

func getActualNonce(chain types.ChainID, config MonitorConfig, keys []*types.MsgDelegateKeys, queryClient types.QueryClient) (uint64, error) {
switch chain {
case "ethereum", "bsc":
var address common.Address
var RPCs []string

switch chain {
case "ethereum":
address = common.HexToAddress("0x897c27fa372aa730d4c75b1243e7ea38879194e2")
RPCs = config.EthereumRPC
case "bsc":
address = common.HexToAddress("0xf5b0ed82a0b3e11567081694cc66c3df133f7c8f")
RPCs = config.BNBChainRPC
}

maxNonce, err := getEvmNonce(address, RPCs)
if err != nil {
return 0, err
}

if maxNonce == 0 {
return 0, errors.New("no available nonce source for " + chain.String())
}

return maxNonce, nil
case "minter":
maxNonce := uint64(0)
for _, k := range keys {
response, err := queryClient.LastSubmittedExternalEvent(context.Background(), &types.LastSubmittedExternalEventRequest{
Address: k.OrchestratorAddress,
ChainId: chain.String(),
})
if err != nil {
if !strings.Contains(err.Error(), "validator is not bonded") {
return 0, err
}
}

if maxNonce < response.GetEventNonce() {
maxNonce = response.GetEventNonce()
}
}

return maxNonce, nil
}

return 0, nil
}

func getEvmNonce(address common.Address, RPCs []string) (uint64, error) {
maxNonce := uint64(0)
for _, rpc := range RPCs {
evmClient, err := ethclient.Dial(rpc)
if err != nil {
continue
}

instance, err := solidity.NewHub2(address, evmClient)
if err != nil {
continue
}

latestBlock, err := evmClient.BlockNumber(context.TODO())
if err != nil {
continue
}

lastNonce, err := instance.StateLastEventNonce(&bind.CallOpts{
BlockNumber: big.NewInt(int64(latestBlock - blockDelay)),
})
if err != nil {
continue
}

if maxNonce < lastNonce.Uint64() {
maxNonce = lastNonce.Uint64()
}
}

return maxNonce, nil
}

func readConfig(path string) (MonitorConfig, error) {
config := MonitorConfig{}
configBody, err := os.ReadFile(path)
if err != nil {
return MonitorConfig{}, err
}

if err := json.Unmarshal(configBody, &config); err != nil {
return MonitorConfig{}, err
}

return config, nil
}
7 changes: 7 additions & 0 deletions module/config-monitor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"our_address": "hub1...",
"telegram_token": "",
"chat_id": 0,
"ethereum_rpc": ["https://mainnet.infura.io/v3/"],
"bnb_chain_rpc": ["https://bscrpc.com/"]
}
Loading

0 comments on commit 5d60c77

Please sign in to comment.