Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat(BUX-181): remove woc from monitor module
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Sep 19, 2023
1 parent 483328d commit d700ea2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 144 deletions.
5 changes: 1 addition & 4 deletions chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ type MonitorClient interface {

// MonitorHandler interface
type MonitorHandler interface {
whatsonchain.SocketHandler
GetWhatsOnChain() whatsonchain.ClientInterface
SocketHandler
RecordBlockHeader(ctx context.Context, bh bc.BlockHeader) error
RecordTransaction(ctx context.Context, txHex string) error
SetMonitor(monitor *Monitor)
Expand Down Expand Up @@ -110,13 +109,11 @@ type MonitorService interface {
GetLockID() string
GetMaxNumberOfDestinations() int
GetMonitorDays() int
GetProcessMempoolOnConnect() bool
IsConnected() bool
IsDebug() bool
LoadMonitoredDestinations() bool
AllowUnknownTransactions() bool
Logger() Logger
ProcessMempool(ctx context.Context) error
Processor() MonitorProcessor
SaveDestinations() bool
Start(ctx context.Context, handler MonitorHandler, onStop func()) error
Expand Down
109 changes: 0 additions & 109 deletions chainstate/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package chainstate
import (
"context"
"fmt"
"time"

"github.com/BuxOrg/bux/utils"
zLogger "github.com/mrz1836/go-logger"
"github.com/mrz1836/go-whatsonchain"
)

// Monitor starts a new monitorConfig to monitor and filter transactions from a source
Expand All @@ -30,7 +28,6 @@ type Monitor struct {
mempoolSyncChannelActive bool
mempoolSyncChannel chan bool
monitorDays int
processMempoolOnConnect bool
processor MonitorProcessor
saveTransactionsDestinations bool
onStop func()
Expand All @@ -47,7 +44,6 @@ type MonitorOptions struct {
LockID string `json:"lock_id"`
MaxNumberOfDestinations int `json:"max_number_of_destinations"`
MonitorDays int `json:"monitor_days"`
ProcessMempoolOnConnect bool `json:"process_mempool_on_connect"`
ProcessorType string `json:"processor_type"`
SaveTransactionDestinations bool `json:"save_transaction_destinations"`
AllowUnknownTransactions bool `json:"allow_unknown_transactions"` // whether to allow transactions that do not have an xpub_in_id or xpub_out_id
Expand Down Expand Up @@ -97,7 +93,6 @@ func NewMonitor(_ context.Context, options *MonitorOptions) (monitor *Monitor) {
lockID: options.LockID,
maxNumberOfDestinations: options.MaxNumberOfDestinations,
monitorDays: options.MonitorDays,
processMempoolOnConnect: options.ProcessMempoolOnConnect,
saveTransactionsDestinations: options.SaveTransactionDestinations,
allowUnknownTransactions: options.AllowUnknownTransactions,
}
Expand Down Expand Up @@ -167,11 +162,6 @@ func (m *Monitor) GetMaxNumberOfDestinations() int {
return m.maxNumberOfDestinations
}

// GetProcessMempoolOnConnect gets whether the whole mempool should be processed when connecting
func (m *Monitor) GetProcessMempoolOnConnect() bool {
return m.processMempoolOnConnect
}

// IsConnected returns whether we are connected to the socket
func (m *Monitor) IsConnected() bool {
return m.connected
Expand Down Expand Up @@ -202,105 +192,6 @@ func (m *Monitor) Processor() MonitorProcessor {
return m.processor
}

// ProcessMempool processes all current transactions in the mempool
func (m *Monitor) ProcessMempool(ctx context.Context) error {

woc := m.handler.GetWhatsOnChain()
if woc != nil {
mempoolTxs, err := woc.GetMempoolTransactions(ctx)
if err != nil {
return err
}

// create a new channel to control this go routine
m.mempoolSyncChannel = make(chan bool)
m.mempoolSyncChannelActive = true
// run the processing of the txs in a different thread
go func(mempoolSyncChannel chan bool) {
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] ProcessMempool mempoolTxs: %d\n", len(mempoolTxs)))
}
if len(mempoolTxs) > 0 {
hashes := new(whatsonchain.TxHashes)
hashes.TxIDs = append(hashes.TxIDs, mempoolTxs...)

// Break up the transactions into batches
var batches [][]string
chunkSize := whatsonchain.MaxTransactionsRaw

for i := 0; i < len(hashes.TxIDs); i += chunkSize {
end := i + chunkSize
if end > len(hashes.TxIDs) {
end = len(hashes.TxIDs)
}
batches = append(batches, hashes.TxIDs[i:end])
}
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] ProcessMempool created batches: %d\n", len(batches)))
}

var currentRateLimit int
// Loop Batches - and get each batch (multiple batches of MaxTransactionsRaw)
// this code comes from the go-whatsonchain lib, but we want to process per 20
// and not the whole batch in 1 go
for i, batch := range batches {
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] ProcessMempool processing batch: %d\n", i+1))
}
// While processing all the batches, check if channel is closed
select {
case <-mempoolSyncChannel:
return
default:

txHashes := new(whatsonchain.TxHashes)
txHashes.TxIDs = append(txHashes.TxIDs, batch...)

// Get the tx details (max of MaxTransactionsUTXO)
var returnedList whatsonchain.TxList
if returnedList, err = woc.BulkRawTransactionDataProcessor(
ctx, txHashes,
); err != nil {
return
}

// Add to the list
for _, tx := range returnedList {
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] ProcessMempool tx: %s\n", tx.TxID))
}
var txHex string
txHex, err = m.processor.FilterTransaction(tx.Hex) // todo off
if err != nil {
m.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR filtering tx %s: %s\n", tx.TxID, err.Error()))
continue
}
if txHex != "" {
if err = m.handler.RecordTransaction(ctx, txHex); err != nil {
m.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR recording tx: %s\n", err.Error()))
continue
}
if m.debug {
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] successfully recorded tx: %s\n", tx.TxID))
}
}
}

// Accumulate / sleep to prevent rate limiting
currentRateLimit++
if currentRateLimit >= woc.RateLimit() {
time.Sleep(1 * time.Second)
currentRateLimit = 0
}
}
}
}
}(m.mempoolSyncChannel)
}

