Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Moved Add to monitor, added monitor_client interface, added debug out…
Browse files Browse the repository at this point in the history
…put in transaction event handler
  • Loading branch information
icellan committed Apr 15, 2022
1 parent fc893ed commit 61d99e4
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 67 deletions.
1 change: 1 addition & 0 deletions chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type MonitorService interface {
GetFalsePositiveRate() float64
GetMaxNumberOfDestinations() int
GetProcessMempoolOnConnect() bool
Add(regexpString string, item string) error
Processor() MonitorProcessor
ProcessMempool(ctx context.Context) error
Monitor(handler MonitorHandler) error
Expand Down
32 changes: 12 additions & 20 deletions chainstate/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package chainstate

import (
"context"
"errors"
"fmt"
"time"

"github.com/BuxOrg/bux/logger"
"github.com/centrifugal/centrifuge-go"
"github.com/mrz1836/go-whatsonchain"
)

Expand All @@ -15,7 +15,7 @@ type Monitor struct {
debug bool
chainstateOptions *clientOptions
logger logger.Interface
client *centrifuge.Client
client MonitorClient
processor MonitorProcessor
connected bool
centrifugeServer string
Expand Down Expand Up @@ -50,23 +50,6 @@ func (o *MonitorOptions) checkDefaults() {
}
}

func newClient(wsURL string, handler whatsonchain.SocketHandler) *centrifuge.Client {
c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig())

c.OnConnect(handler)
c.OnDisconnect(handler)
c.OnMessage(handler)
c.OnError(handler)

c.OnServerPublish(handler)
c.OnServerSubscribe(handler)
c.OnServerUnsubscribe(handler)
c.OnServerJoin(handler)
c.OnServerLeave(handler)

return c
}

// NewMonitor starts a new monitorConfig and loads all addresses that need to be monitored into the bloom filter
func NewMonitor(_ context.Context, options *MonitorOptions) *Monitor {
options.checkDefaults()
Expand Down Expand Up @@ -143,12 +126,21 @@ func (m *Monitor) Monitor(handler MonitorHandler) error {
handler.SetMonitor(m)
m.handler = handler
m.logger.Info(context.Background(), "[MONITOR] Connecting to server: %s", m.centrifugeServer)
m.client = newClient(m.centrifugeServer, handler)
m.client = newCentrifugeClient(m.centrifugeServer, handler)
}

return m.client.Connect()
}

// Add a new item to monitor
func (m *Monitor) Add(regexString, item string) error {
if m.processor == nil {
return errors.New("monitor processor not available")
}
// todo signal to bux-agent that a new item was added
return m.processor.Add(regexString, item)
}

