Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Added block headers import
Browse files Browse the repository at this point in the history
  • Loading branch information
icellan committed Apr 18, 2022
1 parent 05ff74e commit 6ff250c
Show file tree
Hide file tree
Showing 23 changed files with 454 additions and 52 deletions.
2 changes: 1 addition & 1 deletion action_blockheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/libsv/go-bc"
)

// RecordBlockheader will Save a block header into the Datastore
// RecordBlockHeader will Save a block header into the Datastore
//
// hash is the hash of the block header
// bh is the block header data
Expand Down
3 changes: 2 additions & 1 deletion chainstate/filters/metanet_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package filters

import (
"github.com/mrz1836/go-whatsonchain"
"testing"

"github.com/mrz1836/go-whatsonchain"
)

func TestMetanet(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion chainstate/filters/planaria-b_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package filters

import (
"github.com/mrz1836/go-whatsonchain"
"testing"

"github.com/mrz1836/go-whatsonchain"
)

func TestPlanariaB(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions chainstate/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type Monitor struct {
maxNumberOfDestinations int
processMempoolOnConnect bool
filterType string
regexList []string
handler MonitorHandler
}

Expand Down Expand Up @@ -128,7 +127,7 @@ func (m *Monitor) SaveDestinations() bool {
return m.saveTransactionsDestinations
}

// LooadMonitoredDestinations gets where we want to add the monitored destiantions from the database into the processor
// LoadMonitoredDestinations gets where we want to add the monitored destinations from the database into the processor
func (m *Monitor) LoadMonitoredDestinations() bool {
return m.loadMOnitoredDestinations
}
Expand All @@ -144,7 +143,7 @@ func (m *Monitor) Monitor(handler MonitorHandler) error {
if m.client == nil {
handler.SetMonitor(m)
m.handler = handler
m.logger.Info(context.Background(), "[MONITOR] Connecting to server: %s", m.centrifugeServer)
m.logger.Info(context.Background(), fmt.Sprintf("[MONITOR] Connecting to server: %s", m.centrifugeServer))
m.client = newCentrifugeClient(m.centrifugeServer, handler)
if m.token != "" {
m.client.SetToken(m.token)
Expand Down
5 changes: 3 additions & 2 deletions chainstate/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@ func (a *AgentClient) Disconnect() error {
return a.Client.Disconnect()
}

// SetToken set the client token
func (a *AgentClient) SetToken(token string) {
a.Client.SetToken(token)
}

// TODO: Just rely on the agent for this data type
// AddFilterMessage defines a new filter to be published from the client
// todo Just rely on the agent for this data type
type AddFilterMessage struct {
Timestamp int64 `json:"timestamp"`
Regex string `json:"regex"`
Filter string `json:"filter"`
Hash string `json:"hash"`
}

// AddFilter adds a new filtero the agent
// AddFilter adds a new filter to the agent
func (a *AgentClient) AddFilter(regex, item string) (centrifuge.PublishResult, error) {
msg := AddFilterMessage{
Regex: regex,
Expand Down
1 change: 0 additions & 1 deletion chainstate/monitor_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/libsv/go-bc"

"github.com/mrz1836/go-whatsonchain"
)

Expand Down
3 changes: 1 addition & 2 deletions chainstate/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/BuxOrg/bux/utils"
"github.com/bitcoinschema/go-bitcoin/v2"
"github.com/centrifugal/centrifuge-go"
"github.com/libsv/go-bt/v2/bscript"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
boom "github.com/tylertreat/BoomFilters"
)

Expand Down
10 changes: 10 additions & 0 deletions chainstate/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package chainstate

// TransactionType tx types
type TransactionType string

// Metanet type
const Metanet TransactionType = "metanet"

// PubKeyHash type
const PubKeyHash TransactionType = "pubkeyhash"

// PlanariaB type
const PlanariaB TransactionType = "planaria-b"

// PlanariaD type
const PlanariaD TransactionType = "planaria-d"

// RareCandyFrogCartel type
const RareCandyFrogCartel TransactionType = "rarecandy-frogcartel"
5 changes: 2 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"time"

"github.com/tonicpow/go-paymail"

"github.com/BuxOrg/bux/cachestore"
"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/datastore"
Expand All @@ -14,6 +12,7 @@ import (
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/tonicpow/go-paymail"
"github.com/tonicpow/go-paymail/server"
)

Expand Down Expand Up @@ -172,7 +171,7 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error)

// Load the blockchain monitor
if client.options.chainstate.Monitor() != nil {
if err := client.loadMonitor(ctx); err != nil {
if err = client.loadMonitor(ctx); err != nil {
return nil, err
}
}
Expand Down
1 change: 1 addition & 0 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *Client) loadMonitor(ctx context.Context) (err error) {
func (c *Client) runModelMigrations(models ...interface{}) (err error) {
d := c.Datastore()
for _, model := range models {
model.(ModelInterface).SetOptions(WithClient(c))
if err = model.(ModelInterface).Migrate(d); err != nil {
return
}
Expand Down
7 changes: 7 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,13 @@ func WithIUCDisabled() ClientOps {
}
}

// WithImportBlockHeaders will import block headers on startup
func WithImportBlockHeaders(blockHeadersZipFileURL string) ClientOps {
return func(c *clientOptions) {
c.dataStore.options = append(c.dataStore.options, datastore.WithImportBlockHeaders(blockHeadersZipFileURL))
}
}

// WithLogger will set the custom logger interface
func WithLogger(customLogger logger.Interface) ClientOps {
return func(c *clientOptions) {
Expand Down
29 changes: 15 additions & 14 deletions datastore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ type (

// clientOptions holds all the configuration for the client
clientOptions struct {
autoMigrate bool // Setting for Auto Migration of SQL tables
db *gorm.DB // Database connection for Read-Only requests (can be same as Write)
debug bool // Setting for global debugging
engine Engine // Datastore engine (MySQL, PostgreSQL, SQLite)
logger logger.Interface // Custom logger interface (from BUX)
loggerDB glogger.Interface // Custom logger interface (for GORM)
migratedModels []string // List of models (types) that have been migrated
migrateModels []interface{} // Models for migrations
mongoDB *mongo.Database // Database connection for a MongoDB datastore
mongoDBConfig *MongoDBConfig // Configuration for a MongoDB datastore
newRelicEnabled bool // If NewRelic is enabled (parent application)
sqlConfigs []*SQLConfig // Configuration for a MySQL or PostgreSQL datastore
sqLite *SQLiteConfig // Configuration for a SQLite datastore
tablePrefix string // Model table prefix
autoMigrate bool // Setting for Auto Migration of SQL tables
db *gorm.DB // Database connection for Read-Only requests (can be same as Write)
debug bool // Setting for global debugging
engine Engine // Datastore engine (MySQL, PostgreSQL, SQLite)
importBlockHeaders string // The URL of the block headers zip file to import old block headers on startup. if block 0 is found in the DB, block headers will mpt be downloaded
logger logger.Interface // Custom logger interface (from BUX)
loggerDB glogger.Interface // Custom logger interface (for GORM)
migratedModels []string // List of models (types) that have been migrated
migrateModels []interface{} // Models for migrations
mongoDB *mongo.Database // Database connection for a MongoDB datastore
mongoDBConfig *MongoDBConfig // Configuration for a MongoDB datastore
newRelicEnabled bool // If NewRelic is enabled (parent application)
sqlConfigs []*SQLConfig // Configuration for a MySQL or PostgreSQL datastore
sqLite *SQLiteConfig // Configuration for a SQLite datastore
tablePrefix string // Model table prefix
}
)

Expand Down
7 changes: 7 additions & 0 deletions datastore/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,10 @@ func WithLogger(customLogger logger.Interface) ClientOps {
}
}
}

// WithImportBlockHeaders will set the import block headers option
func WithImportBlockHeaders(blockHeadersZipFileURL string) ClientOps {
return func(c *clientOptions) {
c.importBlockHeaders = blockHeadersZipFileURL
}
}
5 changes: 5 additions & 0 deletions datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"time"

"go.mongodb.org/mongo-driver/mongo"
"gorm.io/gorm"
)

// StorageService is the storage related methods
type StorageService interface {
AutoMigrateDatabase(ctx context.Context, models ...interface{}) error
CreateInBatches(ctx context.Context, models interface{}, batchSize int) error
Execute(query string) *gorm.DB
GetModel(ctx context.Context, model interface{}, conditions map[string]interface{}, timeout time.Duration) error
GetModels(ctx context.Context, models interface{}, conditions map[string]interface{}, pageSize, page int,
Expand All @@ -33,7 +35,10 @@ type ClientInterface interface {
Engine() Engine
GetDatabaseName() string
GetTableName(modelName string) string
GetMongoCollection(collectionName string) *mongo.Collection
GetMongoCollectionByTableName(tableName string) *mongo.Collection
IsAutoMigrate() bool
IsDebug() bool
IsNewRelicEnabled() bool
ImportBlockHeadersFromURL() string
}
5 changes: 5 additions & 0 deletions datastore/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (c *Client) IsAutoMigrate() bool {
return c.options.autoMigrate
}

// ImportBlockHeadersFromURL will the URL where to import block headers from
func (c *Client) ImportBlockHeadersFromURL() string {
return c.options.importBlockHeaders
}

// autoMigrateMongoDatabase will start a new database for Mongo
func autoMigrateMongoDatabase(ctx context.Context, _ Engine, options *clientOptions,
_ ...interface{}) error {
Expand Down
14 changes: 14 additions & 0 deletions datastore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ func (c *Client) IncrementModel(
return
}

// CreateInBatches create all the models given in batches
func (c *Client) CreateInBatches(
ctx context.Context,
models interface{},
batchSize int,
) error {
if c.Engine() == MongoDB {
return c.CreateInBatchesMongo(ctx, models, batchSize)
}

tx := c.options.db.CreateInBatches(models, batchSize)
return tx.Error
}

// convertToInt64 will convert an interface to an int64
func convertToInt64(i interface{}) int64 {
switch v := i.(type) {
Expand Down
74 changes: 73 additions & 1 deletion datastore/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,53 @@ func (c *Client) incrementWithMongo(
return
}

// CreateInBatchesMongo insert multiple models vai bulk.Write
func (c *Client) CreateInBatchesMongo(
ctx context.Context,
models interface{},
batchSize int,
) error {

collectionName := utils.GetModelTableName(models)
if collectionName == nil {
return ErrUnknownCollection
}

mongoModels := make([]mongo.WriteModel, 0)
collection := c.GetMongoCollection(*collectionName)
bulkOptions := options.BulkWrite().SetOrdered(true)
count := 0

switch reflect.TypeOf(models).Kind() { //nolint:exhaustive // we only get slices
case reflect.Slice:
s := reflect.ValueOf(models)
for i := 0; i < s.Len(); i++ {
m := mongo.NewInsertOneModel()
m.SetDocument(s.Index(i).Interface())
mongoModels = append(mongoModels, m)
count++

if count%batchSize == 0 {
_, err := collection.BulkWrite(ctx, mongoModels, bulkOptions)
if err != nil {
return err
}
// reset the bulk
mongoModels = make([]mongo.WriteModel, 0)
}
}
}

if count%batchSize != 0 {
_, err := collection.BulkWrite(ctx, mongoModels, bulkOptions)
if err != nil {
return err
}
}

return nil
}

// getWithMongo will get given struct(s) from MongoDB
func (c *Client) getWithMongo(
ctx context.Context,
Expand Down Expand Up @@ -207,6 +254,22 @@ func (c *Client) getWithMongo(
return nil
}

// GetMongoCollection will get the mongo collection for the given tableName
func (c *Client) GetMongoCollection(
collectionName string,
) *mongo.Collection {
return c.options.mongoDB.Collection(
setPrefix(c.options.mongoDBConfig.TablePrefix, collectionName),
)
}

// GetMongoCollectionByTableName will get the mongo collection for the given tableName
func (c *Client) GetMongoCollectionByTableName(
tableName string,
) *mongo.Collection {
return c.options.mongoDB.Collection(tableName)
}

func getFieldNames(fieldResult interface{}) []string {
if fieldResult == nil {
return []string{}
Expand Down Expand Up @@ -436,8 +499,17 @@ func openMongoDatabase(ctx context.Context, config *MongoDBConfig) (*mongo.Datab
// getMongoIndexes will get indexes from mongo
func getMongoIndexes() map[string][]mongo.IndexModel {

// todo: move these to bux out of this package
return map[string][]mongo.IndexModel{
"block_headers": {
mongo.IndexModel{Keys: bsonx.Doc{{
Key: "height",
Value: bsonx.Int32(1),
}}},
mongo.IndexModel{Keys: bsonx.Doc{{
Key: "synced",
Value: bsonx.Int32(1),
}}},
},
"destinations": {
mongo.IndexModel{Keys: bsonx.Doc{{
Key: "address",
Expand Down
3 changes: 1 addition & 2 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"net/http"
"time"

"github.com/libsv/go-bc"

"github.com/BuxOrg/bux/cachestore"
"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/datastore"
"github.com/BuxOrg/bux/logger"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
"github.com/libsv/go-bc"
"github.com/tonicpow/go-paymail"
)

Expand Down
Loading

0 comments on commit 6ff250c

Please sign in to comment.