return nil
}

// SaveDestinations gets whether we should save destinations from transactions that pass monitor filter
func (m *Monitor) SaveDestinations() bool {
return m.saveTransactionsDestinations
Expand Down
22 changes: 20 additions & 2 deletions chainstate/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/centrifugal/centrifuge-go"
"github.com/mrz1836/go-whatsonchain"
)

// AddFilterMessage defines a new filter to be published from the client
Expand Down Expand Up @@ -84,7 +83,7 @@ func (a *AgentClient) SetFilter(regex string, bloomFilter *BloomProcessorFilter)
}

// newCentrifugeClient will create a new Centrifuge using the provided handler and default configurations
func newCentrifugeClient(wsURL string, handler whatsonchain.SocketHandler) MonitorClient {
func newCentrifugeClient(wsURL string, handler SocketHandler) MonitorClient {
c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig()) // todo: use our own defaults/custom options

c.OnConnect(handler)
Expand All @@ -99,3 +98,22 @@ func newCentrifugeClient(wsURL string, handler whatsonchain.SocketHandler) Monit

return &AgentClient{Client: c}
}

// SocketHandler is composite interface of centrifuge handlers interfaces
type SocketHandler interface {
OnConnect(*centrifuge.Client, centrifuge.ConnectEvent)
OnDisconnect(*centrifuge.Client, centrifuge.DisconnectEvent)
OnError(*centrifuge.Client, centrifuge.ErrorEvent)
OnJoin(*centrifuge.Subscription, centrifuge.JoinEvent)
OnLeave(*centrifuge.Subscription, centrifuge.LeaveEvent)
OnMessage(*centrifuge.Client, centrifuge.MessageEvent)
OnPublish(*centrifuge.Subscription, centrifuge.PublishEvent)
OnServerJoin(*centrifuge.Client, centrifuge.ServerJoinEvent)
OnServerLeave(*centrifuge.Client, centrifuge.ServerLeaveEvent)
OnServerPublish(*centrifuge.Client, centrifuge.ServerPublishEvent)
OnServerSubscribe(*centrifuge.Client, centrifuge.ServerSubscribeEvent)
OnServerUnsubscribe(*centrifuge.Client, centrifuge.ServerUnsubscribeEvent)
OnSubscribeError(*centrifuge.Subscription, centrifuge.SubscribeErrorEvent)
OnSubscribeSuccess(*centrifuge.Subscription, centrifuge.SubscribeSuccessEvent)
OnUnsubscribe(*centrifuge.Subscription, centrifuge.UnsubscribeEvent)
}
1 change: 0 additions & 1 deletion examples/client/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {
LockID: "unique-lock-id-for-multiple-servers",
MaxNumberOfDestinations: 25000,
MonitorDays: 5,
ProcessMempoolOnConnect: false,
ProcessorType: chainstate.FilterRegex,
SaveTransactionDestinations: false,
}),
Expand Down
36 changes: 8 additions & 28 deletions monitor_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/korovkin/limiter"
"github.com/libsv/go-bc"
"github.com/libsv/go-bt/v2"
"github.com/mrz1836/go-whatsonchain"
)

// MonitorEventHandler for handling transaction events from a monitor
Expand All @@ -41,7 +40,6 @@ type blockSubscriptionHandler struct {
}