// Connected sets the connected state to true
func (m *Monitor) Connected() {
m.connected = true
Expand Down
29 changes: 29 additions & 0 deletions chainstate/monitor_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package chainstate

import (
"github.com/centrifugal/centrifuge-go"
"github.com/mrz1836/go-whatsonchain"
)

// MonitorClient interface
type MonitorClient interface {
Connect() error
Disconnect() error
}

func newCentrifugeClient(wsURL string, handler whatsonchain.SocketHandler) MonitorClient {
c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig())

c.OnConnect(handler)
c.OnDisconnect(handler)
c.OnMessage(handler)
c.OnError(handler)

c.OnServerPublish(handler)
c.OnServerSubscribe(handler)
c.OnServerUnsubscribe(handler)
c.OnServerJoin(handler)
c.OnServerLeave(handler)

return c
}
10 changes: 4 additions & 6 deletions model_destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,10 @@ func (m *Destination) AfterCreated(_ context.Context) error {
if m.Monitor.Valid {
monitor := m.Client().Chainstate().Monitor()
if monitor != nil {
processor := monitor.Processor()
if processor != nil {
err := processor.Add(utils.P2PKHRegexpString, m.LockingScript)
if err != nil {
m.DebugLog(fmt.Sprintf("ERROR: failed adding destination to monitor: %s", err.Error()))
}
m.DebugLog(fmt.Sprintf("Adding destination to monitor: %s", m.LockingScript))
err := monitor.Add(utils.P2PKHRegexpString, m.LockingScript)
if err != nil {
m.DebugLog(fmt.Sprintf("ERROR: failed adding destination to monitor: %s", err.Error()))
}
}
}
Expand Down
130 changes: 89 additions & 41 deletions transaction_monitor_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bux

import (
"context"
"encoding/json"
"fmt"
"runtime"

Expand All @@ -11,7 +12,8 @@ import (
"github.com/mrz1836/go-whatsonchain"
)

type transactionEventHandler struct {
// TransactionEventHandler for handling transaction events from a monitor
type TransactionEventHandler struct {
debug bool
logger chainstate.Logger
monitor chainstate.MonitorService
Expand All @@ -21,8 +23,8 @@ type transactionEventHandler struct {
}

// NewTransactionMonitorHandler create a new monitor handler
func NewTransactionMonitorHandler(ctx context.Context, buxClient ClientInterface, monitor chainstate.MonitorService) transactionEventHandler {
return transactionEventHandler{
func NewTransactionMonitorHandler(ctx context.Context, buxClient ClientInterface, monitor chainstate.MonitorService) TransactionEventHandler {
return TransactionEventHandler{
debug: monitor.IsDebug(),
logger: monitor.Logger(),
monitor: monitor,
Expand All @@ -32,76 +34,123 @@ func NewTransactionMonitorHandler(ctx context.Context, buxClient ClientInterface
}
}

func (h *transactionEventHandler) OnConnect(_ *centrifuge.Client, e centrifuge.ConnectEvent) {
ctx := context.Background()
h.logger.Info(ctx, fmt.Sprintf("[MONITOR] Connected to server: %s\n", e.ClientID))
// OnConnect event when connected
func (h *TransactionEventHandler) OnConnect(_ *centrifuge.Client, e centrifuge.ConnectEvent) {
h.logger.Info(h.ctx, fmt.Sprintf("[MONITOR] Connected to server: %s", e.ClientID))
if h.monitor.GetProcessMempoolOnConnect() {
h.logger.Info(ctx, "[MONITOR] PROCESS MEMPOOL")
h.logger.Info(h.ctx, "[MONITOR] PROCESS MEMPOOL")
go func() {
err := h.monitor.ProcessMempool(ctx)
err := h.monitor.ProcessMempool(h.ctx)
if err != nil {
h.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR processing mempool: %s", err.Error()))
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR processing mempool: %s", err.Error()))
}
}()
}
h.monitor.Connected()
}

func (h *transactionEventHandler) OnError(_ *centrifuge.Client, e centrifuge.ErrorEvent) {
ctx := context.Background()
h.logger.Error(ctx, fmt.Sprintf("[MONITOR] Error: %s", e.Message))
// OnError on error event
func (h *TransactionEventHandler) OnError(_ *centrifuge.Client, e centrifuge.ErrorEvent) {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] Error: %s", e.Message))
}

func (h *transactionEventHandler) OnMessage(_ *centrifuge.Client, e centrifuge.MessageEvent) {
// OnMessage on new message event
func (h *TransactionEventHandler) OnMessage(_ *centrifuge.Client, e centrifuge.MessageEvent) {
var data map[string]interface{}
err := json.Unmarshal(e.Data, &data)
if err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] failed unmarshalling data: %s", err.Error()))
}

if _, ok := data["time"]; !ok {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnMessage: %v", data))
}
}
}

func (h *transactionEventHandler) OnDisconnect(_ *centrifuge.Client, e centrifuge.DisconnectEvent) {
// OnDisconnect when disconnected
func (h *TransactionEventHandler) OnDisconnect(_ *centrifuge.Client, e centrifuge.DisconnectEvent) {
h.monitor.Disconnected()
}

func (h *transactionEventHandler) OnJoin(_ *centrifuge.Subscription, e centrifuge.JoinEvent) {
// OnJoin event when joining a server
func (h *TransactionEventHandler) OnJoin(_ *centrifuge.Subscription, e centrifuge.JoinEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnJoin: %v", e))
}
}

func (h *transactionEventHandler) OnLeave(_ *centrifuge.Subscription, e centrifuge.LeaveEvent) {
// OnLeave event when leaving a server
func (h *TransactionEventHandler) OnLeave(_ *centrifuge.Subscription, e centrifuge.LeaveEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnLeave: %v", e))
}
}

func (h *transactionEventHandler) OnPublish(_ *centrifuge.Subscription, e centrifuge.PublishEvent) {
// OnPublish ???
func (h *TransactionEventHandler) OnPublish(_ *centrifuge.Subscription, e centrifuge.PublishEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnPublish: %v", e))
}
}

func (h *transactionEventHandler) OnServerSubscribe(_ *centrifuge.Client, e centrifuge.ServerSubscribeEvent) {
// OnServerSubscribe ???
func (h *TransactionEventHandler) OnServerSubscribe(_ *centrifuge.Client, e centrifuge.ServerSubscribeEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnServerSubscribe: %v", e))
}
}

func (h *transactionEventHandler) OnServerUnsubscribe(_ *centrifuge.Client, e centrifuge.ServerUnsubscribeEvent) {
// OnServerUnsubscribe ???
func (h *TransactionEventHandler) OnServerUnsubscribe(_ *centrifuge.Client, e centrifuge.ServerUnsubscribeEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnServerUnsubscribe: %v", e))
}
}

func (h *transactionEventHandler) OnSubscribeSuccess(_ *centrifuge.Subscription, e centrifuge.SubscribeSuccessEvent) {
// OnSubscribeSuccess ???
func (h *TransactionEventHandler) OnSubscribeSuccess(_ *centrifuge.Subscription, e centrifuge.SubscribeSuccessEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnSubscribeSuccess: %v", e))
}
}

func (h *transactionEventHandler) OnSubscribeError(_ *centrifuge.Subscription, e centrifuge.SubscribeErrorEvent) {
// OnSubscribeError ???
func (h *TransactionEventHandler) OnSubscribeError(_ *centrifuge.Subscription, e centrifuge.SubscribeErrorEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnSubscribeError: %v", e))
}
}

func (h *transactionEventHandler) OnUnsubscribe(_ *centrifuge.Subscription, e centrifuge.UnsubscribeEvent) {
// OnUnsubscribe ???
func (h *TransactionEventHandler) OnUnsubscribe(_ *centrifuge.Subscription, e centrifuge.UnsubscribeEvent) {
if h.debug {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] OnUnsubscribe: %v", e))
}
}

func (h *transactionEventHandler) OnServerJoin(_ *centrifuge.Client, e centrifuge.ServerJoinEvent) {
ctx := context.Background()
h.logger.Info(ctx, fmt.Sprintf("[MONITOR] Joined server: %v\n", e))
// OnServerJoin event when joining a server
func (h *TransactionEventHandler) OnServerJoin(_ *centrifuge.Client, e centrifuge.ServerJoinEvent) {
h.logger.Info(h.ctx, fmt.Sprintf("[MONITOR] Joined server: %v", e))
}

func (h *transactionEventHandler) OnServerLeave(_ *centrifuge.Client, e centrifuge.ServerLeaveEvent) {
// OnServerLeave event when leaving a server
func (h *TransactionEventHandler) OnServerLeave(_ *centrifuge.Client, e centrifuge.ServerLeaveEvent) {
h.logger.Info(h.ctx, fmt.Sprintf("[MONITOR] Left server: %v", e))
}

func (h *transactionEventHandler) OnServerPublish(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
// OnServerPublish ???
func (h *TransactionEventHandler) OnServerPublish(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
// todo make this configurable
//h.OnServerPublishLinear(nil, e)
h.OnServerPublishParallel(nil, e)
//h.onServerPublishLinear(nil, e)
h.onServerPublishParallel(nil, e)
}

func (h *transactionEventHandler) OnServerPublishLinear(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
ctx := context.Background()
func (h *TransactionEventHandler) onServerPublishLinear(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
tx, err := h.monitor.Processor().FilterMempoolPublishEvent(e)
if err != nil {
h.logger.Error(ctx, fmt.Sprintf("[MONITOR] failed to process server event: %v\n", err))
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] failed to process server event: %v", err))
return
}

Expand All @@ -114,41 +163,40 @@ func (h *transactionEventHandler) OnServerPublishLinear(_ *centrifuge.Client, e
}
_, err = h.buxClient.RecordMonitoredTransaction(h.ctx, tx)
if err != nil {
h.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR recording tx: %v\n", err))
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR recording tx: %v", err))
return
}

if h.debug {
h.logger.Info(ctx, fmt.Sprintf("[MONITOR] successfully recorded tx: %v\n", tx))
h.logger.Info(h.ctx, fmt.Sprintf("[MONITOR] successfully recorded tx: %v", tx))
}
}

func (h *transactionEventHandler) OnServerPublishParallel(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
ctx := context.Background()
func (h *TransactionEventHandler) onServerPublishParallel(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
_, err := h.limit.Execute(func() {
h.OnServerPublishLinear(nil, e)
h.onServerPublishLinear(nil, e)
})

if err != nil {
h.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR failed to start goroutine: %v", err))
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR failed to start goroutine: %v", err))
}
}

// SetMonitor sets the monitor for the given handler
func (h *transactionEventHandler) SetMonitor(monitor *chainstate.Monitor) {
func (h *TransactionEventHandler) SetMonitor(monitor *chainstate.Monitor) {
h.monitor = monitor
}

// RecordTransaction records a transaction into bux
func (h *transactionEventHandler) RecordTransaction(ctx context.Context, txHex string) error {
func (h *TransactionEventHandler) RecordTransaction(ctx context.Context, txHex string) error {

_, err := h.buxClient.RecordMonitoredTransaction(ctx, txHex)

return err
}

// GetWhatsOnChain returns the whats on chain client interface
func (h *transactionEventHandler) GetWhatsOnChain() whatsonchain.ClientInterface {
func (h *TransactionEventHandler) GetWhatsOnChain() whatsonchain.ClientInterface {

return h.buxClient.Chainstate().WhatsOnChain()
}

0 comments on commit 61d99e4

Please sign in to comment.