Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Added missing ctx, making filter types public
Browse files Browse the repository at this point in the history
  • Loading branch information
mrz1836 committed Jun 1, 2022
1 parent ef1d3fa commit b66d667
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 21 deletions.
4 changes: 2 additions & 2 deletions chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ type MonitorService interface {
Add(regexpString string, item string) error
Processor() MonitorProcessor
ProcessMempool(ctx context.Context) error
Start(handler MonitorHandler) error
Stop() error
Start(ctx context.Context, handler MonitorHandler) error
Stop(ctx context.Context) error
}
35 changes: 17 additions & 18 deletions chainstate/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chainstate

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -11,7 +10,7 @@ import (
"github.com/mrz1836/go-whatsonchain"
)

// Monitor starts a new monitorConfig to monitorConfig and filter transactions from a source
// Monitor starts a new monitorConfig to monitor and filter transactions from a source
type Monitor struct {
authToken string
buxAgentURL string
Expand All @@ -25,11 +24,11 @@ type Monitor struct {
loadMonitoredDestinations bool
logger logger.Interface
maxNumberOfDestinations int
mempoolSyncChannel chan bool
monitorDays int
processMempoolOnConnect bool
processor MonitorProcessor
saveTransactionsDestinations bool
mempoolSyncChannel chan bool
}

// MonitorOptions options for starting this monitorConfig
Expand Down Expand Up @@ -62,30 +61,30 @@ func (o *MonitorOptions) checkDefaults() {
func NewMonitor(_ context.Context, options *MonitorOptions) (monitor *Monitor) {
options.checkDefaults()
if options.ProcessorType == "" {
options.ProcessorType = filterBloom
options.ProcessorType = FilterBloom
}
monitor = &Monitor{
authToken: options.AuthToken,
buxAgentURL: options.BuxAgentURL,
debug: options.Debug,
falsePositiveRate: options.FalsePositiveRate,
filterType: options.ProcessorType,
maxNumberOfDestinations: options.MaxNumberOfDestinations,
mempoolSyncChannel: make(chan bool),
monitorDays: options.MonitorDays,
processMempoolOnConnect: options.ProcessMempoolOnConnect,
saveTransactionsDestinations: options.SaveTransactionDestinations,
filterType: options.ProcessorType,
mempoolSyncChannel: make(chan bool),
}

// Set logger if not set
if monitor.logger == nil {
monitor.logger = logger.NewLogger(true, 4)
monitor.logger = logger.NewLogger(options.Debug, 4)
}

switch monitor.filterType {
case filterRegex:
case FilterRegex:
monitor.processor = NewRegexProcessor()
case filterBloom:
case FilterBloom:
default:
monitor.processor = NewBloomProcessor(uint(monitor.maxNumberOfDestinations), monitor.falsePositiveRate)
}
Expand Down Expand Up @@ -145,12 +144,12 @@ func (m *Monitor) SetChainstateOptions(options *clientOptions) {
m.chainstateOptions = options
}

// Monitor open a socket to the service provider and monitorConfig transactions
func (m *Monitor) Start(handler MonitorHandler) error {
// Start open a socket to the service provider and monitorConfig transactions
func (m *Monitor) Start(ctx context.Context, handler MonitorHandler) error {
if m.client == nil {
handler.SetMonitor(m)
m.handler = handler
m.logger.Info(context.Background(), fmt.Sprintf("[MONITOR] Starting, connecting to server: %s", m.buxAgentURL))
m.logger.Info(ctx, fmt.Sprintf("[MONITOR] Starting, connecting to server: %s", m.buxAgentURL))
m.client = newCentrifugeClient(m.buxAgentURL, handler)
if m.authToken != "" {
m.client.SetToken(m.authToken)
Expand All @@ -163,7 +162,7 @@ func (m *Monitor) Start(handler MonitorHandler) error {
// Add a new item to monitor
func (m *Monitor) Add(regexString, item string) error {
if m.processor == nil {
return errors.New("monitor processor not available")
return ErrMonitorNotAvailable
}
// todo signal to bux-agent that a new item was added
_, err := m.client.AddFilter(regexString, item)
Expand All @@ -188,9 +187,9 @@ func (m *Monitor) IsConnected() bool {
return m.connected
}

// PauseMonitor closes the monitoring socket and pauses monitoring
func (m *Monitor) Stop() error {
m.logger.Info(context.Background(), "[MONITOR] Stopping monitor...")
// Stop closes the monitoring socket and pauses monitoring
func (m *Monitor) Stop(ctx context.Context) error {
m.logger.Info(ctx, "[MONITOR] Stopping monitor...")
defer close(m.mempoolSyncChannel)
return m.client.Disconnect()
}
Expand All @@ -208,6 +207,7 @@ func (m *Monitor) ProcessMempool(ctx context.Context) error {
// TODO: This is overkill right now, but gives us a chance to parallelize this stuff
var done sync.WaitGroup
done.Add(1)

// run the processing of the txs in a different thread
go func() {
if m.debug {
Expand Down Expand Up @@ -269,8 +269,7 @@ func (m *Monitor) ProcessMempool(ctx context.Context) error {
continue
}
if txHex != "" {
err = m.handler.RecordTransaction(ctx, txHex)
if err != nil {
if err = m.handler.RecordTransaction(ctx, txHex); err != nil {
m.logger.Error(ctx, fmt.Sprintf("[MONITOR] ERROR recording tx: %s\n", err.Error()))
continue
}
Expand Down
2 changes: 1 addition & 1 deletion client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *Client) loadMonitor(ctx context.Context) (err error) {
}
}

return monitor.Start(&handler)
return monitor.Start(ctx, &handler)
}

// runModelMigrations will run the model Migrate() method for all models
Expand Down

0 comments on commit b66d667

Please sign in to comment.