Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #112 from BuxOrg/multi-server-monitor
Browse files Browse the repository at this point in the history
Feature: Multi-server improvements for destination Monitoring
  • Loading branch information
mrz1836 authored Jun 3, 2022
2 parents 4c6cb98 + 2dba90f commit 34697f9
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 53 deletions.
9 changes: 5 additions & 4 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID
// RecordMonitoredTransaction will parse the transaction and save it into the Datastore
//
// This function will try to record the transaction directly, without checking draft ids etc.
func (c *Client) RecordMonitoredTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error) {
func recordMonitoredTransaction(ctx context.Context, client ClientInterface, txHex string,
opts ...ModelOps) (*Transaction, error) {

// Check for existing NewRelic transaction
ctx = c.GetOrStartTxn(ctx, "record_monitored_transaction")
ctx = client.GetOrStartTxn(ctx, "record_monitored_transaction")

// Create the model & set the default options (gives options from client->model)
newOpts := c.DefaultModelOptions(append(opts, New())...)
newOpts := client.DefaultModelOptions(append(opts, New())...)
transaction := newTransaction(txHex, newOpts...)

// Ensure that we have a transaction id (created from the txHex)
Expand All @@ -127,7 +128,7 @@ func (c *Client) RecordMonitoredTransaction(ctx context.Context, txHex string, o

// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
ctx, fmt.Sprintf(lockKeyRecordTx, id), c.Cachestore(),
ctx, fmt.Sprintf(lockKeyRecordTx, id), client.Cachestore(),
)
defer unlock()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion chainstate/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func (a *AgentClient) SetFilter(regex string, bloomFilter *BloomProcessorFilter)
return a.Client.Publish("set_filter", data)
}

// newCentrifugeClient will create a new Centrifuge using the provided handler and default configurations
func newCentrifugeClient(wsURL string, handler whatsonchain.SocketHandler) MonitorClient {
c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig())
c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig()) // todo: use our own defaults/custom options

c.OnConnect(handler)
c.OnDisconnect(handler)
Expand Down
37 changes: 10 additions & 27 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bux