func (b *blockSubscriptionHandler) OnPublish(subscription *centrifuge.Subscription, e centrifuge.PublishEvent) {

channelName := subscription.Channel()
if strings.HasPrefix(channelName, "block:sync:") {
// block subscription
Expand Down Expand Up @@ -75,7 +73,6 @@ func (b *blockSubscriptionHandler) OnPublish(subscription *centrifuge.Subscripti
}

func (b *blockSubscriptionHandler) OnUnsubscribe(subscription *centrifuge.Subscription, _ centrifuge.UnsubscribeEvent) {

b.logger.Info(b.ctx, fmt.Sprintf("[MONITOR] OnUnsubscribe: %s", subscription.Channel()))

// close wait group
Expand Down Expand Up @@ -112,16 +109,6 @@ func (h *MonitorEventHandler) OnConnect(client *centrifuge.Client, e centrifuge.
}
}

if h.monitor.GetProcessMempoolOnConnect() {
h.logger.Info(h.ctx, "[MONITOR] PROCESS MEMPOOL")
go func() {
ctx := context.Background()
if err := h.monitor.ProcessMempool(ctx); err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR processing mempool: %s", err.Error()))
}
}()
}

h.logger.Info(h.ctx, "[MONITOR] PROCESS BLOCK HEADERS")
if err := h.ProcessBlockHeaders(h.ctx, client); err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR processing block headers: %s", err.Error()))
Expand Down Expand Up @@ -203,7 +190,6 @@ func (h *MonitorEventHandler) ProcessBlocks(ctx context.Context, client *centrif

// ProcessBlockHeaders processes all missing block headers
func (h *MonitorEventHandler) ProcessBlockHeaders(ctx context.Context, client *centrifuge.Client) error {

lastBlockHeader, err := h.buxClient.GetLastBlockHeader(ctx)
if err != nil {
h.logger.Error(h.ctx, err.Error())
Expand Down Expand Up @@ -278,26 +264,24 @@ func (h *MonitorEventHandler) OnLeave(_ *centrifuge.Subscription, e centrifuge.L
// OnPublish on publish event
func (h *MonitorEventHandler) OnPublish(subscription *centrifuge.Subscription, e centrifuge.PublishEvent) {
channelName := subscription.Channel()

if strings.HasPrefix(channelName, "block:headers:history:") {
bi := whatsonchain.BlockInfo{}
bi := chainstate.BlockInfo{}
err := json.Unmarshal(e.Data, &bi)
if err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR unmarshalling block header: %v", err))
return
}

var existingBlock *BlockHeader
if existingBlock, err = h.buxClient.GetBlockHeaderByHeight(
h.ctx, uint32(bi.Height),
); err != nil {
if existingBlock, err = h.buxClient.GetBlockHeaderByHeight(h.ctx, uint32(bi.Height)); err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR getting block header by height: %v", err))
}

if existingBlock == nil {
if err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR unmarshalling block header: %v", err))
return
}
merkleRoot, _ := hex.DecodeString(bi.MerkleRoot)
previousBlockHash, _ := hex.DecodeString(bi.PreviousBlockHash)

bh := bc.BlockHeader{
Bits: []byte(bi.Bits),
HashMerkleRoot: merkleRoot,
Expand All @@ -306,6 +290,7 @@ func (h *MonitorEventHandler) OnPublish(subscription *centrifuge.Subscription, e
Time: uint32(bi.Time),
Version: uint32(bi.Version),
}

if _, err = h.buxClient.RecordBlockHeader(
h.ctx, bi.Hash, uint32(bi.Height), bh,
); err != nil {
Expand Down Expand Up @@ -398,7 +383,7 @@ func (h *MonitorEventHandler) processMempoolPublish(_ *centrifuge.Client, e cent
}

func (h *MonitorEventHandler) processBlockHeaderPublish(client *centrifuge.Client, e centrifuge.ServerPublishEvent) {
bi := whatsonchain.BlockInfo{}
bi := chainstate.BlockInfo{}
err := json.Unmarshal(e.Data, &bi)
if err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR unmarshalling block header: %v", err))
Expand Down Expand Up @@ -473,8 +458,3 @@ func (h *MonitorEventHandler) RecordTransaction(ctx context.Context, txHex strin
func (h *MonitorEventHandler) RecordBlockHeader(_ context.Context, _ bc.BlockHeader) error {
return nil
}

// GetWhatsOnChain returns the WhatsOnChain client interface
func (h *MonitorEventHandler) GetWhatsOnChain() whatsonchain.ClientInterface {
return h.buxClient.Chainstate().WhatsOnChain()
}

0 comments on commit d700ea2

Please sign in to comment.