This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Added cluster package, made monitor cluster aware
icellan committed Aug 9, 2022
1 parent 771f91d commit 4fcdc81
Showing 18 changed files with 525 additions and 190 deletions.
2 changes: 1 addition & 1 deletion chainstate/interface.go
@@ -114,6 +114,6 @@ type MonitorService interface {
ProcessMempool(ctx context.Context) error
Processor() MonitorProcessor
SaveDestinations() bool
Start(ctx context.Context, handler MonitorHandler) error
Start(ctx context.Context, handler MonitorHandler, onStop func()) error
Stop(ctx context.Context) error
}
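
The interface change above adds an onStop callback to Start. A minimal sketch (not part of this commit) of how a caller adapts to the new signature; the handler is assumed to be any chainstate.MonitorHandler implementation:

```go
package examples

import (
	"context"
	"log"

	"github.com/BuxOrg/bux/chainstate"
)

// startMonitor passes an onStop callback to the new Start signature; the
// callback fires when the monitor shuts down, which this commit uses further
// down (client_internal.go) to release the cluster-wide monitor lock.
func startMonitor(ctx context.Context, m chainstate.MonitorService, handler chainstate.MonitorHandler) error {
	return m.Start(ctx, handler, func() {
		log.Println("monitor stopped, releasing per-instance resources")
	})
}
```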
34 changes: 20 additions & 14 deletions chainstate/monitor.go
@@ -3,7 +3,6 @@ package chainstate
import (
"context"
"fmt"
"sync"
"time"

"github.com/BuxOrg/bux/utils"
@@ -28,11 +27,13 @@ type Monitor struct {
lockID string
logger zLogger.GormLoggerInterface
maxNumberOfDestinations int
mempoolSyncChannelActive bool
mempoolSyncChannel chan bool
monitorDays int
processMempoolOnConnect bool
processor MonitorProcessor
saveTransactionsDestinations bool
onStop func()
}

// MonitorOptions options for starting this monitorConfig
@@ -93,7 +94,6 @@ func NewMonitor(_ context.Context, options *MonitorOptions) (monitor *Monitor) {
loadMonitoredDestinations: options.LoadMonitoredDestinations,
lockID: options.LockID,
maxNumberOfDestinations: options.MaxNumberOfDestinations,
mempoolSyncChannel: make(chan bool),
monitorDays: options.MonitorDays,
processMempoolOnConnect: options.ProcessMempoolOnConnect,
saveTransactionsDestinations: options.SaveTransactionDestinations,
@@ -204,12 +204,11 @@ func (m *Monitor) ProcessMempool(ctx context.Context) error {
return err
}

// TODO: This is overkill right now, but gives us a chance to parallelize this stuff
var done sync.WaitGroup
done.Add(1)

// create a new channel to control this go routine
m.mempoolSyncChannel = make(chan bool)
m.mempoolSyncChannelActive = true
// run the processing of the txs in a different thread
go func() {
go func(mempoolSyncChannel chan bool) {
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] ProcessMempool mempoolTxs: %d\n", len(mempoolTxs)))
}
Expand Down Expand Up @@ -242,7 +241,7 @@ func (m *Monitor) ProcessMempool(ctx context.Context) error {
}
// While processing all the batches, check if channel is closed
select {
case <-m.mempoolSyncChannel:
case <-mempoolSyncChannel:
return
default:

Expand Down Expand Up @@ -288,10 +287,7 @@ func (m *Monitor) ProcessMempool(ctx context.Context) error {
}
}
}
done.Done()
}()
done.Wait()
m.mempoolSyncChannel <- true
}(m.mempoolSyncChannel)
}

return nil
@@ -308,7 +304,7 @@ func (m *Monitor) SetChainstateOptions(options *clientOptions) {
}

// Start open a socket to the service provider and monitorConfig transactions
func (m *Monitor) Start(ctx context.Context, handler MonitorHandler) error {
func (m *Monitor) Start(ctx context.Context, handler MonitorHandler, onStop func()) error {
if m.client == nil {
handler.SetMonitor(m)
m.handler = handler
@@ -319,15 +315,25 @@ func (m *Monitor) Start(ctx context.Context, handler MonitorHandler, onStop func()) error {
}
}

m.onStop = onStop

return m.client.Connect()
}

// Stop closes the monitoring socket and pauses monitoring
func (m *Monitor) Stop(ctx context.Context) error {
m.logger.Info(ctx, "[MONITOR] Stopping monitor...")
if m.IsConnected() { // Only close if still connected
defer close(m.mempoolSyncChannel)
if m.mempoolSyncChannelActive {
close(m.mempoolSyncChannel)
m.mempoolSyncChannelActive = false
}
return m.client.Disconnect()
}

if m.onStop != nil {
m.onStop()
}

return nil
}
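
The monitor changes above drop the sync.WaitGroup and instead create the mempoolSyncChannel per ProcessMempool run, hand it to the goroutine, and track it with mempoolSyncChannelActive so Stop can close it at most once without blocking. A stripped-down, runnable illustration of that cancellation pattern (not the commit's code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors mempoolSyncChannel / mempoolSyncChannelActive: the worker polls
	// the channel between batches; the stopper closes it at most once.
	syncChannel := make(chan bool)
	channelActive := true

	go func(done chan bool) {
		for batch := 0; batch < 10; batch++ {
			select {
			case <-done: // closed by the stopper (Stop() in the real code)
				fmt.Println("worker: cancelled")
				return
			default:
				fmt.Println("worker: processing batch", batch)
				time.Sleep(10 * time.Millisecond)
			}
		}
	}(syncChannel)

	time.Sleep(35 * time.Millisecond)
	if channelActive { // guard against closing an already-closed channel
		close(syncChannel)
		channelActive = false
	}
	time.Sleep(20 * time.Millisecond) // let the worker observe the close
}
```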
22 changes: 22 additions & 0 deletions client.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/cluster"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/mrz1836/go-cachestore"
@@ -26,6 +27,7 @@ type (
// clientOptions holds all the configuration for the client
clientOptions struct {
cacheStore *cacheStoreOptions // Configuration options for Cachestore (ristretto, redis, etc.)
cluster *clusterOptions // Configuration options for the cluster coordinator
chainstate *chainstateOptions // Configuration options for Chainstate (broadcast, sync, etc.)
dataStore *dataStoreOptions // Configuration options for the DataStore (MySQL, etc.)
debug bool // If the client is in debug mode
@@ -59,6 +61,13 @@ type (
options []cachestore.ClientOps // List of options
}

// clusterOptions holds the cluster configuration for Bux clusters
// at the moment we only support redis as the cluster coordinator
clusterOptions struct {
cluster.ClientInterface
options []cluster.ClientOps // List of options
}

// dataStoreOptions holds the data storage configuration and client
dataStoreOptions struct {
datastore.ClientInterface // Client for Datastore
@@ -137,6 +146,11 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error)
return nil, err
}

// Load the cluster coordinator
if err = client.loadCluster(ctx); err != nil {
return nil, err
}

// Load the Datastore (automatically migrate models)
if err = client.loadDatastore(ctx); err != nil {
return nil, err
@@ -233,6 +247,14 @@ func (c *Client) Cachestore() cachestore.ClientInterface {
return nil
}

// Cluster will return the cluster coordinator client
func (c *Client) Cluster() cluster.ClientInterface {
if c.options.cluster != nil && c.options.cluster.ClientInterface != nil {
return c.options.cluster.ClientInterface
}
return nil
}

// Chainstate will return the Chainstate service IF: exists and is enabled
func (c *Client) Chainstate() chainstate.ClientInterface {
if c.options.chainstate != nil && c.options.chainstate.ClientInterface != nil {
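
Cluster() above follows the same nil-guarded accessor pattern as Cachestore() and Chainstate(). A short hypothetical usage sketch against the concrete *bux.Client:

```go
package examples

import (
	"log"

	"github.com/BuxOrg/bux"
)

// logClusterPrefix uses the accessor added above: Cluster() returns nil when
// no cluster coordinator was configured, so callers must check before use.
func logClusterPrefix(c *bux.Client) {
	if coordinator := c.Cluster(); coordinator != nil {
		log.Println("cluster key prefix:", coordinator.GetClusterPrefix())
	} else {
		log.Println("no cluster coordinator configured")
	}
}
```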
65 changes: 53 additions & 12 deletions client_internal.go
@@ -2,8 +2,10 @@ package bux

import (
"context"
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/cluster"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/mrz1836/go-cachestore"
@@ -22,6 +24,17 @@ func (c *Client) loadCache(ctx context.Context) (err error) {
return
}

// loadCluster will load the cluster coordinator
func (c *Client) loadCluster(ctx context.Context) (err error) {

// Load if a custom interface was NOT provided
if c.options.cluster.ClientInterface == nil {
c.options.cluster.ClientInterface, err = cluster.NewClient(ctx, c.options.cluster.options...)
}

return
}

// loadChainstate will load chainstate configuration and start the Chainstate client
func (c *Client) loadChainstate(ctx context.Context) (err error) {

@@ -125,21 +138,49 @@ func (c *Client) loadMonitor(ctx context.Context) (err error) {
return // No monitor, exit!
}

// Detect if the monitor has been loaded already (Looking for a LockID, Cachestore & last heartbeat)
lockID := monitor.GetLockID()
if len(lockID) > 0 {
var locked bool
if locked, err = checkMonitorHeartbeat(ctx, c, lockID); err != nil { // Locally and global check
return
} else if locked { // Monitor found using LockID and heartbeat is in range
return
}
// Create a handler and load destinations if option has been set
handler := NewMonitorHandler(ctx, c, monitor)

// Monitor might be found using LockID but the heartbeat failed (closed? disconnected? bad state?)
// Start the default monitor
if err = startDefaultMonitor(ctx, c, monitor); err != nil {
return err
}

// Start the default monitor
return startDefaultMonitor(ctx, c, monitor)
lockKey := c.options.cluster.GetClusterPrefix() + lockKeyMonitorLockID
lockID := monitor.GetLockID()
go func() {
var currentLock string
for {
if currentLock, err = c.Cachestore().WriteLockWithSecret(ctx, lockKey, lockID, defaultMonitorLockTTL); err != nil {
// do nothing really, we just didn't get the lock
if monitor.IsDebug() {
monitor.Logger().Info(ctx, "[MONITOR] failed getting lock for monitor: %s: %w", lockID, err)
}
}

if lockID == currentLock {
// Start the monitor, if not connected
if !monitor.IsConnected() {
if err = monitor.Start(ctx, &handler, func() {
_, err = c.Cachestore().ReleaseLock(ctx, lockKeyMonitorLockID, lockID)
}); err != nil {
monitor.Logger().Info(ctx, "[MONITOR] ERROR: failed starting monitor: %w", err)
}
}
} else {
// first close any monitor if running
if monitor.IsConnected() {
if err = monitor.Stop(ctx); err != nil {
monitor.Logger().Info(ctx, "[MONITOR] ERROR: failed stopping monitor: %w", err)
}
}
}

time.Sleep(defaultMonitorSleep)
}
}()

return nil
}

// runModelMigrations will run the model Migrate() method for all models
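
The new loadMonitor above is effectively a leader-election loop: every instance periodically tries to write a distributed lock keyed by the cluster prefix, using its own lockID as the secret; the current holder runs the monitor, every other instance keeps its monitor stopped, and the onStop callback releases the lock. A generic, self-contained sketch of that loop using stand-in interfaces (the real code relies on WriteLockWithSecret/ReleaseLock from go-cachestore plus the defaultMonitorLockTTL and defaultMonitorSleep constants defined elsewhere in bux):

```go
package examples

import (
	"context"
	"log"
	"time"
)

// distributedLock is a stand-in for the cachestore lock calls used above.
type distributedLock interface {
	// WriteLockWithSecret returns the secret of whoever currently holds the lock.
	WriteLockWithSecret(ctx context.Context, key, secret string, ttlSeconds int64) (string, error)
}

// service is a stand-in for the monitor being coordinated.
type service interface {
	IsRunning() bool
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// runLeaderElection keeps the service running on exactly one instance: the
// one whose id matches the current lock holder.
func runLeaderElection(ctx context.Context, lock distributedLock, svc service, lockKey, myID string) {
	for {
		holder, err := lock.WriteLockWithSecret(ctx, lockKey, myID, 60)
		if err != nil {
			log.Printf("did not get the lock (another instance holds it): %v", err)
		}

		if holder == myID {
			if !svc.IsRunning() {
				if err = svc.Start(ctx); err != nil {
					log.Printf("failed starting service: %v", err)
				}
			}
		} else if svc.IsRunning() {
			if err = svc.Stop(ctx); err != nil {
				log.Printf("failed stopping service: %v", err)
			}
		}

		time.Sleep(30 * time.Second) // comparable to defaultMonitorSleep above
	}
}
```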
36 changes: 36 additions & 0 deletions client_options.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/cluster"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/coocood/freecache"
@@ -50,6 +51,10 @@ func defaultClientOptions() *clientOptions {
syncOnChain: true, // Enabled by default for new users
},

cluster: &clusterOptions{
options: []cluster.ClientOps{},
},

// Blank cache config
cacheStore: &cacheStoreOptions{
ClientInterface: nil,
@@ -562,6 +567,37 @@ func WithCronService(cronService taskmanager.CronService) ClientOps {
}
}

// -----------------------------------------------------------------
// CLUSTER
// -----------------------------------------------------------------

// WithClusterRedis will set the cluster coordinator to use redis
func WithClusterRedis(redisOptions *redis.Options) ClientOps {
return func(c *clientOptions) {
if redisOptions != nil {
c.cluster.options = append(c.cluster.options, cluster.WithRedis(redisOptions))
}
}
}

// WithClusterKeyPrefix will set the cluster key prefix to use for all keys in the cluster coordinator
func WithClusterKeyPrefix(prefix string) ClientOps {
return func(c *clientOptions) {
if prefix != "" {
c.cluster.options = append(c.cluster.options, cluster.WithKeyPrefix(prefix))
}
}
}

// WithClusterClient will set the cluster options on the client
func WithClusterClient(clusterClient cluster.ClientInterface) ClientOps {
return func(c *clientOptions) {
if clusterClient != nil {
c.cluster.ClientInterface = clusterClient
}
}
}

// -----------------------------------------------------------------
// CHAIN-STATE
// -----------------------------------------------------------------
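
A hedged example of wiring the new cluster options above into bux.NewClient; the redis address and key prefix are placeholders, and all other (normally required) bux configuration is omitted for brevity:

```go
package examples

import (
	"context"

	"github.com/BuxOrg/bux"
	"github.com/go-redis/redis/v8"
)

// newClusteredClient builds a bux client whose cluster coordinator is redis,
// using only the options introduced in this commit.
func newClusteredClient(ctx context.Context) (bux.ClientInterface, error) {
	return bux.NewClient(
		ctx,
		bux.WithClusterRedis(&redis.Options{Addr: "localhost:6379"}), // placeholder address
		bux.WithClusterKeyPrefix("bux_cluster_"),                     // placeholder prefix
	)
}
```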
78 changes: 78 additions & 0 deletions cluster/client.go
@@ -0,0 +1,78 @@
package cluster

import (
"context"

"github.com/go-redis/redis/v8"
zLogger "github.com/mrz1836/go-logger"
)

type (

// Client is the client (configuration)
Client struct {
pubSubService
options *clientOptions
}

// clientOptions holds all the configuration for the client
clientOptions struct {
coordinator Coordinator // which coordinator to use, either 'memory' or 'redis'
debug bool // For extra logs and additional debug information
logger zLogger.GormLoggerInterface // Internal logger interface
newRelicEnabled bool // Whether to use New Relic
prefix string // the cluster key prefix to use before all keys
redisOptions *redis.Options
}
)

// NewClient create new cluster client
func NewClient(ctx context.Context, opts ...ClientOps) (*Client, error) {
// Create a new client with defaults
client := &Client{options: defaultClientOptions()}

// Overwrite defaults with any set by user
for _, opt := range opts {
opt(client.options)
}

// Use NewRelic if it's enabled (use existing txn if found on ctx)
ctx = client.options.getTxnCtx(ctx)

// Set logger if not set
if client.options.logger == nil {
client.options.logger = zLogger.NewGormLogger(client.IsDebug(), 4)
}

if client.options.coordinator == CoordinatorRedis {
pubSubClient, err := NewRedisPubSub(ctx, client.options.redisOptions)
if err != nil {
return nil, err
}
pubSubClient.debug = client.options.debug
pubSubClient.prefix = client.options.prefix
client.pubSubService = pubSubClient
} else {
pubSubClient, err := NewMemoryPubSub(ctx)
if err != nil {
return nil, err
}

pubSubClient.debug = client.options.debug
pubSubClient.prefix = client.options.prefix
client.pubSubService = pubSubClient
}

// Return the client
return client, nil
}

// IsDebug returns whether debugging is on or off
func (c *Client) IsDebug() bool {
return c.options.debug
}

// GetClusterPrefix returns the cluster key prefix that can be used in things like Redis
func (c *Client) GetClusterPrefix() string {
return c.options.prefix
}
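
The cluster client can also be constructed directly and injected through WithClusterClient (added above in client_options.go), which is useful when the coordinator is shared with other services. A hypothetical sketch with placeholder values:

```go
package examples

import (
	"context"

	"github.com/BuxOrg/bux"
	"github.com/BuxOrg/bux/cluster"
	"github.com/go-redis/redis/v8"
)

// newBuxWithSharedCluster builds the cluster coordinator first, then hands it
// to bux via WithClusterClient instead of letting loadCluster create one.
func newBuxWithSharedCluster(ctx context.Context) (bux.ClientInterface, error) {
	clusterClient, err := cluster.NewClient(
		ctx,
		cluster.WithRedis(&redis.Options{Addr: "localhost:6379"}), // placeholder address
		cluster.WithKeyPrefix("bux_cluster_"),                     // placeholder prefix
	)
	if err != nil {
		return nil, err
	}

	// Other (normally required) bux options are omitted for brevity.
	return bux.NewClient(ctx, bux.WithClusterClient(clusterClient))
}
```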