import (
"context"
"fmt"

"github.com/BuxOrg/bux/cachestore"
"github.com/BuxOrg/bux/chainstate"
Expand Down Expand Up @@ -93,43 +92,27 @@ func (c *Client) loadTaskmanager(ctx context.Context) (err error) {
// Cachestore is required to be loaded before this method is called
func (c *Client) loadMonitor(ctx context.Context) (err error) {

// Load monitor if set by the user
// Check if the monitor was set by the user
monitor := c.options.chainstate.Monitor()
if monitor == nil {
return
return // No monitor, exit!
}

// Detect if the monitor has been loaded already (Looking for a LockID & Cachestore)
// Detect if the monitor has been loaded already (Looking for a LockID, Cachestore & last heartbeat)
lockID := monitor.GetLockID()
cs := c.Cachestore()
if cs != nil && len(lockID) > 0 {
// Check if there is already a monitor loaded using this unique lock id
key, _ := cs.Get(ctx, fmt.Sprintf(lockKeyMonitorLockID, lockID))
if len(key) > 0 {
// Monitor has already loaded with this LockID
c.Logger().Info(ctx, "monitor has already been loaded using this lockID: "+lockID)
if len(lockID) > 0 {
var locked bool
if locked, err = checkMonitorHeartbeat(ctx, c, lockID); err != nil { // Locally and global check
return
}
}

// Create a handler and load destinations if option has been set
handler := NewMonitorHandler(ctx, c, monitor)
if c.options.chainstate.Monitor().LoadMonitoredDestinations() {
if err = c.loadMonitoredDestinations(ctx, monitor); err != nil {
} else if locked { // Monitor found using LockID and heartbeat is in range
return
}
}

// Start the monitor
if err = monitor.Start(ctx, &handler); err != nil {
return err
// Monitor might be found using LockID but the heartbeat failed (closed? disconnected? bad state?)
}

// Set the cache-key lock for this monitor
if cs != nil && len(lockID) > 0 {
return cs.Set(ctx, fmt.Sprintf(lockKeyMonitorLockID, lockID), lockID)
}
return
// Start the default monitor
return startDefaultMonitor(ctx, c, monitor)
}

// runModelMigrations will run the model Migrate() method for all models
Expand Down
11 changes: 6 additions & 5 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ func defaultClientOptions() *clientOptions {
taskManager: &taskManagerOptions{
ClientInterface: nil,
cronTasks: map[string]time.Duration{
ModelDraftTransaction.String() + "_clean_up": 60 * time.Second,
ModelIncomingTransaction.String() + "_process": 30 * time.Second,
ModelSyncTransaction.String() + "_" + syncActionBroadcast: 30 * time.Second,
ModelSyncTransaction.String() + "_" + syncActionSync: 60 * time.Second,
ModelSyncTransaction.String() + "_" + syncActionP2P: 35 * time.Second,
ModelDestination.String() + "_monitor": taskIntervalMonitorCheck,
ModelDraftTransaction.String() + "_clean_up": taskIntervalDraftCleanup,
ModelIncomingTransaction.String() + "_process": taskIntervalProcessIncomingTxs,
ModelSyncTransaction.String() + "_" + syncActionBroadcast: taskIntervalSyncActionBroadcast,
ModelSyncTransaction.String() + "_" + syncActionP2P: taskIntervalSyncActionP2P,
ModelSyncTransaction.String() + "_" + syncActionSync: taskIntervalSyncActionSync,
},
},

Expand Down
12 changes: 12 additions & 0 deletions definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
defaultDatabaseReadTimeout = 20 * time.Second // For all "GET" or "SELECT" methods
defaultDraftTxExpiresIn = 20 * time.Second // Default TTL for draft transactions
defaultHTTPTimeout = 20 * time.Second // Default timeout for HTTP requests
defaultMonitorHeartbeat = 60 // in Seconds (heartbeat for active monitor)
defaultMonitorHeartbeatMax = 90 // in Seconds (max out of range time for heartbeat, something is wrong)
defaultOverheadSize = uint64(8) // 8 bytes is the default overhead in a transaction = 4 bytes version + 4 bytes nLockTime
defaultQueryTxTimeout = 10 * time.Second // Default timeout for syncing on-chain information
defaultSleepForNewBlockHeaders = 30 * time.Second // Default wait before checking for a new unprocessed block
Expand All @@ -24,6 +26,16 @@ const (
version = "v0.2.28" // bux version
)

// Defaults for task cron jobs (tasks)
const (
	taskIntervalDraftCleanup        = 60 * time.Second                      // How often to clean up expired draft transactions
	taskIntervalMonitorCheck        = defaultMonitorHeartbeat * time.Second // How often to run the destination monitor check (tied to the heartbeat interval)
	taskIntervalProcessIncomingTxs  = 30 * time.Second                      // How often to process incoming transactions
	taskIntervalSyncActionBroadcast = 30 * time.Second                      // How often to run the sync-transaction broadcast action
	taskIntervalSyncActionP2P       = 35 * time.Second                      // How often to run the sync-transaction P2P action
	taskIntervalSyncActionSync      = 40 * time.Second                      // How often to run the sync-transaction on-chain sync action
)

// All the base models
const (
ModelAccessKey ModelName = "access_key"
Expand Down
1 change: 0 additions & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ type TransactionService interface {
opts ...ModelOps) (*DraftTransaction, error)
RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string,
opts ...ModelOps) (*Transaction, error)
RecordMonitoredTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
UpdateTransactionMetadata(ctx context.Context, xPubID, id string, metadata Metadata) (*Transaction, error)
}

Expand Down
36 changes: 36 additions & 0 deletions model_destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/BuxOrg/bux/datastore"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
"github.com/bitcoinschema/go-bitcoin/v2"
)
Expand Down Expand Up @@ -441,3 +442,38 @@ func (m *Destination) AfterDeleted(ctx context.Context) error {
m.DebugLog("end: " + m.Name() + " AfterDelete hook")
return nil
}

// RegisterTasks will register the model specific tasks on client initialization
func (m *Destination) RegisterTasks() error {

	// Nothing to register without a task manager
	tm := m.Client().Taskmanager()
	if tm == nil {
		return nil
	}

	// Build the model-specific cron task name
	ctx := context.Background()
	taskName := m.Name() + "_monitor"

	// Register the task; handler errors are logged (not returned) so the cron keeps firing
	task := &taskmanager.Task{
		Name:       taskName,
		RetryLimit: 1,
		Handler: func(client ClientInterface) error {
			if taskErr := taskCheckActiveMonitor(ctx, client.Logger(), client); taskErr != nil {
				client.Logger().Error(ctx, "error running "+taskName+" task: "+taskErr.Error())
			}
			return nil
		},
	}
	if err := tm.RegisterTask(task); err != nil {
		return err
	}

	// Schedule the task to run on its configured period
	return tm.RunTask(ctx, &taskmanager.TaskOptions{
		Arguments:      []interface{}{m.Client()},
		RunEveryPeriod: m.Client().GetTaskPeriod(taskName),
		TaskName:       taskName,
	})
}
83 changes: 77 additions & 6 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,117 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/datastore"
"github.com/BuxOrg/bux/utils"
)

// destinationMonitor is the struct of responses for Monitoring
//
// It is a lightweight projection used when querying Destination models:
// only the locking script is needed to register a destination with the
// monitor's processor.
type destinationMonitor struct {
	// LockingScript is the output script to watch for on-chain activity
	LockingScript string `json:"locking_script" toml:"locking_script" yaml:"locking_script" bson:"locking_script"`
}

func (c *Client) loadMonitoredDestinations(ctx context.Context, monitor chainstate.MonitorService) error {
// loadMonitoredDestinations will load destinations that should be monitored
func loadMonitoredDestinations(ctx context.Context, client ClientInterface, monitor chainstate.MonitorService) error {

// Create conditions using the max monitor days
conditions := map[string]interface{}{
"monitor": map[string]interface{}{
"$gt": time.Now().Add(time.Duration(-24*monitor.GetMonitorDays()) * time.Hour),
},
}

// Create monitor query with max destinations
queryParams := &datastore.QueryParams{
Page: 1,
PageSize: monitor.GetMaxNumberOfDestinations(),
OrderByField: "monitor",
SortDirection: "desc",
}

// Get all destinations that match the query
var destinations []*destinationMonitor
if err := c.Datastore().GetModels(
if err := client.Datastore().GetModels(
ctx, &[]*Destination{}, conditions, queryParams, &destinations, defaultDatabaseReadTimeout,
); err != nil && !errors.Is(err, datastore.ErrNoResults) {
return err
}

// Loop all destinations and add to Monitor
for _, model := range destinations {
// todo: skipping the error check?
_ = monitor.Processor().Add(utils.P2PKHRegexpString, model.LockingScript)
if err := monitor.Processor().Add(utils.P2PKHRegexpString, model.LockingScript); err != nil {
return err
}
}

if c.options.debug && c.Logger() != nil {
c.Logger().Info(ctx, fmt.Sprintf("[MONITOR] Added %d destinations to monitor", len(destinations)))
// Debug line
if client.IsDebug() && client.Logger() != nil {
client.Logger().Info(ctx, fmt.Sprintf("[MONITOR] Added %d destinations to monitor", len(destinations)))
}

return nil
}

// checkMonitorHeartbeat will check for a Monitor heartbeat key and detect if it's locked or not
//
// Returns true when another monitor instance already holds this lockID and its
// last heartbeat is still within the allowed range (defaultMonitorHeartbeatMax).
// Returns false (no error) when there is no cachestore, no existing heartbeat,
// or the heartbeat is stale - in which case the caller should load the monitor.
func checkMonitorHeartbeat(ctx context.Context, client ClientInterface, lockID string) (bool, error) {

	// Make sure cachestore is loaded (safety check)
	cs := client.Cachestore()
	if cs == nil {
		return false, nil
	}

	// Check if there is already a monitor loaded using this unique lock id & detect last heartbeat
	lastHeartBeatString, _ := cs.Get(ctx, fmt.Sprintf(lockKeyMonitorLockID, lockID))
	if len(lastHeartBeatString) == 0 {
		return false, nil // No monitor has stored a heartbeat for this lockID
	}

	// Convert the stored heartbeat (unix timestamp in seconds) for comparison
	unixTime, err := strconv.ParseInt(lastHeartBeatString, 10, 64)
	if err != nil {
		return false, err
	}

	// Heartbeat is good - skip loading the monitor
	currentUnixTime := time.Now().UTC().Unix()
	if unixTime+defaultMonitorHeartbeatMax > currentUnixTime {
		client.Logger().Info(ctx, fmt.Sprintf("monitor has already been loaded using this lockID: %s last heartbeat: %d", lockID, unixTime))
		return true, nil
	}

	// Heartbeat failed the max (out of range) check for the last heart beat
	// (closed? disconnected? bad state?) - continue, and load the monitor...
	client.Logger().Info(ctx, fmt.Sprintf("found monitor lockID: %s but heartbeat is out of range: %d vs %d", lockID, unixTime, currentUnixTime))
	return false, nil
}

// startDefaultMonitor will create a handler, start monitor, and store the first heartbeat
//
// The heartbeat is stored in the cachestore under the monitor's lock ID so
// other instances (see checkMonitorHeartbeat) can detect an active monitor.
func startDefaultMonitor(ctx context.Context, client ClientInterface, monitor chainstate.MonitorService) error {

	// Create a handler and load destinations if option has been set
	// (use the monitor that was passed in rather than re-fetching it from the chainstate)
	handler := NewMonitorHandler(ctx, client, monitor)
	if monitor.LoadMonitoredDestinations() {
		if err := loadMonitoredDestinations(ctx, client, monitor); err != nil {
			return err
		}
	}

	// Start the monitor
	if err := monitor.Start(ctx, &handler); err != nil {
		return err
	}

	// Set the cache-key lock for this monitor (with a heartbeat time of now)
	lockID := monitor.GetLockID()
	if len(lockID) == 0 {
		return nil
	}

	// Guard against a missing cachestore (mirrors the safety check in checkMonitorHeartbeat)
	cs := client.Cachestore()
	if cs == nil {
		return nil
	}
	return cs.Set(
		ctx,
		fmt.Sprintf(lockKeyMonitorLockID, lockID),
		strconv.FormatInt(time.Now().UTC().Unix(), 10),
	)
}
Loading

0 comments on commit 34697f9

Please sign in to comment.