Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
galt-tr committed Apr 15, 2022
1 parent b9beb76 commit 959b40b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 142 deletions.
1 change: 1 addition & 0 deletions chainstate/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type MonitorService interface {
Disconnected()
IsConnected() bool
GetMonitorDays() int
SaveDestinations() bool
GetFalsePositiveRate() float64
GetMaxNumberOfDestinations() int
GetProcessMempoolOnConnect() bool
Expand Down
7 changes: 7 additions & 0 deletions chainstate/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Monitor struct {
connected bool
centrifugeServer string
monitorDays int
saveDestinations bool
falsePositiveRate float64
maxNumberOfDestinations int
processMempoolOnConnect bool
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewMonitor(_ context.Context, options *MonitorOptions) *Monitor {
maxNumberOfDestinations: options.MaxNumberOfDestinations,
falsePositiveRate: options.FalsePositiveRate,
monitorDays: options.MonitorDays,
saveDestinations: options.SaveDestinations,
processMempoolOnConnect: options.ProcessMempoolOnConnect,
}
monitor.processor = NewBloomProcessor(uint(monitor.maxNumberOfDestinations), monitor.falsePositiveRate)
Expand Down Expand Up @@ -108,6 +110,11 @@ func (m *Monitor) GetProcessMempoolOnConnect() bool {
return m.processMempoolOnConnect
}

// SaveDestinations gets whether or not we should save destinations from transactions that pass monitor filter
func (m *Monitor) SaveDestinations() bool {
return m.saveDestinations
}

// SetChainstateOptions sets the chainstate options on the monitor to allow more synching capabilities
func (m *Monitor) SetChainstateOptions(options *clientOptions) {
m.chainstateOptions = options
Expand Down
2 changes: 1 addition & 1 deletion client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Client) loadMonitor(ctx context.Context) (err error) {
// Load monitor if set by the user
monitor := c.options.chainstate.Monitor()
if monitor != nil {
handler := NewMonitorHandler(ctx, "", c, monitor)
handler := NewTransactionMonitorHandler(ctx, "", c, monitor)
err = c.loadMonitoredDestinations(ctx, monitor)
if err != nil {
return
Expand Down
141 changes: 0 additions & 141 deletions monitor_event_handler.go

This file was deleted.

145 changes: 145 additions & 0 deletions transaction_monitor_event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package bux

import (
"context"
"fmt"
"runtime"

"github.com/mrz1836/go-whatsonchain"

"github.com/BuxOrg/bux/chainstate"
"github.com/centrifugal/centrifuge-go"
"github.com/korovkin/limiter"
)

type transactionEventHandler struct {
monitor chainstate.MonitorService
buxClient ClientInterface
xpub string
ctx context.Context
limit *limiter.ConcurrencyLimiter
}

// NewTransactionMonitorHandler create a new monitor handler
func NewTransactionMonitorHandler(ctx context.Context, xpubKey string, buxClient ClientInterface, monitor chainstate.MonitorService) transactionEventHandler {
return transactionEventHandler{
monitor: monitor,
buxClient: buxClient,
xpub: xpubKey,
ctx: ctx,
limit: limiter.NewConcurrencyLimiter(runtime.NumCPU()),
}
}

func (h *transactionEventHandler) OnConnect(_ *centrifuge.Client, e centrifuge.ConnectEvent) {
fmt.Printf("Conntected to server: %s\n", e.ClientID)
if h.monitor.GetProcessMempoolOnConnect() {
fmt.Printf("PROCESS MEMPOOL\n")
go func() {
err := h.monitor.ProcessMempool(context.Background())
if err != nil {
fmt.Printf("ERROR processing mempool: %s\n", err.Error())
}
}()
}
h.monitor.Connected()
}

func (h *transactionEventHandler) OnError(_ *centrifuge.Client, e centrifuge.ErrorEvent) {
fmt.Printf("Error: %s", e.Message)
}

func (h *transactionEventHandler) OnMessage(_ *centrifuge.Client, e centrifuge.MessageEvent) {
}

func (h *transactionEventHandler) OnDisconnect(_ *centrifuge.Client, e centrifuge.DisconnectEvent) {
h.monitor.Disconnected()
}

func (h *transactionEventHandler) OnJoin(_ *centrifuge.Subscription, e centrifuge.JoinEvent) {
}

func (h *transactionEventHandler) OnLeave(_ *centrifuge.Subscription, e centrifuge.LeaveEvent) {
}

func (h *transactionEventHandler) OnPublish(_ *centrifuge.Subscription, e centrifuge.PublishEvent) {
}

func (h *transactionEventHandler) OnServerSubscribe(_ *centrifuge.Client, e centrifuge.ServerSubscribeEvent) {
}

func (h *transactionEventHandler) OnServerUnsubscribe(_ *centrifuge.Client, e centrifuge.ServerUnsubscribeEvent) {
}

func (h *transactionEventHandler) OnSubscribeSuccess(_ *centrifuge.Subscription, e centrifuge.SubscribeSuccessEvent) {
}

func (h *transactionEventHandler) OnSubscribeError(_ *centrifuge.Subscription, e centrifuge.SubscribeErrorEvent) {
}

func (h *transactionEventHandler) OnUnsubscribe(_ *centrifuge.Subscription, e centrifuge.UnsubscribeEvent) {
}

func (h *transactionEventHandler) OnServerJoin(_ *centrifuge.Client, e centrifuge.ServerJoinEvent) {
fmt.Printf("Joined server: %v\n", e)
}

func (h *transactionEventHandler) OnServerLeave(_ *centrifuge.Client, e centrifuge.ServerLeaveEvent) {
}

func (h *transactionEventHandler) OnServerPublish(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
// todo make this configurable
//h.OnServerPublishLinear(nil, e)
h.OnServerPublishParallel(nil, e)
}

func (h *transactionEventHandler) OnServerPublishLinear(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
tx, err := h.monitor.Processor().FilterMempoolPublishEvent(e)
if err != nil {
fmt.Printf("failed to process server event: %v\n", err)
return
}

if h.monitor.SaveDestinations() {
// Process transaction and save outputs
}

if tx == "" {
return
}
_, err = h.buxClient.RecordTransaction(h.ctx, h.xpub, tx, "")
if err != nil {
fmt.Printf("error recording tx: %v\n", err)
return
}
fmt.Printf("successfully recorded tx: %v\n", tx)
}

func (h *transactionEventHandler) OnServerPublishParallel(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
_, err := h.limit.Execute(func() {
h.OnServerPublishLinear(nil, e)
})

if err != nil {
fmt.Printf("failed to start goroutine: %v", err)
}
}

// SetMonitor sets the monitor for the given handler
func (h *transactionEventHandler) SetMonitor(monitor *chainstate.Monitor) {
h.monitor = monitor
}

// RecordTransaction records a transaction into bux
func (h *transactionEventHandler) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string) error {

_, err := h.buxClient.RecordTransaction(ctx, xPubKey, txHex, draftID)

return err
}

// GetWhatsOnChain returns the whats on chain client interface
func (h *transactionEventHandler) GetWhatsOnChain() whatsonchain.ClientInterface {

return h.buxClient.Chainstate().WhatsOnChain()
}

0 comments on commit 959b40b

Please sign in to comment.