diff --git a/.vscode/settings.json b/.vscode/settings.json index edf3fd5f..c21ec6f2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,5 +19,10 @@ "go.lintTool": "golangci-lint", "go.lintFlags": [ "--fast" - ] -} \ No newline at end of file + ], + "[go][go.mod]": { + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit" + } + } +} diff --git a/action_destination.go b/action_destination.go index ee2d14a3..4c656dda 100644 --- a/action_destination.go +++ b/action_destination.go @@ -2,18 +2,14 @@ package bux import ( "context" - "database/sql" - "time" - "github.com/mrz1836/go-datastore" - customTypes "github.com/mrz1836/go-datastore/custom_types" ) // NewDestination will get a new destination for an existing xPub // // xPubKey is the raw public xPub func (c *Client) NewDestination(ctx context.Context, xPubKey string, chain uint32, - destinationType string, monitor bool, opts ...ModelOps) (*Destination, error) { + destinationType string, opts ...ModelOps) (*Destination, error) { // Check for existing NewRelic transaction ctx = c.GetOrStartTxn(ctx, "new_destination") @@ -39,13 +35,6 @@ func (c *Client) NewDestination(ctx context.Context, xPubKey string, chain uint3 return nil, err } - if monitor { - destination.Monitor = customTypes.NullTime{NullTime: sql.NullTime{ - Valid: true, - Time: time.Now(), - }} - } - // Save the destination if err = destination.Save(ctx); err != nil { return nil, err @@ -57,7 +46,7 @@ func (c *Client) NewDestination(ctx context.Context, xPubKey string, chain uint3 // NewDestinationForLockingScript will create a new destination based on a locking script func (c *Client) NewDestinationForLockingScript(ctx context.Context, xPubID, lockingScript string, - monitor bool, opts ...ModelOps) (*Destination, error) { + opts ...ModelOps) (*Destination, error) { // Check for existing NewRelic transaction ctx = c.GetOrStartTxn(ctx, "new_destination_for_locking_script") @@ -77,6 +66,7 @@ func (c *Client) NewDestinationForLockingScript(ctx context.Context, xPubID, loc return nil, ErrUnknownLockingScript } +<<<<<<< HEAD // set the monitoring, passed down from the initiating function // this will be set when calling NewDestination from http, but not for instance paymail if monitor { @@ -86,6 +76,8 @@ func (c *Client) NewDestinationForLockingScript(ctx context.Context, xPubID, loc }} } +======= +>>>>>>> 06feaba (feat(BUX-417): remove monitor, ITC flag and IncomingTransaction (#532)) // Save the destination if err := destination.Save(ctx); err != nil { return nil, err diff --git a/action_destination_test.go b/action_destination_test.go index 72483cab..9814c01f 100644 --- a/action_destination_test.go +++ b/action_destination_test.go @@ -30,7 +30,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_NewDestination() { var destination *Destination destination, err = tc.client.NewDestination( - ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, opts..., + ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) assert.NoError(t, err) assert.Equal(t, "fc1e635d98151c6008f29908ee2928c60c745266f9853e945c917b1baa05973e", destination.ID) @@ -42,7 +42,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_NewDestination() { assert.Equal(t, "test-value", destination.Metadata["test-key"]) destination2, err2 := tc.client.NewDestination( - ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, opts..., + ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) assert.NoError(t, err2) assert.Equal(t, testXPubID, destination2.XpubID) @@ -65,7 +65,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_NewDestination() { destination, err := tc.client.NewDestination( context.Background(), testXPub, utils.ChainExternal, - utils.ScriptTypePubKeyHash, false, opts..., + utils.ScriptTypePubKeyHash, opts..., ) require.Error(t, err) require.Nil(t, destination) @@ -99,7 +99,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_NewDestinationForLockingScript() { var destination *Destination destination, err = tc.client.NewDestinationForLockingScript( - tc.ctx, testXPubID, lockingScript, false, opts..., + tc.ctx, testXPubID, lockingScript, opts..., ) assert.NoError(t, err) assert.Equal(t, "a64c7aca7110c7cde92245252a58bb18a4317381fc31fc293f6aafa3fcc7019f", destination.ID) @@ -118,7 +118,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_NewDestinationForLockingScript() { opts := append(tc.client.DefaultModelOptions(), WithMetadatas(metadata)) destination, err := tc.client.NewDestinationForLockingScript( - tc.ctx, testXPubID, "", false, + tc.ctx, testXPubID, "", opts..., ) require.Error(t, err) @@ -147,7 +147,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinations() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -176,7 +176,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinations() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -212,7 +212,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinationByAddress() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -240,7 +240,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinationByAddress() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -276,7 +276,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinationByLockingScript() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -305,7 +305,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_GetDestinationByLockingScript() { // Create a new destination destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) @@ -340,7 +340,7 @@ func (ts *EmbeddedDBTestSuite) TestClient_UpdateDestinationMetadata() { opts := tc.client.DefaultModelOptions() opts = append(opts, WithMetadatas(metadata)) destination, err := tc.client.NewDestination( - tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + tc.ctx, rawKey, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) diff --git a/action_transaction.go b/action_transaction.go index 2186f54e..f6123d7e 100644 --- a/action_transaction.go +++ b/action_transaction.go @@ -46,13 +46,7 @@ func (c *Client) RecordRawTransaction(ctx context.Context, txHex string, ) (*Transaction, error) { ctx = c.GetOrStartTxn(ctx, "record_raw_transaction") - allowUnknown := true - monitor := c.options.chainstate.Monitor() - if monitor != nil { - allowUnknown = monitor.AllowUnknownTransactions() - } - - return saveRawTransaction(ctx, c, allowUnknown, txHex, opts...) + return saveRawTransaction(ctx, c, true, txHex, opts...) } // NewTransaction will create a new draft transaction and return it diff --git a/bux_suite_test.go b/bux_suite_test.go index aac0aaf9..543d6baa 100644 --- a/bux_suite_test.go +++ b/bux_suite_test.go @@ -3,12 +3,12 @@ package bux import ( "context" "fmt" + "github.com/rs/zerolog" "sync" "testing" "time" "github.com/BuxOrg/bux/chainstate" - "github.com/BuxOrg/bux/logging" "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/tester" "github.com/DATA-DOG/go-sqlmock" @@ -68,7 +68,7 @@ type EmbeddedDBTestSuite struct { func (ts *EmbeddedDBTestSuite) serveMySQL() { defer ts.wg.Done() - logger := logging.GetDefaultLogger() + logger := zerolog.Nop() for { err := ts.MySQLServer.Start() diff --git a/chainstate/chainstate.go b/chainstate/chainstate.go index 1683cf04..6ba7243d 100644 --- a/chainstate/chainstate.go +++ b/chainstate/chainstate.go @@ -9,11 +9,6 @@ import ( "time" ) -// MonitorBlockHeaders will start up a block headers monitor -func (c *Client) MonitorBlockHeaders(_ context.Context) error { - return nil -} - // Broadcast will attempt to broadcast a transaction using the given providers func (c *Client) Broadcast(ctx context.Context, id, txHex string, timeout time.Duration) (string, error) { // Basic validation diff --git a/chainstate/client.go b/chainstate/client.go index db6f81bb..16a88f34 100644 --- a/chainstate/client.go +++ b/chainstate/client.go @@ -26,7 +26,6 @@ type ( config *syncConfig // Configuration for broadcasting and other chain-state actions debug bool // For extra logs and additional debug information logger *zerolog.Logger // Logger interface - monitor MonitorService // Monitor service newRelicEnabled bool // If NewRelic is enabled (parent application) userAgent string // Custom user agent for outgoing HTTP Requests } @@ -100,11 +99,6 @@ func (c *Client) Close(ctx context.Context) { c.options.config.minercraft = nil } - // Stop the active Monitor (if not already stopped) - if c.options.monitor != nil { - _ = c.options.monitor.Stop(ctx) - c.options.monitor = nil - } } } @@ -143,11 +137,6 @@ func (c *Client) Minercraft() minercraft.ClientInterface { return c.options.config.minercraft } -// Monitor will return the Monitor client -func (c *Client) Monitor() MonitorService { - return c.options.monitor -} - // BroadcastClient will return the BroadcastClient client func (c *Client) BroadcastClient() broadcast.Client { return c.options.config.broadcastClient diff --git a/chainstate/client_options.go b/chainstate/client_options.go index b9dd62e0..262d8122 100644 --- a/chainstate/client_options.go +++ b/chainstate/client_options.go @@ -122,25 +122,6 @@ func WithLogger(customLogger *zerolog.Logger) ClientOps { } } -// WithMonitoring will create a new monitorConfig interface with the given options -func WithMonitoring(ctx context.Context, monitorOptions *MonitorOptions) ClientOps { - return func(c *clientOptions) { - if monitorOptions != nil { - // Create the default Monitor for monitoring destinations - c.monitor = NewMonitor(ctx, monitorOptions) - } - } -} - -// WithMonitoringInterface will set the interface to use for monitoring the blockchain -func WithMonitoringInterface(monitor MonitorService) ClientOps { - return func(c *clientOptions) { - if monitor != nil { - c.monitor = monitor - } - } -} - // WithExcludedProviders will set a list of excluded providers func WithExcludedProviders(providers []string) ClientOps { return func(c *clientOptions) { diff --git a/chainstate/definitions.go b/chainstate/definitions.go index 25d180eb..1579b4ef 100644 --- a/chainstate/definitions.go +++ b/chainstate/definitions.go @@ -10,7 +10,6 @@ const ( defaultFalsePositiveRate = 0.01 defaultFeeLastCheckIgnore = 2 * time.Minute defaultMaxNumberOfDestinations = 100000 - defaultMonitorDays = 7 defaultQueryTimeOut = 15 * time.Second whatsOnChainRateLimitWithKey = 20 ) diff --git a/chainstate/errors.go b/chainstate/errors.go index f52b7764..070d9e0a 100644 --- a/chainstate/errors.go +++ b/chainstate/errors.go @@ -22,6 +22,3 @@ var ErrMissingBroadcastMiners = errors.New("missing: broadcasting miners") // ErrMissingQueryMiners is when query miners are missing var ErrMissingQueryMiners = errors.New("missing: query miners") - -// ErrMonitorNotAvailable is when the monitor processor is not available -var ErrMonitorNotAvailable = errors.New("monitor processor not available") diff --git a/chainstate/interface.go b/chainstate/interface.go index 555fec4a..d64787d4 100644 --- a/chainstate/interface.go +++ b/chainstate/interface.go @@ -8,8 +8,6 @@ import ( "github.com/BuxOrg/bux/utils" "github.com/bitcoin-sv/go-broadcast-client/broadcast" "github.com/centrifugal/centrifuge-go" - "github.com/libsv/go-bc" - "github.com/rs/zerolog" "github.com/tonicpow/go-minercraft/v2" ) @@ -51,64 +49,11 @@ type ClientInterface interface { HTTPClient() HTTPInterface IsDebug() bool IsNewRelicEnabled() bool - Monitor() MonitorService Network() Network QueryTimeout() time.Duration FeeUnit() *utils.FeeUnit } -// MonitorClient interface -type MonitorClient interface { - AddFilter(regex, item string) (centrifuge.PublishResult, error) - Connect() error - Disconnect() error - SetToken(token string) -} - -// MonitorHandler interface -type MonitorHandler interface { - SocketHandler - RecordBlockHeader(ctx context.Context, bh bc.BlockHeader) error - RecordTransaction(ctx context.Context, txHex string) error - SetMonitor(monitor *Monitor) -} - -// MonitorProcessor struct that defines interface to all filter processors -type MonitorProcessor interface { - Add(regexString, item string) error - Debug(bool) - FilterTransaction(txHex string) (string, error) - FilterTransactionPublishEvent(eData []byte) (string, error) - GetFilters() map[string]*BloomProcessorFilter - GetHash() string - IsDebug() bool - Logger() *zerolog.Logger - Reload(regexString string, items []string) error - SetFilter(regex string, filter []byte) error - SetLogger(logger *zerolog.Logger) - Test(regexString string, item string) bool -} - -// MonitorService for the monitoring -type MonitorService interface { - Add(regexpString string, item string) error - Connected() - Disconnected() - GetFalsePositiveRate() float64 - GetLockID() string - GetMaxNumberOfDestinations() int - GetMonitorDays() int - IsConnected() bool - IsDebug() bool - LoadMonitoredDestinations() bool - AllowUnknownTransactions() bool - Logger() *zerolog.Logger - Processor() MonitorProcessor - SaveDestinations() bool - Start(ctx context.Context, handler MonitorHandler, onStop func()) error - Stop(ctx context.Context) error -} - // SocketHandler is composite interface of centrifuge handlers interfaces type SocketHandler interface { OnConnect(*centrifuge.Client, centrifuge.ConnectEvent) diff --git a/chainstate/monitor.go b/chainstate/monitor.go deleted file mode 100644 index cc09f188..00000000 --- a/chainstate/monitor.go +++ /dev/null @@ -1,238 +0,0 @@ -package chainstate - -import ( - "context" - - "github.com/BuxOrg/bux/logging" - "github.com/BuxOrg/bux/utils" - "github.com/rs/zerolog" -) - -// Monitor starts a new monitorConfig to monitor and filter transactions from a source -// -// Internal struct with all options being private -type Monitor struct { - authToken string - buxAgentURL string - chainstateOptions *clientOptions - client MonitorClient - connected bool - debug bool - falsePositiveRate float64 - filterType string - handler MonitorHandler - loadMonitoredDestinations bool - lockID string - logger *zerolog.Logger - maxNumberOfDestinations int - mempoolSyncChannelActive bool - mempoolSyncChannel chan bool - monitorDays int - processor MonitorProcessor - saveTransactionsDestinations bool - onStop func() - allowUnknownTransactions bool -} - -// MonitorOptions options for starting this monitorConfig -type MonitorOptions struct { - AuthToken string `json:"token"` - BuxAgentURL string `json:"bux_agent_url"` - Debug bool `json:"debug"` - FalsePositiveRate float64 `json:"false_positive_rate"` - LoadMonitoredDestinations bool `json:"load_monitored_destinations"` - LockID string `json:"lock_id"` - MaxNumberOfDestinations int `json:"max_number_of_destinations"` - MonitorDays int `json:"monitor_days"` - ProcessorType string `json:"processor_type"` - SaveTransactionDestinations bool `json:"save_transaction_destinations"` - AllowUnknownTransactions bool `json:"allow_unknown_transactions"` // whether to allow transactions that do not have an xpub_in_id or xpub_out_id -} - -// checkDefaults will check for missing values and set default values -func (o *MonitorOptions) checkDefaults() { - // Set the default for Monitor Days (days in past) - if o.MonitorDays <= 0 { - o.MonitorDays = defaultMonitorDays - } - - // Set the false positive rate - if o.FalsePositiveRate <= 0 { - o.FalsePositiveRate = defaultFalsePositiveRate - } - - // Set the maximum number of destinations to monitor - if o.MaxNumberOfDestinations <= 0 { - o.MaxNumberOfDestinations = defaultMaxNumberOfDestinations - } - - // Set a unique lock id if it's not provided - if len(o.LockID) == 0 { // todo: lockID should always be set (return an error if not set?) - o.LockID, _ = utils.RandomHex(32) - } -} - -// NewMonitor starts a new monitorConfig and loads all addresses that need to be monitored into the bloom filter -func NewMonitor(_ context.Context, options *MonitorOptions) (monitor *Monitor) { - // Check the defaults - options.checkDefaults() - - // Set the default processor type if not recognized - if options.ProcessorType != FilterBloom && options.ProcessorType != FilterRegex { - options.ProcessorType = FilterBloom - } - - // Create a monitor struct - monitor = &Monitor{ - authToken: options.AuthToken, - buxAgentURL: options.BuxAgentURL, - debug: options.Debug, - falsePositiveRate: options.FalsePositiveRate, - filterType: options.ProcessorType, - loadMonitoredDestinations: options.LoadMonitoredDestinations, - lockID: options.LockID, - maxNumberOfDestinations: options.MaxNumberOfDestinations, - monitorDays: options.MonitorDays, - saveTransactionsDestinations: options.SaveTransactionDestinations, - allowUnknownTransactions: options.AllowUnknownTransactions, - } - - // Set logger if not set - if monitor.logger == nil { - monitor.logger = logging.GetDefaultLogger() - } - - // Switch on the filter type - switch monitor.filterType { - case FilterRegex: - monitor.processor = NewRegexProcessor() - default: - monitor.processor = NewBloomProcessor(uint(monitor.maxNumberOfDestinations), monitor.falsePositiveRate) - } - - // Load the settings for debugging and logging - monitor.processor.Debug(options.Debug) - monitor.processor.SetLogger(monitor.logger) - return -} - -// Add a new item to monitor -func (m *Monitor) Add(regexString, item string) error { - if m.processor == nil { - return ErrMonitorNotAvailable - } - // todo signal to bux-agent that a new item was added - if m.client != nil { - if _, err := m.client.AddFilter(regexString, item); err != nil { - return err - } - } else { - m.logger.Error().Msg("client was expected but not found") - } - return m.processor.Add(regexString, item) -} - -// Connected sets the connected state to true -func (m *Monitor) Connected() { - m.connected = true -} - -// Disconnected sets the connected state to false -func (m *Monitor) Disconnected() { - m.connected = false -} - -// GetMonitorDays gets the monitorDays option -func (m *Monitor) GetMonitorDays() int { - return m.monitorDays -} - -// GetFalsePositiveRate gets the falsePositiveRate option -func (m *Monitor) GetFalsePositiveRate() float64 { - return m.falsePositiveRate -} - -// GetLockID gets the lock id from the Monitor -func (m *Monitor) GetLockID() string { - return m.lockID -} - -// GetMaxNumberOfDestinations gets the monitorDays option -func (m *Monitor) GetMaxNumberOfDestinations() int { - return m.maxNumberOfDestinations -} - -// IsConnected returns whether we are connected to the socket -func (m *Monitor) IsConnected() bool { - return m.connected -} - -// IsDebug gets whether debugging is on -func (m *Monitor) IsDebug() bool { - return m.debug -} - -// LoadMonitoredDestinations gets where we want to add the monitored destinations from the database into the processor -func (m *Monitor) LoadMonitoredDestinations() bool { - return m.loadMonitoredDestinations -} - -// AllowUnknownTransactions gets whether we allow recording transactions with no relation to our xpubs -func (m *Monitor) AllowUnknownTransactions() bool { - return m.allowUnknownTransactions -} - -// Logger gets the current logger -func (m *Monitor) Logger() *zerolog.Logger { - return m.logger -} - -// Processor gets the monitor processor -func (m *Monitor) Processor() MonitorProcessor { - return m.processor -} - -// SaveDestinations gets whether we should save destinations from transactions that pass monitor filter -func (m *Monitor) SaveDestinations() bool { - return m.saveTransactionsDestinations -} - -// SetChainstateOptions sets the chainstate options on the monitor to allow more syncing capabilities -func (m *Monitor) SetChainstateOptions(options *clientOptions) { - m.chainstateOptions = options -} - -// Start open a socket to the service provider and monitorConfig transactions -func (m *Monitor) Start(_ context.Context, handler MonitorHandler, onStop func()) error { - if m.client == nil { - handler.SetMonitor(m) - m.handler = handler - m.logger.Info().Msgf("[MONITOR] Starting, connecting to server: %s", m.buxAgentURL) - m.client = newCentrifugeClient(m.buxAgentURL, handler) - if m.authToken != "" { - m.client.SetToken(m.authToken) - } - } - - m.onStop = onStop - - return m.client.Connect() -} - -// Stop closes the monitoring socket and pauses monitoring -func (m *Monitor) Stop(_ context.Context) error { - m.logger.Info().Msg("[MONITOR] Stopping monitor...") - if m.IsConnected() { // Only close if still connected - if m.mempoolSyncChannelActive { - close(m.mempoolSyncChannel) - m.mempoolSyncChannelActive = false - } - return m.client.Disconnect() - } - - if m.onStop != nil { - m.onStop() - } - - return nil -} diff --git a/chainstate/monitor_client.go b/chainstate/monitor_client.go deleted file mode 100644 index 981fb499..00000000 --- a/chainstate/monitor_client.go +++ /dev/null @@ -1,100 +0,0 @@ -package chainstate - -import ( - "bytes" - "encoding/json" - "time" - - "github.com/centrifugal/centrifuge-go" -) - -// AddFilterMessage defines a new filter to be published from the client -// todo Just rely on the agent for this data type -type AddFilterMessage struct { - Filter string `json:"filter"` - Hash string `json:"hash"` - Regex string `json:"regex"` - Timestamp int64 `json:"timestamp"` -} - -// SetFilterMessage defines a new filter message with a list of filters -type SetFilterMessage struct { - Filter []byte `json:"filter"` - Hash string `json:"hash"` - Regex string `json:"regex"` - Timestamp int64 `json:"timestamp"` -} - -// AgentClient implements MonitorClient with needed agent methods -type AgentClient struct { - *centrifuge.Client - Token string -} - -// Connect establishes connection to agent -func (a *AgentClient) Connect() error { - return a.Client.Connect() -} - -// Disconnect closes connection to agent -func (a *AgentClient) Disconnect() error { - return a.Client.Disconnect() -} - -// SetToken set the client token -func (a *AgentClient) SetToken(token string) { - a.Client.SetToken(token) -} - -// AddFilter adds a new filter to the agent -func (a *AgentClient) AddFilter(regex, item string) (centrifuge.PublishResult, error) { - msg := AddFilterMessage{ - Regex: regex, - Filter: item, - Timestamp: time.Now().Unix(), - } - data, err := json.Marshal(msg) - if err != nil { - return centrifuge.PublishResult{}, err - } - return a.Client.Publish("add_filter", data) -} - -// SetFilter (re)sets a filter to the agent -func (a *AgentClient) SetFilter(regex string, bloomFilter *BloomProcessorFilter) (centrifuge.PublishResult, error) { - filter := new(bytes.Buffer) - _, err := bloomFilter.Filter.WriteTo(filter) - if err != nil { - return centrifuge.PublishResult{}, err - } - - msg := SetFilterMessage{ - Regex: regex, - Filter: filter.Bytes(), - Timestamp: time.Now().Unix(), - } - - var data []byte - data, err = json.Marshal(msg) - if err != nil { - return centrifuge.PublishResult{}, err - } - return a.Client.Publish("set_filter", data) -} - -// newCentrifugeClient will create a new Centrifuge using the provided handler and default configurations -func newCentrifugeClient(wsURL string, handler SocketHandler) MonitorClient { - c := centrifuge.NewJsonClient(wsURL, centrifuge.DefaultConfig()) // todo: use our own defaults/custom options - - c.OnConnect(handler) - c.OnDisconnect(handler) - c.OnError(handler) - c.OnMessage(handler) - c.OnServerJoin(handler) - c.OnServerLeave(handler) - c.OnServerPublish(handler) - c.OnServerSubscribe(handler) - c.OnServerUnsubscribe(handler) - - return &AgentClient{Client: c} -} diff --git a/client.go b/client.go index 5e821645..1da30707 100644 --- a/client.go +++ b/client.go @@ -2,7 +2,6 @@ package bux import ( "context" - "fmt" "time" "github.com/BuxOrg/bux/chainstate" @@ -35,7 +34,6 @@ type ( encryptionKey string // Encryption key for encrypting sensitive information (IE: paymail xPub) (hex encoded key) httpClient HTTPInterface // HTTP interface to use importBlockHeadersURL string // The URL of the block headers zip file to import old block headers on startup. if block 0 is found in the DB, block headers will mpt be downloaded - itc bool // (Incoming Transactions Check) True will check incoming transactions via Miners (real-world) iuc bool // (Input UTXO Check) True will check input utxos when saving transactions logger *zerolog.Logger // Internal logging models *modelOptions // Configuration options for the loaded models @@ -188,13 +186,6 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error) return nil, err } - // Load the blockchain monitor - if client.options.chainstate.Monitor() != nil { - if err = client.loadMonitor(ctx); err != nil { - return nil, err - } - } - // Default paymail server config (generic capabilities and domain check disabled) if client.options.paymail.serverConfig.Configuration == nil { if err = client.loadDefaultPaymailConfig(); err != nil { @@ -267,19 +258,6 @@ func (c *Client) Close(ctx context.Context) error { defer txn.StartSegment("close_all").End() } - // If we loaded a Monitor, remove the long-lasting lock-key before closing cachestore - cs := c.Cachestore() - m := c.Chainstate().Monitor() - if m != nil && cs != nil && len(m.GetLockID()) > 0 { - _ = cs.Delete(ctx, fmt.Sprintf(lockKeyMonitorLockID, m.GetLockID())) - } - - // Close Cachestore - if cs != nil { - cs.Close(ctx) - c.options.cacheStore.ClientInterface = nil - } - // Close Chainstate ch := c.Chainstate() if ch != nil { @@ -395,16 +373,6 @@ func (c *Client) IsNewRelicEnabled() bool { return c.options.newRelic.enabled } -// IsMempoolMonitoringEnabled will return whether mempool monitoring is on -func (c *Client) IsMempoolMonitoringEnabled() bool { - return c.options.chainstate.IsNewRelicEnabled() -} - -// IsITCEnabled will return the flag (bool) -func (c *Client) IsITCEnabled() bool { - return c.options.itc -} - // IsIUCEnabled will return the flag (bool) func (c *Client) IsIUCEnabled() bool { return c.options.iuc diff --git a/client_internal.go b/client_internal.go index 53eb2ca3..e7ed26ce 100644 --- a/client_internal.go +++ b/client_internal.go @@ -2,8 +2,6 @@ package bux import ( "context" - "time" - "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/cluster" "github.com/BuxOrg/bux/notifications" @@ -122,61 +120,6 @@ func (c *Client) loadTaskmanager(ctx context.Context) (err error) { return } -// loadMonitor will load the default Monitor -// -// Cachestore is required to be loaded before this method is called -func (c *Client) loadMonitor(ctx context.Context) (err error) { - // Check if the monitor was set by the user - monitor := c.options.chainstate.Monitor() - if monitor == nil { - return // No monitor, exit! - } - - // Create a handler and load destinations if option has been set - handler := NewMonitorHandler(ctx, c, monitor) - - // Start the default monitor - if err = startDefaultMonitor(ctx, c, monitor); err != nil { - return err - } - - lockKey := c.options.cluster.GetClusterPrefix() + lockKeyMonitorLockID - lockID := monitor.GetLockID() - go func() { - var currentLock string - for { - if currentLock, err = c.Cachestore().WriteLockWithSecret(ctx, lockKey, lockID, defaultMonitorLockTTL); err != nil { - // do nothing really, we just didn't get the lock - if monitor.IsDebug() { - monitor.Logger().Info().Msgf("[MONITOR] failed getting lock for monitor: %s: %e", lockID, err) - } - } - - if lockID == currentLock { - // Start the monitor, if not connected - if !monitor.IsConnected() { - if err = monitor.Start(ctx, &handler, func() { - _, err = c.Cachestore().ReleaseLock(ctx, lockKeyMonitorLockID, lockID) - }); err != nil { - monitor.Logger().Error().Msgf("[MONITOR] failed starting monitor: %e", err) - } - } - } else { - // first close any monitor if running - if monitor.IsConnected() { - if err = monitor.Stop(ctx); err != nil { - monitor.Logger().Error().Msgf("[MONITOR] failed stopping monitor: %e", err) - } - } - } - - time.Sleep(defaultMonitorSleep) - } - }() - - return nil -} - // runModelMigrations will run the model Migrate() method for all models func (c *Client) runModelMigrations(models ...interface{}) (err error) { // If the migrations are disabled, just return diff --git a/client_options.go b/client_options.go index 7a439a9f..615b2cd8 100644 --- a/client_options.go +++ b/client_options.go @@ -41,9 +41,6 @@ func defaultClientOptions() *clientOptions { datastoreLogger := logging.CreateGormLoggerAdapter(&dWarnLogger, "datastore") // Set the default options return &clientOptions{ - // Incoming Transaction Checker (lookup external tx via miner for validity) - itc: true, - // By default check input utxos (unless disabled by the user) iuc: true, @@ -239,13 +236,6 @@ func WithModels(models ...interface{}) ClientOps { } } -// WithITCDisabled will disable (ITC) incoming transaction checking -func WithITCDisabled() ClientOps { - return func(c *clientOptions) { - c.itc = false - } -} - // WithIUCDisabled will disable checking the input utxos func WithIUCDisabled() ClientOps { return func(c *clientOptions) { diff --git a/client_options_test.go b/client_options_test.go index d31e3133..fc359e7e 100644 --- a/client_options_test.go +++ b/client_options_test.go @@ -25,6 +25,7 @@ import ( // TestNewRelicOptions will test the method enable() func Test_newRelicOptions_enable(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("enable with valid app", func(t *testing.T) { app, err := tester.GetNewRelicApp(defaultNewRelicApp) @@ -33,6 +34,7 @@ func Test_newRelicOptions_enable(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithNewRelic(app)) + opts = append(opts, WithLogger(&testLogger)) var tc ClientInterface tc, err = NewClient( @@ -50,6 +52,7 @@ func Test_newRelicOptions_enable(t *testing.T) { t.Run("enable with invalid app", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithNewRelic(nil)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -64,15 +67,15 @@ func Test_newRelicOptions_enable(t *testing.T) { // Test_newRelicOptions_getOrStartTxn will test the method getOrStartTxn() func Test_newRelicOptions_getOrStartTxn(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("Get a valid ctx and txn", func(t *testing.T) { app, err := tester.GetNewRelicApp(defaultNewRelicApp) require.NoError(t, err) require.NotNil(t, app) - logger := zerolog.Nop() opts := DefaultClientOpts(false, true) - opts = append(opts, WithNewRelic(app), WithLogger(&logger)) + opts = append(opts, WithNewRelic(app), WithLogger(&testLogger)) var tc ClientInterface tc, err = NewClient( @@ -91,9 +94,8 @@ func Test_newRelicOptions_getOrStartTxn(t *testing.T) { }) t.Run("invalid ctx and txn", func(t *testing.T) { - logger := zerolog.Nop() opts := DefaultClientOpts(false, true) - opts = append(opts, WithNewRelic(nil), WithLogger(&logger)) + opts = append(opts, WithNewRelic(nil), WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -134,8 +136,6 @@ func TestClient_defaultModelOptions(t *testing.T) { require.NotNil(t, dco.taskManager) - assert.Equal(t, true, dco.itc) - assert.Nil(t, dco.logger) }) } @@ -143,6 +143,7 @@ func TestClient_defaultModelOptions(t *testing.T) { // TestWithUserAgent will test the method WithUserAgent() func TestWithUserAgent(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithUserAgent("") @@ -150,9 +151,8 @@ func TestWithUserAgent(t *testing.T) { }) t.Run("empty user agent", func(t *testing.T) { - logger := zerolog.Nop() opts := DefaultClientOpts(false, true) - opts = append(opts, WithUserAgent(""), WithLogger(&logger)) + opts = append(opts, WithUserAgent(""), WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -166,9 +166,8 @@ func TestWithUserAgent(t *testing.T) { t.Run("custom user agent", func(t *testing.T) { customAgent := "custom-user-agent" - logger := zerolog.Nop() opts := DefaultClientOpts(false, true) - opts = append(opts, WithUserAgent(customAgent), WithLogger(&logger)) + opts = append(opts, WithUserAgent(customAgent), WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -217,6 +216,7 @@ func TestWithDebugging(t *testing.T) { // TestWithEncryption will test the method WithEncryption() func TestWithEncryption(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithEncryption("") @@ -226,6 +226,7 @@ func TestWithEncryption(t *testing.T) { t.Run("empty key", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithEncryption("")) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -239,6 +240,7 @@ func TestWithEncryption(t *testing.T) { key, _ := utils.RandomHex(32) opts := DefaultClientOpts(false, true) opts = append(opts, WithEncryption(key)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -251,6 +253,8 @@ func TestWithEncryption(t *testing.T) { // TestWithRedis will test the method WithRedis() func TestWithRedis(t *testing.T) { + testLogger := zerolog.Nop() + t.Run("check type", func(t *testing.T) { opt := WithRedis(nil) assert.IsType(t, *new(ClientOps), opt) @@ -269,6 +273,7 @@ func TestWithRedis(t *testing.T) { }), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -292,6 +297,7 @@ func TestWithRedis(t *testing.T) { }), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -305,6 +311,8 @@ func TestWithRedis(t *testing.T) { // TestWithRedisConnection will test the method WithRedisConnection() func TestWithRedisConnection(t *testing.T) { + testLogger := zerolog.Nop() + t.Run("check type", func(t *testing.T) { opt := WithRedisConnection(nil) assert.IsType(t, *new(ClientOps), opt) @@ -317,6 +325,7 @@ func TestWithRedisConnection(t *testing.T) { WithRedisConnection(nil), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -338,6 +347,7 @@ func TestWithRedisConnection(t *testing.T) { WithRedisConnection(client), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -359,12 +369,14 @@ func TestWithFreeCache(t *testing.T) { }) t.Run("using FreeCache", func(t *testing.T) { + testLogger := zerolog.Nop() tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCache(), WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), - WithMinercraft(&chainstate.MinerCraftBase{})) + WithMinercraft(&chainstate.MinerCraftBase{}), + WithLogger(&testLogger)) require.NoError(t, err) require.NotNil(t, tc) defer CloseClient(context.Background(), t, tc) @@ -378,6 +390,7 @@ func TestWithFreeCache(t *testing.T) { // TestWithFreeCacheConnection will test the method WithFreeCacheConnection() func TestWithFreeCacheConnection(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithFreeCacheConnection(nil) @@ -385,15 +398,13 @@ func TestWithFreeCacheConnection(t *testing.T) { }) t.Run("using a nil client", func(t *testing.T) { - logger := zerolog.Nop() - tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCacheConnection(nil), WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithMinercraft(&chainstate.MinerCraftBase{}), - WithLogger(&logger), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -407,14 +418,13 @@ func TestWithFreeCacheConnection(t *testing.T) { t.Run("using an existing connection", func(t *testing.T) { fc := freecache.NewCache(cachestore.DefaultCacheSize) - logger := zerolog.Nop() tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCacheConnection(fc), WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithMinercraft(&chainstate.MinerCraftBase{}), - WithLogger(&logger), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -429,10 +439,12 @@ func TestWithFreeCacheConnection(t *testing.T) { // TestWithPaymailClient will test the method WithPaymailClient() func TestWithPaymailClient(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("using a nil driver, automatically makes paymail client", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithPaymailClient(nil)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -449,6 +461,7 @@ func TestWithPaymailClient(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithPaymailClient(p)) + opts = append(opts, WithLogger(&testLogger)) var tc ClientInterface tc, err = NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) @@ -464,13 +477,13 @@ func TestWithPaymailClient(t *testing.T) { // TestWithTaskQ will test the method WithTaskQ() func TestWithTaskQ(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() // todo: test cases where config is nil, or cannot load TaskQ t.Run("using taskq using memory", func(t *testing.T) { - logger := zerolog.Nop() tcOpts := DefaultClientOpts(true, true) - tcOpts = append(tcOpts, WithLogger(&logger)) + tcOpts = append(tcOpts, WithLogger(&testLogger)) tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), @@ -490,8 +503,6 @@ func TestWithTaskQ(t *testing.T) { t.Skip("skipping live local redis tests") } - logger := zerolog.Nop() - tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithTaskqConfig( @@ -502,7 +513,7 @@ func TestWithTaskQ(t *testing.T) { }), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), - WithLogger(&logger), + WithLogger(&testLogger), ) require.NoError(t, err) require.NotNil(t, tc) @@ -553,6 +564,7 @@ func TestWithLogger(t *testing.T) { // TestWithModels will test the method WithModels() func TestWithModels(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithModels() @@ -562,6 +574,7 @@ func TestWithModels(t *testing.T) { t.Run("empty models - returns default models", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithModels()) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -570,7 +583,7 @@ func TestWithModels(t *testing.T) { assert.Equal(t, []string{ ModelXPub.String(), ModelAccessKey.String(), - ModelDraftTransaction.String(), ModelIncomingTransaction.String(), + ModelDraftTransaction.String(), ModelTransaction.String(), ModelBlockHeader.String(), ModelSyncTransaction.String(), ModelDestination.String(), ModelUtxo.String(), @@ -580,6 +593,7 @@ func TestWithModels(t *testing.T) { t.Run("add custom models", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithModels(newPaymail(testPaymail))) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -588,7 +602,7 @@ func TestWithModels(t *testing.T) { assert.Equal(t, []string{ ModelXPub.String(), ModelAccessKey.String(), - ModelDraftTransaction.String(), ModelIncomingTransaction.String(), + ModelDraftTransaction.String(), ModelTransaction.String(), ModelBlockHeader.String(), ModelSyncTransaction.String(), ModelDestination.String(), ModelUtxo.String(), ModelPaymailAddress.String(), @@ -596,42 +610,10 @@ func TestWithModels(t *testing.T) { }) } -// TestWithITCDisabled will test the method WithITCDisabled() -func TestWithITCDisabled(t *testing.T) { - t.Parallel() - - t.Run("check type", func(t *testing.T) { - opt := WithITCDisabled() - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("default options", func(t *testing.T) { - opts := DefaultClientOpts(false, true) - - tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) - require.NoError(t, err) - require.NotNil(t, tc) - defer CloseClient(context.Background(), t, tc) - - assert.Equal(t, true, tc.IsITCEnabled()) - }) - - t.Run("itc disabled", func(t *testing.T) { - opts := DefaultClientOpts(false, true) - opts = append(opts, WithITCDisabled()) - - tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) - require.NoError(t, err) - require.NotNil(t, tc) - defer CloseClient(context.Background(), t, tc) - - assert.Equal(t, false, tc.IsITCEnabled()) - }) -} - // TestWithIUCDisabled will test the method WithIUCDisabled() func TestWithIUCDisabled(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithIUCDisabled() @@ -640,6 +622,7 @@ func TestWithIUCDisabled(t *testing.T) { t.Run("default options", func(t *testing.T) { opts := DefaultClientOpts(false, true) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -649,9 +632,10 @@ func TestWithIUCDisabled(t *testing.T) { assert.Equal(t, true, tc.IsIUCEnabled()) }) - t.Run("itc disabled", func(t *testing.T) { + t.Run("iuc disabled", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithIUCDisabled()) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -665,6 +649,7 @@ func TestWithIUCDisabled(t *testing.T) { // TestWithImportBlockHeaders will test the method WithImportBlockHeaders() func TestWithImportBlockHeaders(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithImportBlockHeaders("") @@ -674,6 +659,7 @@ func TestWithImportBlockHeaders(t *testing.T) { t.Run("empty url", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithImportBlockHeaders("")) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -688,6 +674,7 @@ func TestWithImportBlockHeaders(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithImportBlockHeaders(customURL)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -701,6 +688,7 @@ func TestWithImportBlockHeaders(t *testing.T) { // TestWithHTTPClient will test the method WithHTTPClient() func TestWithHTTPClient(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithHTTPClient(nil) @@ -710,6 +698,7 @@ func TestWithHTTPClient(t *testing.T) { t.Run("test applying nil", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithHTTPClient(nil)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -723,6 +712,7 @@ func TestWithHTTPClient(t *testing.T) { customClient := &http.Client{} opts := DefaultClientOpts(false, true) opts = append(opts, WithHTTPClient(customClient)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -736,6 +726,7 @@ func TestWithHTTPClient(t *testing.T) { // TestWithCustomCachestore will test the method WithCustomCachestore() func TestWithCustomCachestore(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithCustomCachestore(nil) @@ -745,6 +736,7 @@ func TestWithCustomCachestore(t *testing.T) { t.Run("test applying nil", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithCustomCachestore(nil)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -760,6 +752,7 @@ func TestWithCustomCachestore(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithCustomCachestore(customCache)) + opts = append(opts, WithLogger(&testLogger)) var tc ClientInterface tc, err = NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) @@ -774,6 +767,7 @@ func TestWithCustomCachestore(t *testing.T) { // TestWithCustomDatastore will test the method WithCustomDatastore() func TestWithCustomDatastore(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithCustomDatastore(nil) @@ -783,6 +777,7 @@ func TestWithCustomDatastore(t *testing.T) { t.Run("test applying nil", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithCustomDatastore(nil)) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -798,6 +793,7 @@ func TestWithCustomDatastore(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithCustomDatastore(customData)) + opts = append(opts, WithLogger(&testLogger)) var tc ClientInterface tc, err = NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) @@ -817,6 +813,7 @@ func TestWithCustomDatastore(t *testing.T) { // TestWithAutoMigrate will test the method WithAutoMigrate() func TestWithAutoMigrate(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithAutoMigrate() @@ -826,6 +823,7 @@ func TestWithAutoMigrate(t *testing.T) { t.Run("no additional models, just base models", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithAutoMigrate()) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -836,7 +834,6 @@ func TestWithAutoMigrate(t *testing.T) { ModelXPub.String(), ModelAccessKey.String(), ModelDraftTransaction.String(), - ModelIncomingTransaction.String(), ModelTransaction.String(), ModelBlockHeader.String(), ModelSyncTransaction.String(), @@ -848,6 +845,7 @@ func TestWithAutoMigrate(t *testing.T) { t.Run("one additional model", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithAutoMigrate(newPaymail(testPaymail))) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -858,7 +856,6 @@ func TestWithAutoMigrate(t *testing.T) { ModelXPub.String(), ModelAccessKey.String(), ModelDraftTransaction.String(), - ModelIncomingTransaction.String(), ModelTransaction.String(), ModelBlockHeader.String(), ModelSyncTransaction.String(), @@ -872,6 +869,7 @@ func TestWithAutoMigrate(t *testing.T) { // TestWithMigrationDisabled will test the method WithMigrationDisabled() func TestWithMigrationDisabled(t *testing.T) { t.Parallel() + testLogger := zerolog.Nop() t.Run("check type", func(t *testing.T) { opt := WithMigrationDisabled() @@ -880,6 +878,7 @@ func TestWithMigrationDisabled(t *testing.T) { t.Run("default options", func(t *testing.T) { opts := DefaultClientOpts(false, true) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) @@ -892,6 +891,7 @@ func TestWithMigrationDisabled(t *testing.T) { t.Run("migration disabled", func(t *testing.T) { opts := DefaultClientOpts(false, true) opts = append(opts, WithMigrationDisabled()) + opts = append(opts, WithLogger(&testLogger)) tc, err := NewClient(tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), opts...) require.NoError(t, err) diff --git a/cron_job_declarations.go b/cron_job_declarations.go index 08385435..1d039ecc 100644 --- a/cron_job_declarations.go +++ b/cron_job_declarations.go @@ -10,7 +10,6 @@ import ( // Cron job names to be used in WithCronCustomPeriod const ( CronJobNameDraftTransactionCleanUp = "draft_transaction_clean_up" - CronJobNameIncomingTransaction = "incoming_transaction_process" CronJobNameSyncTransactionBroadcast = "sync_transaction_broadcast" CronJobNameSyncTransactionSync = "sync_transaction_sync" ) @@ -31,10 +30,6 @@ func (c *Client) cronJobs() taskmanager.CronJobs { Period: 60 * time.Second, Handler: handler(taskCleanupDraftTransactions), }, - CronJobNameIncomingTransaction: { - Period: 2 * time.Minute, - Handler: handler(taskProcessIncomingTransactions), - }, CronJobNameSyncTransactionBroadcast: { Period: 2 * time.Minute, Handler: handler(taskBroadcastTransactions), diff --git a/cron_job_definitions.go b/cron_job_definitions.go index f900b102..6ccf3d95 100644 --- a/cron_job_definitions.go +++ b/cron_job_definitions.go @@ -53,17 +53,6 @@ func taskCleanupDraftTransactions(ctx context.Context, client *Client) error { return nil } -// taskProcessIncomingTransactions will process any incoming transactions found -func taskProcessIncomingTransactions(ctx context.Context, client *Client) error { - client.Logger().Info().Msg("running process incoming transaction(s) task...") - - err := processIncomingTransactions(ctx, client.Logger(), 10, WithClient(client)) - if err == nil || errors.Is(err, datastore.ErrNoResults) { - return nil - } - return err -} - // taskBroadcastTransactions will broadcast any transactions func taskBroadcastTransactions(ctx context.Context, client *Client) error { client.Logger().Info().Msg("running broadcast transaction(s) task...") diff --git a/definitions.go b/definitions.go index bcc12d75..4e572112 100644 --- a/definitions.go +++ b/definitions.go @@ -6,16 +6,14 @@ import ( // Defaults for engine functionality const ( - changeOutputSize = uint64(35) // Average size in bytes of a change output - databaseLongReadTimeout = 30 * time.Second // For all "GET" or "SELECT" methods - defaultBroadcastTimeout = 25 * time.Second // Default timeout for broadcasting - defaultCacheLockTTL = 20 // in Seconds - defaultCacheLockTTW = 10 // in Seconds - defaultDatabaseReadTimeout = 20 * time.Second // For all "GET" or "SELECT" methods - defaultDraftTxExpiresIn = 20 * time.Second // Default TTL for draft transactions - defaultHTTPTimeout = 20 * time.Second // Default timeout for HTTP requests - defaultMonitorSleep = 2 * time.Second - defaultMonitorLockTTL = 10 // in seconds - should be larger than defaultMonitorSleep + changeOutputSize = uint64(35) // Average size in bytes of a change output + databaseLongReadTimeout = 30 * time.Second // For all "GET" or "SELECT" methods + defaultBroadcastTimeout = 25 * time.Second // Default timeout for broadcasting + defaultCacheLockTTL = 20 // in Seconds + defaultCacheLockTTW = 10 // in Seconds + defaultDatabaseReadTimeout = 20 * time.Second // For all "GET" or "SELECT" methods + defaultDraftTxExpiresIn = 20 * time.Second // Default TTL for draft transactions + defaultHTTPTimeout = 20 * time.Second // Default timeout for HTTP requests defaultOverheadSize = uint64(8) // 8 bytes is the default overhead in a transaction = 4 bytes version + 4 bytes nLockTime defaultQueryTxTimeout = 10 * time.Second // Default timeout for syncing on-chain information defaultSleepForNewBlockHeaders = 30 * time.Second // Default wait before checking for a new unprocessed block @@ -28,18 +26,17 @@ const ( // All the base models const ( - ModelAccessKey ModelName = "access_key" - ModelBlockHeader ModelName = "block_header" - ModelDestination ModelName = "destination" - ModelDraftTransaction ModelName = "draft_transaction" - ModelIncomingTransaction ModelName = "incoming_transaction" - ModelMetadata ModelName = "metadata" - ModelNameEmpty ModelName = "empty" - ModelPaymailAddress ModelName = "paymail_address" - ModelSyncTransaction ModelName = "sync_transaction" - ModelTransaction ModelName = "transaction" - ModelUtxo ModelName = "utxo" - ModelXPub ModelName = "xpub" + ModelAccessKey ModelName = "access_key" + ModelBlockHeader ModelName = "block_header" + ModelDestination ModelName = "destination" + ModelDraftTransaction ModelName = "draft_transaction" + ModelMetadata ModelName = "metadata" + ModelNameEmpty ModelName = "empty" + ModelPaymailAddress ModelName = "paymail_address" + ModelSyncTransaction ModelName = "sync_transaction" + ModelTransaction ModelName = "transaction" + ModelUtxo ModelName = "utxo" + ModelXPub ModelName = "xpub" ) // AllModelNames is a list of all models @@ -47,7 +44,6 @@ var AllModelNames = []ModelName{ ModelAccessKey, ModelBlockHeader, ModelDestination, - ModelIncomingTransaction, ModelMetadata, ModelPaymailAddress, ModelPaymailAddress, @@ -59,16 +55,15 @@ var AllModelNames = []ModelName{ // Internal table names const ( - tableAccessKeys = "access_keys" - tableBlockHeaders = "block_headers" - tableDestinations = "destinations" - tableDraftTransactions = "draft_transactions" - tableIncomingTransactions = "incoming_transactions" - tablePaymailAddresses = "paymail_addresses" - tableSyncTransactions = "sync_transactions" - tableTransactions = "transactions" - tableUTXOs = "utxos" - tableXPubs = "xpubs" + tableAccessKeys = "access_keys" + tableBlockHeaders = "block_headers" + tableDestinations = "destinations" + tableDraftTransactions = "draft_transactions" + tablePaymailAddresses = "paymail_addresses" + tableSyncTransactions = "sync_transactions" + tableTransactions = "transactions" + tableUTXOs = "utxos" + tableXPubs = "xpubs" ) const ( @@ -153,11 +148,6 @@ var BaseModels = []interface{}{ Model: *NewBaseModel(ModelDraftTransaction), }, - // Incoming transactions (external & unknown) (related to Transaction & Draft) - &IncomingTransaction{ - Model: *NewBaseModel(ModelIncomingTransaction), - }, - // Finalized transactions (related to Draft) &Transaction{ Model: *NewBaseModel(ModelTransaction), diff --git a/examples/client/custom_cron/custom_cron.go b/examples/client/custom_cron/custom_cron.go index 535053bc..fed09089 100644 --- a/examples/client/custom_cron/custom_cron.go +++ b/examples/client/custom_cron/custom_cron.go @@ -11,8 +11,8 @@ import ( func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithCronCustomPeriod(bux.CronJobNameDraftTransactionCleanUp, 2*time.Second), - bux.WithCronCustomPeriod(bux.CronJobNameIncomingTransaction, 4*time.Second), + bux.WithCronCustmPeriod(bux.CronJobNameDraftTransactionCleanUp, 2*time.Second), + bux.WithCronCustmPeriod(bux.CronJobNameSyncTransactionSync, 4*time.Second), ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/interface.go b/interface.go index cd9f6bfa..19caeed5 100644 --- a/interface.go +++ b/interface.go @@ -82,9 +82,9 @@ type DestinationService interface { queryParams *datastore.QueryParams) ([]*Destination, error) GetDestinationsByXpubIDCount(ctx context.Context, xPubID string, usingMetadata *Metadata, conditions *map[string]interface{}) (int64, error) - NewDestination(ctx context.Context, xPubKey string, chain uint32, destinationType string, monitor bool, + NewDestination(ctx context.Context, xPubKey string, chain uint32, destinationType string, opts ...ModelOps) (*Destination, error) - NewDestinationForLockingScript(ctx context.Context, xPubID, lockingScript string, monitor bool, + NewDestinationForLockingScript(ctx context.Context, xPubID, lockingScript string, opts ...ModelOps) (*Destination, error) UpdateDestinationMetadataByID(ctx context.Context, xPubID, id string, metadata Metadata) (*Destination, error) UpdateDestinationMetadataByLockingScript(ctx context.Context, xPubID, @@ -194,7 +194,6 @@ type ClientInterface interface { ImportBlockHeadersFromURL() string IsDebug() bool IsEncryptionKeySet() bool - IsITCEnabled() bool IsIUCEnabled() bool IsMigrationEnabled() bool IsNewRelicEnabled() bool diff --git a/locks.go b/locks.go index 81c434b1..783d4d05 100644 --- a/locks.go +++ b/locks.go @@ -7,9 +7,7 @@ import ( ) const ( - lockKeyMonitorLockID = "monitor-lock-id-%s" // + Lock ID lockKeyProcessBroadcastTx = "process-broadcast-transaction-%s" // + Tx ID - lockKeyProcessIncomingTx = "process-incoming-transaction-%s" // + Tx ID lockKeyProcessP2PTx = "process-p2p-transaction-%s" // + Tx ID lockKeyProcessSyncTx = "process-sync-transaction-task" lockKeyProcessXpub = "action-xpub-id-%s" // + Xpub ID diff --git a/mock_chainstate_test.go b/mock_chainstate_test.go index 38d4b16c..3180cfc1 100644 --- a/mock_chainstate_test.go +++ b/mock_chainstate_test.go @@ -102,10 +102,6 @@ type chainStateEverythingOnChain struct { chainStateEverythingInMempool } -func (c *chainStateEverythingOnChain) Monitor() chainstate.MonitorService { - return nil -} - func (c *chainStateEverythingOnChain) BroadcastClient() broadcast.Client { return nil } diff --git a/model_destinations.go b/model_destinations.go index 22c7cc64..69e13b16 100644 --- a/model_destinations.go +++ b/model_destinations.go @@ -10,7 +10,6 @@ import ( "github.com/BuxOrg/bux/utils" "github.com/bitcoinschema/go-bitcoin/v2" "github.com/mrz1836/go-datastore" - customTypes "github.com/mrz1836/go-datastore/custom_types" ) // Destination is an object representing a BitCoin destination (address, script, etc) @@ -21,15 +20,14 @@ type Destination struct { Model `bson:",inline"` // Model specific fields - ID string `json:"id" toml:"id" yaml:"id" gorm:"<-:create;type:char(64);primaryKey;comment:This is the hash of the locking script" bson:"_id"` - XpubID string `json:"xpub_id" toml:"xpub_id" yaml:"xpub_id" gorm:"<-:create;type:char(64);index;comment:This is the related xPub" bson:"xpub_id"` - LockingScript string `json:"locking_script" toml:"locking_script" yaml:"locking_script" gorm:"<-:create;type:text;comment:This is Bitcoin output script in hex" bson:"locking_script"` - Type string `json:"type" toml:"type" yaml:"type" gorm:"<-:create;type:text;comment:Type of output" bson:"type"` - Chain uint32 `json:"chain" toml:"chain" yaml:"chain" gorm:"<-:create;type:int;comment:This is the (chain)/num location of the address related to the xPub" bson:"chain"` - Num uint32 `json:"num" toml:"num" yaml:"num" gorm:"<-:create;type:int;comment:This is the chain/(num) location of the address related to the xPub" bson:"num"` - Address string `json:"address" toml:"address" yaml:"address" gorm:"<-:create;type:varchar(35);index;comment:This is the BitCoin address" bson:"address"` - DraftID string `json:"draft_id" toml:"draft_id" yaml:"draft_id" gorm:"<-:create;type:varchar(64);index;comment:This is the related draft id (if internal tx)" bson:"draft_id,omitempty"` - Monitor customTypes.NullTime `json:"monitor" toml:"monitor" yaml:"monitor" gorm:";index;comment:When this address was last used for an external transaction, for monitoring" bson:"monitor,omitempty"` + ID string `json:"id" toml:"id" yaml:"id" gorm:"<-:create;type:char(64);primaryKey;comment:This is the hash of the locking script" bson:"_id"` + XpubID string `json:"xpub_id" toml:"xpub_id" yaml:"xpub_id" gorm:"<-:create;type:char(64);index;comment:This is the related xPub" bson:"xpub_id"` + LockingScript string `json:"locking_script" toml:"locking_script" yaml:"locking_script" gorm:"<-:create;type:text;comment:This is Bitcoin output script in hex" bson:"locking_script"` + Type string `json:"type" toml:"type" yaml:"type" gorm:"<-:create;type:text;comment:Type of output" bson:"type"` + Chain uint32 `json:"chain" toml:"chain" yaml:"chain" gorm:"<-:create;type:int;comment:This is the (chain)/num location of the address related to the xPub" bson:"chain"` + Num uint32 `json:"num" toml:"num" yaml:"num" gorm:"<-:create;type:int;comment:This is the chain/(num) location of the address related to the xPub" bson:"num"` + Address string `json:"address" toml:"address" yaml:"address" gorm:"<-:create;type:varchar(35);index;comment:This is the BitCoin address" bson:"address"` + DraftID string `json:"draft_id" toml:"draft_id" yaml:"draft_id" gorm:"<-:create;type:varchar(64);index;comment:This is the related draft id (if internal tx)" bson:"draft_id,omitempty"` } // newDestination will start a new Destination model for a locking script diff --git a/model_destinations_test.go b/model_destinations_test.go index baa1fe84..c9d18b2e 100644 --- a/model_destinations_test.go +++ b/model_destinations_test.go @@ -33,7 +33,6 @@ func TestDestination_newDestination(t *testing.T) { assert.Equal(t, ModelDestination.String(), destination.GetModelName()) assert.Equal(t, true, destination.IsNew()) assert.Equal(t, "", destination.LockingScript) - assert.Equal(t, false, destination.Monitor.Valid) assert.Equal(t, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", destination.GetID()) }) @@ -46,7 +45,6 @@ func TestDestination_newDestination(t *testing.T) { assert.Equal(t, ModelDestination.String(), destination.GetModelName()) assert.Equal(t, true, destination.IsNew()) assert.Equal(t, testScript, destination.LockingScript) - assert.Equal(t, false, destination.Monitor.Valid) assert.Equal(t, xPubID, destination.XpubID) assert.Equal(t, bscript2.ScriptTypeNonStandard, destination.Type) assert.Equal(t, testDestinationID, destination.GetID()) @@ -284,7 +282,7 @@ func TestClient_NewDestination(t *testing.T) { // Create a new destination destination, err := client.NewDestination( - ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, opts..., + ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.NoError(t, err) require.NotNil(t, destination) @@ -312,7 +310,7 @@ func TestClient_NewDestination(t *testing.T) { // Create a new destination destination, err := client.NewDestination( - ctx, "bad-value", utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + ctx, "bad-value", utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.Error(t, err) @@ -332,7 +330,7 @@ func TestClient_NewDestination(t *testing.T) { // Create a new destination destination, err := client.NewDestination( - ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + ctx, testXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, opts..., ) require.Error(t, err) @@ -357,7 +355,7 @@ func TestClient_NewDestination(t *testing.T) { // Create a new destination destination, err := client.NewDestination( - ctx, rawXPub, utils.ChainExternal, utils.ScriptTypeMultiSig, false, + ctx, rawXPub, utils.ChainExternal, utils.ScriptTypeMultiSig, opts..., ) require.Error(t, err) @@ -381,7 +379,7 @@ func TestClient_NewDestination(t *testing.T) { // Create a new destination destination, err := client.NewDestinationForLockingScript( - ctx, utils.Hash(rawXPub), stasHex, false, + ctx, utils.Hash(rawXPub), stasHex, opts..., ) require.NoError(t, err) @@ -412,7 +410,7 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { // Create model tc.MockSQLDB.ExpectExec("INSERT INTO `"+tc.tablePrefix+"_destinations` ("+ "`created_at`,`updated_at`,`metadata`,`deleted_at`,`id`,`xpub_id`,`locking_script`,"+ - "`type`,`chain`,`num`,`address`,`draft_id`,`monitor`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)").WithArgs( + "`type`,`chain`,`num`,`address`,`draft_id`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)").WithArgs( tester.AnyTime{}, // created_at tester.AnyTime{}, // updated_at nil, // metadata @@ -425,7 +423,6 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { 0, // num destination.Address, // address testDraftID, // draft_id - nil, // monitor ).WillReturnResult(sqlmock.NewResult(1, 1)) // Commit the TX @@ -459,7 +456,7 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { // Create model tc.MockSQLDB.ExpectExec("INSERT INTO `"+tc.tablePrefix+"_destinations` ("+ "`created_at`,`updated_at`,`metadata`,`deleted_at`,`id`,`xpub_id`,`locking_script`,"+ - "`type`,`chain`,`num`,`address`,`draft_id`,`monitor`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)").WithArgs( + "`type`,`chain`,`num`,`address`,`draft_id`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)").WithArgs( tester.AnyTime{}, // created_at tester.AnyTime{}, // updated_at nil, // metadata @@ -472,7 +469,6 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { 0, // num destination.Address, // address testDraftID, // draft_id - nil, // monitor ).WillReturnResult(sqlmock.NewResult(1, 1)) // Commit the TX @@ -504,7 +500,7 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { tc.MockSQLDB.ExpectBegin() // Create model - tc.MockSQLDB.ExpectExec(`INSERT INTO "`+tc.tablePrefix+`_destinations" ("created_at","updated_at","metadata","deleted_at","id","xpub_id","locking_script","type","chain","num","address","draft_id","monitor") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)`).WithArgs( + tc.MockSQLDB.ExpectExec(`INSERT INTO "`+tc.tablePrefix+`_destinations" ("created_at","updated_at","metadata","deleted_at","id","xpub_id","locking_script","type","chain","num","address","draft_id") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`).WithArgs( tester.AnyTime{}, // created_at tester.AnyTime{}, // updated_at nil, // metadata @@ -517,7 +513,6 @@ func (ts *EmbeddedDBTestSuite) TestDestination_Save() { 0, // num destination.Address, // address testDraftID, // draft_id - nil, // monitor ).WillReturnResult(sqlmock.NewResult(1, 1)) // Commit the TX diff --git a/model_incoming_transactions_test.go b/model_incoming_transactions_test.go deleted file mode 100644 index 779a35ac..00000000 --- a/model_incoming_transactions_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package bux - -import ( - "testing" - - "github.com/BuxOrg/bux/utils" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestIncomingTransaction_GetModelName will test the method GetModelName() -func TestIncomingTransaction_GetModelName(t *testing.T) { - t.Parallel() - - bTx, err := newIncomingTransaction(testTxHex, New()) - require.NoError(t, err) - - assert.Equal(t, ModelIncomingTransaction.String(), bTx.GetModelName()) -} - -// TestProcessIncomingTransaction will test the method processIncomingTransaction() -func (ts *EmbeddedDBTestSuite) TestProcessIncomingTransaction() { - - for _, testCase := range dbTestCases { - ts.T().Run(testCase.name+" - LIVE integration test - valid external incoming tx", func(t *testing.T) { - - // todo: mock the response vs using a LIVE request for Chainstate - - tc := ts.genericDBClient(t, testCase.database, true, WithCustomChainstate(&chainStateEverythingOnChain{})) - defer tc.Close(tc.ctx) - - // Create a xpub - var err error - xPubKey := "xpub6826nizKsKjNvxGbcYPiyS4tLVB3nd3e4yujBe6YmqmNtN3DMytsQMkruEgHoyUu89CHcTtaeeLynTC19fD4JcAvKXBUbHi9qdeWtUMYCQK" - xPub := newXpub(xPubKey, append(tc.client.DefaultModelOptions(), New())...) - require.NotNil(t, xPub) - - err = xPub.Save(tc.ctx) - require.NoError(t, err) - - // Create a destination - var destination *Destination - destination, err = xPub.getNewDestination(tc.ctx, utils.ChainExternal, utils.ScriptTypePubKeyHash, tc.client.DefaultModelOptions()...) - require.NoError(t, err) - require.NotNil(t, destination) - - // Save the updated xPub and new destination - err = xPub.Save(tc.ctx) - require.NoError(t, err) - - // Record an external incoming tx - txHex := "0100000001574eacf3305f561f63d6f1896566d5ff63409fea2aae1534a3e3734191b47430020000006b483045022100e3f002e318d2dfae67f00da8aa327cc905e93d4a5adb5b7c33afde95bfc26acc022000ddfcdba500e0ba9eaadde478e2b6c6566f8d6837e7802c5f867492eadfe5d1412102ff596abfae0099d480d93937380af985f5165b84ad31790c10c09d3daab8562effffffff01493a1100000000001976a914ec8470c5d9275c39829b15ea7f1997cb66082d3188ac00000000" - var tx *Transaction - tx, err = tc.client.RecordTransaction(tc.ctx, xPubKey, txHex, "", tc.client.DefaultModelOptions()...) - require.NoError(t, err) - require.NotNil(t, tx) - - // Process if found - err = processIncomingTransactions(tc.ctx, nil, 5, WithClient(tc.client)) - require.NoError(t, err) - - // Check if the tx is found in the datastore - var foundTx *Transaction - foundTx, err = tc.client.GetTransaction(tc.ctx, xPub.ID, tx.ID) - require.NoError(t, err) - require.NotNil(t, foundTx) - - // Test that we found the tx on-chain(600000 is a height of a mocked tx) - assert.Equal(t, uint64(600000), foundTx.BlockHeight) - }) - } -} diff --git a/model_transaction_config_test.go b/model_transaction_config_test.go index 684ec62f..3e7d2fdc 100644 --- a/model_transaction_config_test.go +++ b/model_transaction_config_test.go @@ -15,7 +15,7 @@ import ( ) var ( - emptyConfigJSON = "{\"change_destinations\":[{\"created_at\":\"0001-01-01T00:00:00Z\",\"updated_at\":\"0001-01-01T00:00:00Z\",\"deleted_at\":null,\"id\":\"c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646\",\"xpub_id\":\"1a0b10d4eda0636aae1709e7e7080485a4d99af3ca2962c6e677cf5b53d8ab8c\",\"locking_script\":\"76a9147ff514e6ae3deb46e6644caac5cdd0bf2388906588ac\",\"type\":\"pubkeyhash\",\"chain\":1,\"num\":123,\"address\":\"1CfaQw9udYNPccssFJFZ94DN8MqNZm9nGt\",\"draft_id\":\"test-reference\",\"monitor\":null}],\"change_destinations_strategy\":\"\",\"change_minimum_satoshis\":0,\"change_number_of_destinations\":0,\"change_satoshis\":124,\"expires_in\":20000000000,\"fee\":12,\"fee_unit\":{\"satoshis\":1,\"bytes\":20},\"from_utxos\":null,\"include_utxos\":null,\"inputs\":null,\"outputs\":null,\"sync\":null}" + emptyConfigJSON = "{\"change_destinations\":[{\"created_at\":\"0001-01-01T00:00:00Z\",\"updated_at\":\"0001-01-01T00:00:00Z\",\"deleted_at\":null,\"id\":\"c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646\",\"xpub_id\":\"1a0b10d4eda0636aae1709e7e7080485a4d99af3ca2962c6e677cf5b53d8ab8c\",\"locking_script\":\"76a9147ff514e6ae3deb46e6644caac5cdd0bf2388906588ac\",\"type\":\"pubkeyhash\",\"chain\":1,\"num\":123,\"address\":\"1CfaQw9udYNPccssFJFZ94DN8MqNZm9nGt\",\"draft_id\":\"test-reference\"}],\"change_destinations_strategy\":\"\",\"change_minimum_satoshis\":0,\"change_number_of_destinations\":0,\"change_satoshis\":124,\"expires_in\":20000000000,\"fee\":12,\"fee_unit\":{\"satoshis\":1,\"bytes\":20},\"from_utxos\":null,\"include_utxos\":null,\"inputs\":null,\"outputs\":null,\"sync\":null}" opReturn = "006a2231394878696756345179427633744870515663554551797131707a5a56646f417574324b65657020616e20657965206f6e207468697320706c61636520666f7220736f6d65204a616d696679206c6f76652e2e2e200d746578742f6d61726b646f776e055554462d38" unsetConfigJSON = "{\"change_destinations\":null,\"change_destinations_strategy\":\"\",\"change_minimum_satoshis\":0,\"change_number_of_destinations\":0,\"change_satoshis\":0,\"expires_in\":0,\"fee\":0,\"fee_unit\":null,\"from_utxos\":null,\"include_utxos\":null,\"inputs\":null,\"outputs\":null,\"sync\":null}" diff --git a/model_transactions.go b/model_transactions.go index cb919cc1..a39d2eb9 100644 --- a/model_transactions.go +++ b/model_transactions.go @@ -128,21 +128,6 @@ func newTransactionWithDraftID(txHex, draftID string, opts ...ModelOps) (*Transa return tx, nil } -// newTransactionFromIncomingTransaction will start a new transaction model using an incomingTx -func newTransactionFromIncomingTransaction(incomingTx *IncomingTransaction) (*Transaction, error) { - // Create the base - tx, err := baseTxFromHex(incomingTx.Hex, incomingTx.GetOptions(true)...) - if err != nil { - return nil, err - } - - tx.rawXpubKey = incomingTx.rawXpubKey - tx.setXPubID() - tx.Metadata = incomingTx.Metadata - - return tx, nil -} - // setXPubID will set the xPub ID on the model func (m *Transaction) setXPubID() { if len(m.rawXpubKey) > 0 && len(m.XPubID) == 0 { diff --git a/model_transactions_test.go b/model_transactions_test.go index d2bfcc94..71363588 100644 --- a/model_transactions_test.go +++ b/model_transactions_test.go @@ -853,7 +853,7 @@ func TestEndToEndTransaction(t *testing.T) { var err error destinations := make([]*Destination, 2) destinations[0], err = client.NewDestination( - ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, WithMetadatas(map[string]interface{}{ testMetadataKey: testMetadataValue, }), @@ -861,7 +861,7 @@ func TestEndToEndTransaction(t *testing.T) { require.NoError(t, err) destinations[1], err = client.NewDestination( - ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, false, + ctx, rawXPub, utils.ChainExternal, utils.ScriptTypePubKeyHash, WithMetadatas(map[string]interface{}{ testMetadataKey + "_2": testMetadataValue + "_2", }), @@ -880,12 +880,7 @@ func TestEndToEndTransaction(t *testing.T) { require.NoError(t, err) require.NotNil(t, transaction) require.Equal(t, "", transaction.DraftID) - require.Equal(t, SyncStatusProcessing, transaction.Status) - - // Get the transaction (now after processing) - transaction, err = client.GetTransaction(ctx, rawXPub, transaction.ID) - require.NoError(t, err) - require.NotNil(t, transaction) + require.Equal(t, SyncStatusComplete, transaction.Status) require.Equal(t, SyncStatusComplete, transaction.Status) assert.Equal(t, uint32(2), transaction.NumberOfOutputs) require.Equal(t, uint64(20000), transaction.TotalValue, transaction.TotalValue) diff --git a/models_test.go b/models_test.go index 1b7a4a61..694f2ef3 100644 --- a/models_test.go +++ b/models_test.go @@ -23,7 +23,6 @@ func TestModelName_String(t *testing.T) { assert.Equal(t, "block_header", ModelBlockHeader.String()) assert.Equal(t, "destination", ModelDestination.String()) assert.Equal(t, "empty", ModelNameEmpty.String()) - assert.Equal(t, "incoming_transaction", ModelIncomingTransaction.String()) assert.Equal(t, "metadata", ModelMetadata.String()) assert.Equal(t, "paymail_address", ModelPaymailAddress.String()) assert.Equal(t, "paymail_address", ModelPaymailAddress.String()) @@ -31,7 +30,7 @@ func TestModelName_String(t *testing.T) { assert.Equal(t, "transaction", ModelTransaction.String()) assert.Equal(t, "utxo", ModelUtxo.String()) assert.Equal(t, "xpub", ModelXPub.String()) - assert.Len(t, AllModelNames, 11) + assert.Len(t, AllModelNames, 10) }) } @@ -73,9 +72,6 @@ func TestModel_GetModelName(t *testing.T) { draftTx := DraftTransaction{} assert.Equal(t, ModelDraftTransaction.String(), *datastore.GetModelName(draftTx)) - incomingTx := IncomingTransaction{} - assert.Equal(t, ModelIncomingTransaction.String(), *datastore.GetModelName(incomingTx)) - paymailAddress := PaymailAddress{} assert.Equal(t, ModelPaymailAddress.String(), *datastore.GetModelName(paymailAddress)) @@ -111,9 +107,6 @@ func TestModel_GetModelTableName(t *testing.T) { draftTx := DraftTransaction{} assert.Equal(t, tableDraftTransactions, *datastore.GetModelTableName(draftTx)) - incomingTx := IncomingTransaction{} - assert.Equal(t, tableIncomingTransactions, *datastore.GetModelTableName(incomingTx)) - paymailAddress := PaymailAddress{} assert.Equal(t, tablePaymailAddresses, *datastore.GetModelTableName(paymailAddress)) diff --git a/monitor.go b/monitor.go deleted file mode 100644 index 23fd04ca..00000000 --- a/monitor.go +++ /dev/null @@ -1,103 +0,0 @@ -package bux - -import ( - "bufio" - "context" - "errors" - "fmt" - "os" - "time" - - "github.com/BuxOrg/bux/chainstate" - "github.com/BuxOrg/bux/cluster" - "github.com/BuxOrg/bux/utils" - "github.com/mrz1836/go-datastore" -) - -// destinationMonitor is the struct of responses for Monitoring -type destinationMonitor struct { - LockingScript string `json:"locking_script" toml:"locking_script" yaml:"locking_script" bson:"locking_script"` -} - -// loadMonitoredDestinations will load destinations that should be monitored -func loadMonitoredDestinations(ctx context.Context, client ClientInterface, monitor chainstate.MonitorService) error { - - // Create conditions using the max monitor days - conditions := map[string]interface{}{ - "monitor": map[string]interface{}{ - "$gt": time.Now().Add(time.Duration(-24*monitor.GetMonitorDays()) * time.Hour), - }, - } - - // Create monitor query with max destinations - queryParams := &datastore.QueryParams{ - Page: 1, - PageSize: monitor.GetMaxNumberOfDestinations(), - OrderByField: "monitor", - SortDirection: "desc", - } - - // Get all destinations that match the query - var destinations []*destinationMonitor - if err := client.Datastore().GetModels( - ctx, &[]*Destination{}, conditions, queryParams, &destinations, defaultDatabaseReadTimeout, - ); err != nil && !errors.Is(err, datastore.ErrNoResults) { - return err - } - - // Loop all destinations and add to Monitor - for _, model := range destinations { - if err := monitor.Processor().Add(utils.P2PKHRegexpString, model.LockingScript); err != nil { - return err - } - } - - // Debug line - if client.IsDebug() && client.Logger() != nil { - client.Logger().Info().Msgf("[MONITOR] Added %d destinations to monitor with hash %s", - len(destinations), monitor.Processor().GetHash(), - ) - } - - return nil -} - -// startDefaultMonitor will create a handler, start monitor, and store the first heartbeat -func startDefaultMonitor(ctx context.Context, client ClientInterface, monitor chainstate.MonitorService) error { - - if client.Chainstate().Monitor().LoadMonitoredDestinations() { - if err := loadMonitoredDestinations(ctx, client, monitor); err != nil { - return err - } - } - - _, err := client.Cluster().Subscribe(cluster.DestinationNew, func(data string) { - if monitor.IsDebug() { - monitor.Logger().Info().Msgf("[MONITOR] added %s destination to monitor: %s", utils.P2PKHRegexpString, data) - } - if err := monitor.Processor().Add(utils.P2PKHRegexpString, data); err != nil { - client.Logger().Error().Msg("could not add destination to monitor") - } - }) - if err != nil { - return err - } - - if monitor.IsDebug() { - // capture keyboard input and allow start and stop of the monitor - go func() { - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - text := scanner.Text() - fmt.Printf("KEYBOARD input: %s\n", text) - if text == "e" { - if err = monitor.Stop(ctx); err != nil { - fmt.Printf("ERROR: %s\n", err.Error()) - } - } - } - }() - } - - return nil -} diff --git a/monitor_event_handler.go b/monitor_event_handler.go deleted file mode 100644 index 7c377861..00000000 --- a/monitor_event_handler.go +++ /dev/null @@ -1,460 +0,0 @@ -package bux - -import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "runtime" - "strings" - "sync" - "time" - - "github.com/BuxOrg/bux/chainstate" - "github.com/centrifugal/centrifuge-go" - "github.com/korovkin/limiter" - "github.com/libsv/go-bc" - "github.com/libsv/go-bt/v2" - "github.com/rs/zerolog" -) - -// MonitorEventHandler for handling transaction events from a monitor -type MonitorEventHandler struct { - blockSyncChannel chan bool - buxClient ClientInterface - ctx context.Context - debug bool - limit *limiter.ConcurrencyLimiter - logger *zerolog.Logger - monitor chainstate.MonitorService -} - -type blockSubscriptionHandler struct { - buxClient ClientInterface - ctx context.Context - debug bool - errors []error - logger *zerolog.Logger - monitor chainstate.MonitorService - wg sync.WaitGroup - unsubscribed bool -} - -func (b *blockSubscriptionHandler) OnPublish(subscription *centrifuge.Subscription, e centrifuge.PublishEvent) { - channelName := subscription.Channel() - if strings.HasPrefix(channelName, "block:sync:") { - // block subscription - tx, err := b.monitor.Processor().FilterTransactionPublishEvent(e.Data) - if err != nil { - b.errors = append(b.errors, err) - b.logger.Error().Msgf("[MONITOR] processing block data: %s", err.Error()) - } - - if tx == "" { - return - } - - if _, err = b.buxClient.RecordRawTransaction(b.ctx, tx); err != nil { - // must not override err - btTx, btErr := bt.NewTxFromString(tx) - if btErr != nil { - b.logger.Error().Msgf("[MONITOR] could not parse transaction: %v", btErr) - return - } - - b.logger.Error().Msgf("[MONITOR] recording tx %s: %v", btTx.TxID(), err) - b.errors = append(b.errors, err) - return - } - - if b.debug { - b.logger.Info().Msgf("[MONITOR] successfully recorded tx: %v", tx) - } - } -} - -func (b *blockSubscriptionHandler) OnUnsubscribe(subscription *centrifuge.Subscription, _ centrifuge.UnsubscribeEvent) { - b.logger.Info().Msgf("[MONITOR] OnUnsubscribe: %s", subscription.Channel()) - - // close wait group - if !b.unsubscribed { - b.wg.Done() - b.unsubscribed = true - } -} - -// NewMonitorHandler create a new monitor handler -func NewMonitorHandler(ctx context.Context, buxClient ClientInterface, monitor chainstate.MonitorService) MonitorEventHandler { - return MonitorEventHandler{ - blockSyncChannel: make(chan bool), - buxClient: buxClient, - ctx: ctx, - debug: monitor.IsDebug(), - limit: limiter.NewConcurrencyLimiter(runtime.NumCPU()), - logger: monitor.Logger(), - monitor: monitor, - } -} - -// OnConnect event when connected -func (h *MonitorEventHandler) OnConnect(client *centrifuge.Client, e centrifuge.ConnectEvent) { - h.logger.Info().Msgf("[MONITOR] Connected to server: %s", e.ClientID) - - agentClient := &chainstate.AgentClient{ - Client: client, - } - filters := h.monitor.Processor().GetFilters() - for regex, bloomFilter := range filters { - if _, err := agentClient.SetFilter(regex, bloomFilter); err != nil { - h.logger.Error().Msgf("[MONITOR] processing mempool: %s", err.Error()) - } - } - - h.logger.Info().Msg("[MONITOR] PROCESS BLOCK HEADERS") - if err := h.ProcessBlockHeaders(h.ctx, client); err != nil { - h.logger.Error().Msgf("[MONITOR] processing block headers: %s", err.Error()) - } - - h.logger.Info().Msg("[MONITOR] PROCESS BLOCKS") - h.blockSyncChannel = make(chan bool) - go func() { - ctx := context.Background() - if err := h.ProcessBlocks(ctx, client, h.blockSyncChannel); err != nil { - h.logger.Error().Msgf("[MONITOR] processing blocks: %s", err.Error()) - } - }() - - h.monitor.Connected() -} - -// ProcessBlocks processes all transactions in blocks that have not yet been synced -func (h *MonitorEventHandler) ProcessBlocks(ctx context.Context, client *centrifuge.Client, blockChannel chan bool) error { - h.logger.Info().Msg("[MONITOR] ProcessBlocks start") - for { - // Check if channel has been closed - select { - case <-blockChannel: - h.logger.Info().Msg("[MONITOR] block sync channel closed, stopping ProcessBlocks") - return nil - default: - // get all block headers that have not been marked as synced - blockHeaders, err := h.buxClient.GetUnsyncedBlockHeaders(ctx) - if err != nil { - h.logger.Error().Msg(err.Error()) - } else { - h.logger.Info().Msgf("[MONITOR] processing block headers: %d", len(blockHeaders)) - for _, blockHeader := range blockHeaders { - h.logger.Info().Msgf("[MONITOR] Processing block %d: %s", blockHeader.Height, blockHeader.ID) - handler := &blockSubscriptionHandler{ - buxClient: h.buxClient, - ctx: ctx, - debug: h.debug, - logger: h.logger, - monitor: h.monitor, - } - - var subscription *centrifuge.Subscription - subscription, err = client.NewSubscription("block:sync:" + blockHeader.ID) - if err != nil { - h.logger.Error().Msg(err.Error()) - } else { - h.logger.Info().Msgf("[MONITOR] Starting block subscription: %v", subscription) - subscription.OnPublish(handler) - subscription.OnUnsubscribe(handler) - - handler.wg.Add(1) - if err = subscription.Subscribe(); err != nil { - h.logger.Error().Msg(err.Error()) - } else { - h.logger.Info().Msg("[MONITOR] Waiting for wait group to finish") - handler.wg.Wait() - - _ = subscription.Close() - - if len(handler.errors) <= 0 { - // save that block header has been synced - blockHeader.Synced.Valid = true - blockHeader.Synced.Time = time.Now() - if err = blockHeader.Save(ctx); err != nil { - h.logger.Error().Msg(err.Error()) - } - } - } - } - } - } - - time.Sleep(defaultSleepForNewBlockHeaders) - } - } -} - -// ProcessBlockHeaders processes all missing block headers -func (h *MonitorEventHandler) ProcessBlockHeaders(ctx context.Context, client *centrifuge.Client) error { - lastBlockHeader, err := h.buxClient.GetLastBlockHeader(ctx) - if err != nil { - h.logger.Error().Msg(err.Error()) - return err - } - if lastBlockHeader == nil { - h.logger.Info().Msg("no last block header found, skipping...") - return nil - } - var subscription *centrifuge.Subscription - subscription, err = client.NewSubscription("block:headers:history:" + fmt.Sprint(lastBlockHeader.Height)) - if err != nil { - h.logger.Error().Msg(err.Error()) - } else { - h.logger.Info().Msgf("[MONITOR] Starting block header subscription: %v", subscription) - subscription.OnPublish(h) - if err = subscription.Subscribe(); err != nil { - h.logger.Error().Msg(err.Error()) - } - } - - return nil -} - -// OnError on error event -func (h *MonitorEventHandler) OnError(_ *centrifuge.Client, e centrifuge.ErrorEvent) { - h.logger.Error().Msgf("[MONITOR] Error: %s", e.Message) -} - -// OnMessage on new message event -func (h *MonitorEventHandler) OnMessage(_ *centrifuge.Client, e centrifuge.MessageEvent) { - var data map[string]interface{} - err := json.Unmarshal(e.Data, &data) - if err != nil { - h.logger.Error().Msgf("[MONITOR] failed unmarshalling data: %s", err.Error()) - } - - if _, ok := data["time"]; !ok { - h.logger.Error().Msgf("[MONITOR] OnMessage: %v", data) - } -} - -// OnDisconnect when disconnected -func (h *MonitorEventHandler) OnDisconnect(_ *centrifuge.Client, _ centrifuge.DisconnectEvent) { - defer close(h.blockSyncChannel) - - defer func(logger *zerolog.Logger) { - rec := recover() - if rec != nil { - logger.Error().Msgf("[MONITOR] Tried closing a closed channel: %v", rec) - } - }(h.logger) - - h.monitor.Disconnected() -} - -// OnJoin event when joining a server -func (h *MonitorEventHandler) OnJoin(_ *centrifuge.Subscription, e centrifuge.JoinEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnJoin: %v", e) - } -} - -// OnLeave event when leaving a server -func (h *MonitorEventHandler) OnLeave(_ *centrifuge.Subscription, e centrifuge.LeaveEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnLeave: %v", e) - } -} - -// OnPublish on publish event -func (h *MonitorEventHandler) OnPublish(subscription *centrifuge.Subscription, e centrifuge.PublishEvent) { - channelName := subscription.Channel() - - if strings.HasPrefix(channelName, "block:headers:history:") { - bi := chainstate.BlockInfo{} - err := json.Unmarshal(e.Data, &bi) - if err != nil { - h.logger.Error().Msgf("[MONITOR] unmarshalling block header: %v", err) - return - } - - var existingBlock *BlockHeader - if existingBlock, err = h.buxClient.GetBlockHeaderByHeight(h.ctx, uint32(bi.Height)); err != nil { - h.logger.Error().Msgf("[MONITOR] getting block header by height: %v", err) - } - - if existingBlock == nil { - merkleRoot, _ := hex.DecodeString(bi.MerkleRoot) - previousBlockHash, _ := hex.DecodeString(bi.PreviousBlockHash) - - bh := bc.BlockHeader{ - Bits: []byte(bi.Bits), - HashMerkleRoot: merkleRoot, - HashPrevBlock: previousBlockHash, - Nonce: uint32(bi.Nonce), - Time: uint32(bi.Time), - Version: uint32(bi.Version), - } - - if _, err = h.buxClient.RecordBlockHeader( - h.ctx, bi.Hash, uint32(bi.Height), bh, - ); err != nil { - h.logger.Error().Msgf("[MONITOR] recording block header: %v", err) - return - } - } - } else { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnPublish: %v", e.Data) - } - } -} - -// OnServerSubscribe on server subscribe event -func (h *MonitorEventHandler) OnServerSubscribe(_ *centrifuge.Client, e centrifuge.ServerSubscribeEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnServerSubscribe: %v", e) - } -} - -// OnServerUnsubscribe on the unsubscribe event -func (h *MonitorEventHandler) OnServerUnsubscribe(_ *centrifuge.Client, e centrifuge.ServerUnsubscribeEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnServerUnsubscribe: %v", e) - } -} - -// OnSubscribeSuccess on subscribe success -func (h *MonitorEventHandler) OnSubscribeSuccess(_ *centrifuge.Subscription, e centrifuge.SubscribeSuccessEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnSubscribeSuccess: %v", e) - } -} - -// OnSubscribeError is for an error -func (h *MonitorEventHandler) OnSubscribeError(_ *centrifuge.Subscription, e centrifuge.SubscribeErrorEvent) { - h.logger.Error().Msgf("[MONITOR] OnSubscribeError: %v", e) -} - -// OnUnsubscribe will unsubscribe -func (h *MonitorEventHandler) OnUnsubscribe(_ *centrifuge.Subscription, e centrifuge.UnsubscribeEvent) { - if h.debug { - h.logger.Info().Msgf("[MONITOR] OnUnsubscribe: %v", e) - } -} - -// OnServerJoin event when joining a server -func (h *MonitorEventHandler) OnServerJoin(_ *centrifuge.Client, e centrifuge.ServerJoinEvent) { - h.logger.Info().Msgf("[MONITOR] Joined server: %v", e) -} - -// OnServerLeave event when leaving a server -func (h *MonitorEventHandler) OnServerLeave(_ *centrifuge.Client, e centrifuge.ServerLeaveEvent) { - h.logger.Info().Msgf("[MONITOR] Left server: %v", e) -} - -// OnServerPublish on server publish event -func (h *MonitorEventHandler) OnServerPublish(c *centrifuge.Client, e centrifuge.ServerPublishEvent) { - h.logger.Info().Msgf("[MONITOR] Server publish to channel %s with data %v", e.Channel, string(e.Data)) - // todo make this configurable - // h.onServerPublishLinear(c, e) - h.onServerPublishParallel(c, e) -} - -func (h *MonitorEventHandler) processMempoolPublish(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) { - tx, err := h.monitor.Processor().FilterTransactionPublishEvent(e.Data) - if err != nil { - h.logger.Error().Msgf("[MONITOR] failed to process server event: %v", err) - return - } - - if h.monitor.SaveDestinations() { - // Process transaction and save outputs - // todo: replace printf - fmt.Printf("Should save the destination here...\n") - } - - if tx == "" { - return - } - if _, err = h.buxClient.RecordRawTransaction(h.ctx, tx); err != nil { - h.logger.Error().Msgf("[MONITOR] recording tx: %v", err) - return - } - - if h.debug { - h.logger.Info().Msgf("[MONITOR] successfully recorded tx: %v", tx) - } -} - -func (h *MonitorEventHandler) processBlockHeaderPublish(client *centrifuge.Client, e centrifuge.ServerPublishEvent) { - bi := chainstate.BlockInfo{} - err := json.Unmarshal(e.Data, &bi) - if err != nil { - h.logger.Error().Msgf("[MONITOR] unmarshalling block header: %v", err) - return - } - merkleRoot, _ := hex.DecodeString(bi.MerkleRoot) - previousBlockHash, _ := hex.DecodeString(bi.PreviousBlockHash) - bh := bc.BlockHeader{ - HashPrevBlock: previousBlockHash, - HashMerkleRoot: merkleRoot, - Nonce: uint32(bi.Nonce), - Version: uint32(bi.Version), - Time: uint32(bi.Time), - Bits: []byte(bi.Bits), - } - - height := uint32(bi.Height) - var previousBlockHeader *BlockHeader - previousBlockHeader, err = getBlockHeaderByHeight(h.ctx, height-1, h.buxClient.DefaultModelOptions()...) - if err != nil { - h.logger.Error().Msgf("[MONITOR] retreiving previous block header: %v", err) - return - } - if previousBlockHeader == nil { - h.logger.Error().Msgf("[MONITOR] ERROR Previous block header not found: %d", height-1) - if err = h.ProcessBlockHeaders(h.ctx, client); err != nil { - h.logger.Error().Msgf("[MONITOR] processing block headers: %s", err.Error()) - } - return - } - - if _, err = h.buxClient.RecordBlockHeader(h.ctx, bi.Hash, height, bh); err != nil { - h.logger.Error().Msgf("[MONITOR] recording block header: %v", err) - return - } - - if h.debug { - h.logger.Info().Msgf("[MONITOR] successfully recorded blockheader: %v", bi.Hash) - } -} - -func (h *MonitorEventHandler) onServerPublishLinear(c *centrifuge.Client, e centrifuge.ServerPublishEvent) { - switch e.Channel { - case "mempool:transactions": - h.processMempoolPublish(c, e) - case "block:headers": - h.processBlockHeaderPublish(c, e) - } -} - -func (h *MonitorEventHandler) onServerPublishParallel(c *centrifuge.Client, e centrifuge.ServerPublishEvent) { - _, err := h.limit.Execute(func() { - h.onServerPublishLinear(c, e) - }) - if err != nil { - h.logger.Error().Msgf("[MONITOR] failed to start goroutine: %v", err) - } -} - -// SetMonitor sets the monitor for the given handler -func (h *MonitorEventHandler) SetMonitor(monitor *chainstate.Monitor) { - h.monitor = monitor -} - -// RecordTransaction records a transaction into bux -func (h *MonitorEventHandler) RecordTransaction(ctx context.Context, txHex string) error { - _, err := h.buxClient.RecordRawTransaction(ctx, txHex) - return err -} - -// RecordBlockHeader records a block header into bux -func (h *MonitorEventHandler) RecordBlockHeader(_ context.Context, _ bc.BlockHeader) error { - return nil -} diff --git a/paymail_service_provider.go b/paymail_service_provider.go index ba562676..4922465a 100644 --- a/paymail_service_provider.go +++ b/paymail_service_provider.go @@ -2,12 +2,8 @@ package bux import ( "context" - "database/sql" "encoding/hex" "fmt" - "reflect" - "time" - "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/utils" "github.com/bitcoin-sv/go-paymail" @@ -16,7 +12,7 @@ import ( "github.com/bitcoin-sv/go-paymail/spv" "github.com/bitcoinschema/go-bitcoin/v2" "github.com/libsv/go-bk/bec" - customTypes "github.com/mrz1836/go-datastore/custom_types" + "reflect" ) // PaymailDefaultServiceProvider is an interface for overriding the paymail actions in go-paymail/server @@ -89,7 +85,7 @@ func (p *PaymailDefaultServiceProvider) CreateAddressResolutionResponse( return nil, err } destination, err := createDestination( - ctx, paymailAddress, pubKey, true, append(p.client.DefaultModelOptions(), WithMetadatas(metadata))..., + ctx, paymailAddress, pubKey, append(p.client.DefaultModelOptions(), WithMetadatas(metadata))..., ) if err != nil { return nil, err @@ -127,7 +123,7 @@ func (p *PaymailDefaultServiceProvider) CreateP2PDestinationResponse( return nil, err } destination, err = createDestination( - ctx, paymailAddress, pubKey, false, append(p.client.DefaultModelOptions(), WithMetadatas(metadata))..., + ctx, paymailAddress, pubKey, append(p.client.DefaultModelOptions(), WithMetadatas(metadata))..., ) if err != nil { return nil, err @@ -247,7 +243,7 @@ func getXpubForPaymail(ctx context.Context, client ClientInterface, paymailAddre ) } -func createDestination(ctx context.Context, paymailAddress *PaymailAddress, pubKey *derivedPubKey, monitor bool, opts ...ModelOps) (destination *Destination, err error) { +func createDestination(ctx context.Context, paymailAddress *PaymailAddress, pubKey *derivedPubKey, opts ...ModelOps) (destination *Destination, err error) { lockingScript, err := createLockingScript(pubKey.ecPubKey) if err != nil { return nil, err @@ -259,14 +255,6 @@ func createDestination(ctx context.Context, paymailAddress *PaymailAddress, pubK destination.Chain = utils.ChainExternal destination.Num = pubKey.chainNum - // Only on for basic address resolution, not enabled for p2p - if monitor { - destination.Monitor = customTypes.NullTime{NullTime: sql.NullTime{ - Valid: true, - Time: time.Now(), - }} - } - if err = destination.Save(ctx); err != nil { return nil, err } diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go index 5641df23..5fa8bccd 100644 --- a/record_tx_strategy_external_incoming_tx.go +++ b/record_tx_strategy_external_incoming_tx.go @@ -16,12 +16,6 @@ type externalIncomingTx struct { func (strategy *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { logger := c.Logger() - - // process - if !strategy.broadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it - return _addTxToCheck(ctx, strategy, c, opts) - } - transaction, err := _createExternalTxToRecord(ctx, strategy, c, opts) if err != nil { return nil, fmt.Errorf("creation of external incoming tx failed. Reason: %w", err) @@ -83,31 +77,6 @@ func (strategy *externalIncomingTx) FailOnBroadcastError(forceFail bool) { strategy.allowBroadcastErrors = !forceFail } -func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { - logger := c.Logger() - - incomingTx, err := newIncomingTransaction(tx.Hex, c.DefaultModelOptions(append(opts, New())...)...) - if err != nil { - return nil, fmt.Errorf("tx creation failed. Reason: %w", err) - } - - logger.Info(). - Str("txID", incomingTx.ID). - Msg("start ITC") - - if err = incomingTx.Save(ctx); err != nil { - return nil, fmt.Errorf("adding new IncomingTx to check queue failed. Reason: %w", err) - } - - result := incomingTx.toTransactionDto() - result.Status = statusProcessing - - logger.Info(). - Str("txID", incomingTx.ID). - Msg("complete ITC") - return result, nil -} - func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { // Create NEW tx model tx, err := txFromHex(eTx.Hex, c.DefaultModelOptions(append(opts, New())...)...)