From 6ff250cfe5ccf767922063119f9f019b28eedd98 Mon Sep 17 00:00:00 2001 From: Siggi Date: Mon, 18 Apr 2022 23:56:02 +0200 Subject: [PATCH] Added block headers import --- action_blockheader.go | 2 +- chainstate/filters/metanet_test.go | 3 +- chainstate/filters/planaria-b_test.go | 3 +- chainstate/monitor.go | 5 +- chainstate/monitor_client.go | 5 +- chainstate/monitor_handler.go | 1 - chainstate/processor_test.go | 3 +- chainstate/types.go | 10 ++ client.go | 5 +- client_internal.go | 1 + client_options.go | 7 + datastore/client.go | 29 ++-- datastore/client_options.go | 7 + datastore/interface.go | 5 + datastore/migrate.go | 5 + datastore/models.go | 14 ++ datastore/mongodb.go | 74 +++++++- interface.go | 3 +- model_blockheaders.go | 233 ++++++++++++++++++++++++-- model_utxos.go | 17 +- models_test.go | 4 +- monitor_event_handler.go | 5 +- utils/download.go | 65 +++++++ 23 files changed, 454 insertions(+), 52 deletions(-) create mode 100644 utils/download.go diff --git a/action_blockheader.go b/action_blockheader.go index 665a88eb..aa825343 100644 --- a/action_blockheader.go +++ b/action_blockheader.go @@ -6,7 +6,7 @@ import ( "github.com/libsv/go-bc" ) -// RecordBlockheader will Save a block header into the Datastore +// RecordBlockHeader will Save a block header into the Datastore // // hash is the hash of the block header // bh is the block header data diff --git a/chainstate/filters/metanet_test.go b/chainstate/filters/metanet_test.go index 29a2cc38..de3f03ed 100644 --- a/chainstate/filters/metanet_test.go +++ b/chainstate/filters/metanet_test.go @@ -1,8 +1,9 @@ package filters import ( - "github.com/mrz1836/go-whatsonchain" "testing" + + "github.com/mrz1836/go-whatsonchain" ) func TestMetanet(t *testing.T) { diff --git a/chainstate/filters/planaria-b_test.go b/chainstate/filters/planaria-b_test.go index 8212371d..b2d462ec 100644 --- a/chainstate/filters/planaria-b_test.go +++ b/chainstate/filters/planaria-b_test.go @@ -1,8 +1,9 @@ package filters import ( - "github.com/mrz1836/go-whatsonchain" "testing" + + "github.com/mrz1836/go-whatsonchain" ) func TestPlanariaB(t *testing.T) { diff --git a/chainstate/monitor.go b/chainstate/monitor.go index 6be09ef6..f83506e8 100644 --- a/chainstate/monitor.go +++ b/chainstate/monitor.go @@ -27,7 +27,6 @@ type Monitor struct { maxNumberOfDestinations int processMempoolOnConnect bool filterType string - regexList []string handler MonitorHandler } @@ -128,7 +127,7 @@ func (m *Monitor) SaveDestinations() bool { return m.saveTransactionsDestinations } -// LooadMonitoredDestinations gets where we want to add the monitored destiantions from the database into the processor +// LoadMonitoredDestinations gets where we want to add the monitored destinations from the database into the processor func (m *Monitor) LoadMonitoredDestinations() bool { return m.loadMOnitoredDestinations } @@ -144,7 +143,7 @@ func (m *Monitor) Monitor(handler MonitorHandler) error { if m.client == nil { handler.SetMonitor(m) m.handler = handler - m.logger.Info(context.Background(), "[MONITOR] Connecting to server: %s", m.centrifugeServer) + m.logger.Info(context.Background(), fmt.Sprintf("[MONITOR] Connecting to server: %s", m.centrifugeServer)) m.client = newCentrifugeClient(m.centrifugeServer, handler) if m.token != "" { m.client.SetToken(m.token) diff --git a/chainstate/monitor_client.go b/chainstate/monitor_client.go index 44adf8b8..2c3159b0 100644 --- a/chainstate/monitor_client.go +++ b/chainstate/monitor_client.go @@ -32,12 +32,13 @@ func (a *AgentClient) Disconnect() error { return a.Client.Disconnect() } +// SetToken set the client token func (a *AgentClient) SetToken(token string) { a.Client.SetToken(token) } -// TODO: Just rely on the agent for this data type // AddFilterMessage defines a new filter to be published from the client +// todo Just rely on the agent for this data type type AddFilterMessage struct { Timestamp int64 `json:"timestamp"` Regex string `json:"regex"` @@ -45,7 +46,7 @@ type AddFilterMessage struct { Hash string `json:"hash"` } -// AddFilter adds a new filtero the agent +// AddFilter adds a new filter to the agent func (a *AgentClient) AddFilter(regex, item string) (centrifuge.PublishResult, error) { msg := AddFilterMessage{ Regex: regex, diff --git a/chainstate/monitor_handler.go b/chainstate/monitor_handler.go index 73ab52fc..2ddde030 100644 --- a/chainstate/monitor_handler.go +++ b/chainstate/monitor_handler.go @@ -4,7 +4,6 @@ import ( "context" "github.com/libsv/go-bc" - "github.com/mrz1836/go-whatsonchain" ) diff --git a/chainstate/processor_test.go b/chainstate/processor_test.go index 93b574a0..76acc340 100644 --- a/chainstate/processor_test.go +++ b/chainstate/processor_test.go @@ -4,13 +4,12 @@ import ( "fmt" "testing" - "github.com/stretchr/testify/require" - "github.com/BuxOrg/bux/utils" "github.com/bitcoinschema/go-bitcoin/v2" "github.com/centrifugal/centrifuge-go" "github.com/libsv/go-bt/v2/bscript" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" boom "github.com/tylertreat/BoomFilters" ) diff --git a/chainstate/types.go b/chainstate/types.go index 3f4fbf45..234658ef 100644 --- a/chainstate/types.go +++ b/chainstate/types.go @@ -1,9 +1,19 @@ package chainstate +// TransactionType tx types type TransactionType string +// Metanet type const Metanet TransactionType = "metanet" + +// PubKeyHash type const PubKeyHash TransactionType = "pubkeyhash" + +// PlanariaB type const PlanariaB TransactionType = "planaria-b" + +// PlanariaD type const PlanariaD TransactionType = "planaria-d" + +// RareCandyFrogCartel type const RareCandyFrogCartel TransactionType = "rarecandy-frogcartel" diff --git a/client.go b/client.go index f1ae80f9..c7e6c461 100644 --- a/client.go +++ b/client.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/tonicpow/go-paymail" - "github.com/BuxOrg/bux/cachestore" "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/datastore" @@ -14,6 +12,7 @@ import ( "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/utils" "github.com/newrelic/go-agent/v3/newrelic" + "github.com/tonicpow/go-paymail" "github.com/tonicpow/go-paymail/server" ) @@ -172,7 +171,7 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error) // Load the blockchain monitor if client.options.chainstate.Monitor() != nil { - if err := client.loadMonitor(ctx); err != nil { + if err = client.loadMonitor(ctx); err != nil { return nil, err } } diff --git a/client_internal.go b/client_internal.go index 2ca26eb3..a0f99cb9 100644 --- a/client_internal.go +++ b/client_internal.go @@ -109,6 +109,7 @@ func (c *Client) loadMonitor(ctx context.Context) (err error) { func (c *Client) runModelMigrations(models ...interface{}) (err error) { d := c.Datastore() for _, model := range models { + model.(ModelInterface).SetOptions(WithClient(c)) if err = model.(ModelInterface).Migrate(d); err != nil { return } diff --git a/client_options.go b/client_options.go index e144dc8e..07539a34 100644 --- a/client_options.go +++ b/client_options.go @@ -250,6 +250,13 @@ func WithIUCDisabled() ClientOps { } } +// WithImportBlockHeaders will import block headers on startup +func WithImportBlockHeaders(blockHeadersZipFileURL string) ClientOps { + return func(c *clientOptions) { + c.dataStore.options = append(c.dataStore.options, datastore.WithImportBlockHeaders(blockHeadersZipFileURL)) + } +} + // WithLogger will set the custom logger interface func WithLogger(customLogger logger.Interface) ClientOps { return func(c *clientOptions) { diff --git a/datastore/client.go b/datastore/client.go index 1c44968f..6a71701a 100644 --- a/datastore/client.go +++ b/datastore/client.go @@ -20,20 +20,21 @@ type ( // clientOptions holds all the configuration for the client clientOptions struct { - autoMigrate bool // Setting for Auto Migration of SQL tables - db *gorm.DB // Database connection for Read-Only requests (can be same as Write) - debug bool // Setting for global debugging - engine Engine // Datastore engine (MySQL, PostgreSQL, SQLite) - logger logger.Interface // Custom logger interface (from BUX) - loggerDB glogger.Interface // Custom logger interface (for GORM) - migratedModels []string // List of models (types) that have been migrated - migrateModels []interface{} // Models for migrations - mongoDB *mongo.Database // Database connection for a MongoDB datastore - mongoDBConfig *MongoDBConfig // Configuration for a MongoDB datastore - newRelicEnabled bool // If NewRelic is enabled (parent application) - sqlConfigs []*SQLConfig // Configuration for a MySQL or PostgreSQL datastore - sqLite *SQLiteConfig // Configuration for a SQLite datastore - tablePrefix string // Model table prefix + autoMigrate bool // Setting for Auto Migration of SQL tables + db *gorm.DB // Database connection for Read-Only requests (can be same as Write) + debug bool // Setting for global debugging + engine Engine // Datastore engine (MySQL, PostgreSQL, SQLite) + importBlockHeaders string // The URL of the block headers zip file to import old block headers on startup. if block 0 is found in the DB, block headers will mpt be downloaded + logger logger.Interface // Custom logger interface (from BUX) + loggerDB glogger.Interface // Custom logger interface (for GORM) + migratedModels []string // List of models (types) that have been migrated + migrateModels []interface{} // Models for migrations + mongoDB *mongo.Database // Database connection for a MongoDB datastore + mongoDBConfig *MongoDBConfig // Configuration for a MongoDB datastore + newRelicEnabled bool // If NewRelic is enabled (parent application) + sqlConfigs []*SQLConfig // Configuration for a MySQL or PostgreSQL datastore + sqLite *SQLiteConfig // Configuration for a SQLite datastore + tablePrefix string // Model table prefix } ) diff --git a/datastore/client_options.go b/datastore/client_options.go index 89aca7ee..10268e80 100644 --- a/datastore/client_options.go +++ b/datastore/client_options.go @@ -204,3 +204,10 @@ func WithLogger(customLogger logger.Interface) ClientOps { } } } + +// WithImportBlockHeaders will set the import block headers option +func WithImportBlockHeaders(blockHeadersZipFileURL string) ClientOps { + return func(c *clientOptions) { + c.importBlockHeaders = blockHeadersZipFileURL + } +} diff --git a/datastore/interface.go b/datastore/interface.go index 9f4d1a6c..fe729d3e 100644 --- a/datastore/interface.go +++ b/datastore/interface.go @@ -4,12 +4,14 @@ import ( "context" "time" + "go.mongodb.org/mongo-driver/mongo" "gorm.io/gorm" ) // StorageService is the storage related methods type StorageService interface { AutoMigrateDatabase(ctx context.Context, models ...interface{}) error + CreateInBatches(ctx context.Context, models interface{}, batchSize int) error Execute(query string) *gorm.DB GetModel(ctx context.Context, model interface{}, conditions map[string]interface{}, timeout time.Duration) error GetModels(ctx context.Context, models interface{}, conditions map[string]interface{}, pageSize, page int, @@ -33,7 +35,10 @@ type ClientInterface interface { Engine() Engine GetDatabaseName() string GetTableName(modelName string) string + GetMongoCollection(collectionName string) *mongo.Collection + GetMongoCollectionByTableName(tableName string) *mongo.Collection IsAutoMigrate() bool IsDebug() bool IsNewRelicEnabled() bool + ImportBlockHeadersFromURL() string } diff --git a/datastore/migrate.go b/datastore/migrate.go index 3980c758..421b44c2 100644 --- a/datastore/migrate.go +++ b/datastore/migrate.go @@ -60,6 +60,11 @@ func (c *Client) IsAutoMigrate() bool { return c.options.autoMigrate } +// ImportBlockHeadersFromURL will the URL where to import block headers from +func (c *Client) ImportBlockHeadersFromURL() string { + return c.options.importBlockHeaders +} + // autoMigrateMongoDatabase will start a new database for Mongo func autoMigrateMongoDatabase(ctx context.Context, _ Engine, options *clientOptions, _ ...interface{}) error { diff --git a/datastore/models.go b/datastore/models.go index 5ab02b51..beaba73a 100644 --- a/datastore/models.go +++ b/datastore/models.go @@ -127,6 +127,20 @@ func (c *Client) IncrementModel( return } +// CreateInBatches create all the models given in batches +func (c *Client) CreateInBatches( + ctx context.Context, + models interface{}, + batchSize int, +) error { + if c.Engine() == MongoDB { + return c.CreateInBatchesMongo(ctx, models, batchSize) + } + + tx := c.options.db.CreateInBatches(models, batchSize) + return tx.Error +} + // convertToInt64 will convert an interface to an int64 func convertToInt64(i interface{}) int64 { switch v := i.(type) { diff --git a/datastore/mongodb.go b/datastore/mongodb.go index b22ff60b..0f76b83a 100644 --- a/datastore/mongodb.go +++ b/datastore/mongodb.go @@ -116,6 +116,53 @@ func (c *Client) incrementWithMongo( return } +// CreateInBatchesMongo insert multiple models vai bulk.Write +func (c *Client) CreateInBatchesMongo( + ctx context.Context, + models interface{}, + batchSize int, +) error { + + collectionName := utils.GetModelTableName(models) + if collectionName == nil { + return ErrUnknownCollection + } + + mongoModels := make([]mongo.WriteModel, 0) + collection := c.GetMongoCollection(*collectionName) + bulkOptions := options.BulkWrite().SetOrdered(true) + count := 0 + + switch reflect.TypeOf(models).Kind() { //nolint:exhaustive // we only get slices + case reflect.Slice: + s := reflect.ValueOf(models) + for i := 0; i < s.Len(); i++ { + m := mongo.NewInsertOneModel() + m.SetDocument(s.Index(i).Interface()) + mongoModels = append(mongoModels, m) + count++ + + if count%batchSize == 0 { + _, err := collection.BulkWrite(ctx, mongoModels, bulkOptions) + if err != nil { + return err + } + // reset the bulk + mongoModels = make([]mongo.WriteModel, 0) + } + } + } + + if count%batchSize != 0 { + _, err := collection.BulkWrite(ctx, mongoModels, bulkOptions) + if err != nil { + return err + } + } + + return nil +} + // getWithMongo will get given struct(s) from MongoDB func (c *Client) getWithMongo( ctx context.Context, @@ -207,6 +254,22 @@ func (c *Client) getWithMongo( return nil } +// GetMongoCollection will get the mongo collection for the given tableName +func (c *Client) GetMongoCollection( + collectionName string, +) *mongo.Collection { + return c.options.mongoDB.Collection( + setPrefix(c.options.mongoDBConfig.TablePrefix, collectionName), + ) +} + +// GetMongoCollectionByTableName will get the mongo collection for the given tableName +func (c *Client) GetMongoCollectionByTableName( + tableName string, +) *mongo.Collection { + return c.options.mongoDB.Collection(tableName) +} + func getFieldNames(fieldResult interface{}) []string { if fieldResult == nil { return []string{} @@ -436,8 +499,17 @@ func openMongoDatabase(ctx context.Context, config *MongoDBConfig) (*mongo.Datab // getMongoIndexes will get indexes from mongo func getMongoIndexes() map[string][]mongo.IndexModel { - // todo: move these to bux out of this package return map[string][]mongo.IndexModel{ + "block_headers": { + mongo.IndexModel{Keys: bsonx.Doc{{ + Key: "height", + Value: bsonx.Int32(1), + }}}, + mongo.IndexModel{Keys: bsonx.Doc{{ + Key: "synced", + Value: bsonx.Int32(1), + }}}, + }, "destinations": { mongo.IndexModel{Keys: bsonx.Doc{{ Key: "address", diff --git a/interface.go b/interface.go index 3fc7eace..56e43e08 100644 --- a/interface.go +++ b/interface.go @@ -5,8 +5,6 @@ import ( "net/http" "time" - "github.com/libsv/go-bc" - "github.com/BuxOrg/bux/cachestore" "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/datastore" @@ -14,6 +12,7 @@ import ( "github.com/BuxOrg/bux/notifications" "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/utils" + "github.com/libsv/go-bc" "github.com/tonicpow/go-paymail" ) diff --git a/model_blockheaders.go b/model_blockheaders.go index e6b2843f..da00459c 100644 --- a/model_blockheaders.go +++ b/model_blockheaders.go @@ -2,9 +2,18 @@ package bux import ( "context" + "database/sql" + "encoding/csv" "encoding/hex" + "errors" + "io" + "io/ioutil" + "os" + "strconv" + "time" "github.com/BuxOrg/bux/datastore" + "github.com/BuxOrg/bux/utils" "github.com/libsv/go-bc" ) @@ -16,14 +25,15 @@ type BlockHeader struct { Model `bson:",inline"` // Model specific fields - Hash string `json:"hash" toml:"hash" yaml:"hash" gorm:"<-:create;type:char(64);primaryKey;comment:This is the block header" bson:"hash"` - Height uint32 `json:"height" toml:"height" yaml:"height" gorm:"<-create;type:int;uniqueIndex;comment:This is the block height" bson:"height,omitempty"` - Time uint32 `json:"time" toml:"time" yaml:"time" gorm:"<-create;type:int;index;comment:This is the time the block was mined" bson:"time,omitempty"` - Nonce uint32 `json:"nonce" toml:"nonce" yaml:"nonce" gorm:"<-create;type:int;comment:This is the nonce" bson:"nonce,omitempty"` - Version uint32 `json:"version" toml:"version" yaml:"version" gorm:"<-create;type:int;comment:This is the version" bson:"version,omitempty"` - HashPreviousBlock string `json:"hash_previous_block" toml:"hash_previous_block" yaml:"hash_previous_block" gorm:"<-:create;type:text;index;comment:This is the hash of the previous block" bson:"hash_previous_block"` - HashMerkleRoot string `json:"hash_merkle_root" toml:"hash_merkle_root" yaml:"hash_merkle_root" gorm:"<-;type:text;index;comment:This is the hash of the merkle root" bson:"hash_merkle_root"` - Bits string `json:"bits" toml:"bits" yaml:"bits" gorm:"<-:create;type:text;comment:This is the block difficulty" bson:"bits"` + ID string `json:"id" toml:"id" yaml:"id" gorm:"<-:create;type:char(64);primaryKey;comment:This is the block hash" bson:"_id"` + Height uint32 `json:"height" toml:"height" yaml:"height" gorm:"<-create;type:int unsigned;uniqueIndex;comment:This is the block height" bson:"height"` + Time uint32 `json:"time" toml:"time" yaml:"time" gorm:"<-create;type:int unsigned;index;comment:This is the time the block was mined" bson:"time"` + Nonce uint32 `json:"nonce" toml:"nonce" yaml:"nonce" gorm:"<-create;type:int unsigned;comment:This is the nonce" bson:"nonce"` + Version uint32 `json:"version" toml:"version" yaml:"version" gorm:"<-create;type:int unsigned;comment:This is the version" bson:"version"` + HashPreviousBlock string `json:"hash_previous_block" toml:"hash_previous_block" yaml:"hash_previous_block" gorm:"<-:create;type:char(64);index;comment:This is the hash of the previous block" bson:"hash_previous_block"` + HashMerkleRoot string `json:"hash_merkle_root" toml:"hash_merkle_root" yaml:"hash_merkle_root" gorm:"<-;type:char(64);index;comment:This is the hash of the merkle root" bson:"hash_merkle_root"` + Bits string `json:"bits" toml:"bits" yaml:"bits" gorm:"<-:create;type:int unsigned;comment:This is the block difficulty" bson:"bits"` + Synced utils.NullTime `json:"synced" toml:"synced" yaml:"synced" gorm:"type:timestamp;index;comment:This is when the block was last synced to the bux server" bson:"synced,omitempty"` } // newBlockHeader will start a new transaction model @@ -31,7 +41,7 @@ func newBlockHeader(hash string, blockHeader bc.BlockHeader, opts ...ModelOps) ( // Create a new model bh = &BlockHeader{ - Hash: hash, + ID: hash, Model: *NewBaseModel(ModelBlockHeader, opts...), } @@ -57,7 +67,7 @@ func (m *BlockHeader) Save(ctx context.Context) (err error) { // GetHash will get the hash of the block header func (m *BlockHeader) GetHash() string { - return m.Hash + return m.ID } // setHeaderInfo will set the block header info from a bc.BlockHeader @@ -72,7 +82,27 @@ func (m *BlockHeader) setHeaderInfo(bh bc.BlockHeader) { // GetID will return the id of the field (hash) func (m *BlockHeader) GetID() string { - return m.Hash + return m.ID +} + +// getBlockHeaderByHeight will get the block header given by height +func getBlockHeaderByHeight(ctx context.Context, height uint32, opts ...ModelOps) (*BlockHeader, error) { + + // Construct an empty model + blockHeader := &BlockHeader{ + Height: height, + Model: *NewBaseModel(ModelDestination, opts...), + } + + // Get the record + if err := Get(ctx, blockHeader, nil, true, defaultDatabaseReadTimeout); err != nil { + if errors.Is(err, datastore.ErrNoResults) { + return nil, nil + } + return nil, err + } + + return blockHeader, nil } // BeforeCreating will fire before the model is being inserted into the Datastore @@ -81,7 +111,7 @@ func (m *BlockHeader) BeforeCreating(_ context.Context) error { m.DebugLog("starting: " + m.Name() + " BeforeCreating hook...") // Test for required field(s) - if len(m.Hash) == 0 { + if len(m.ID) == 0 { return ErrMissingFieldHash } @@ -104,5 +134,182 @@ func (m *BlockHeader) Display() interface{} { // Migrate model specific migration on startup func (m *BlockHeader) Migrate(client datastore.ClientInterface) error { - return client.IndexMetadata(client.GetTableName(tableBlockHeaders), metadataField) + // import all previous block headers from file + blockHeadersFile := client.ImportBlockHeadersFromURL() + if blockHeadersFile != "" { + go func() { + ctx := context.Background() + // check whether we have block header 0, then we do not import again + blockHeader0, err := getBlockHeaderByHeight(ctx, 0, m.Client().DefaultModelOptions()...) + if err != nil { + m.Client().Logger().Error(ctx, err.Error()) + } else { + if blockHeader0 == nil { + // import block headers in the background + m.Client().Logger().Info(ctx, "Importing block headers into database") + err = m.importBlockHeaders(ctx, client, blockHeadersFile) + if err != nil { + m.Client().Logger().Error(ctx, err.Error()) + } else { + m.Client().Logger().Info(ctx, "Successfully imported all block headers into database") + } + } + } + }() + } + + return nil +} + +func (m *BlockHeader) importBlockHeaders(ctx context.Context, client datastore.ClientInterface, blockHeadersFile string) error { + + file, err := ioutil.TempFile("", "blocks_bux.tsv") + if err != nil { + return err + } + defer func() { + err = os.Remove(file.Name()) + if err != nil { + m.Client().Logger().Error(ctx, err.Error()) + } + }() + + err = utils.DownloadAndUnzipFile(ctx, file, blockHeadersFile) + if err != nil { + return err + } + + blockFile := file.Name() + + /* local file import + var err error + pwd, _ := os.Getwd() + blockFile := pwd + "/blocks/blocks_bux.tsv" + */ + + batchSize := 1000 + if m.Client().Datastore().Engine() == datastore.MongoDB { + batchSize = 10000 + } + models := make([]*BlockHeader, 0) + count := 0 + readModel := func(model *BlockHeader) error { + count++ + + models = append(models, model) + + if count%batchSize == 0 { + // insert in batches of batchSize + err = client.CreateInBatches(ctx, models, batchSize) + if err != nil { + return err + } + // reset models + models = make([]*BlockHeader, 0) + } + return nil + } + + // accumulate the models into a slice + err = m.importCSVFile(ctx, blockFile, readModel) + if errors.Is(err, io.EOF) { + if count%batchSize != 0 { + // remaining batch + return client.CreateInBatches(ctx, models, batchSize) + } + return nil + } else if err != nil { + return err + } + + return nil +} + +func (m *BlockHeader) importCSVFile(ctx context.Context, blockFile string, readModel func(model *BlockHeader) error) error { + CSVFile, err := os.Open(blockFile) //nolint:gosec // file only added by administrator via config + if err != nil { + return err + } + defer func() { + err = CSVFile.Close() + if err != nil { + m.Client().Logger().Error(ctx, err.Error()) + } + }() + + reader := csv.NewReader(CSVFile) + reader.Comma = '\t' // It's a tab-delimited file + reader.LazyQuotes = true // Some fields are like \t"F" ST.\t + reader.FieldsPerRecord = 0 // -1 is variable #, 0 is [0]th line's # + reader.TrimLeadingSpace = false // Keep the fields' whitespace how it is + + // read first line - HEADER + _, err = reader.Read() + if err != nil { + return err + } + + for { + var row []string + row, err = reader.Read() + if err != nil { + return err + } + + var parsedInt uint64 + + parsedInt, err = strconv.ParseUint(row[1], 10, 32) + if err != nil { + return err + } + height := uint32(parsedInt) + + parsedInt, err = strconv.ParseUint(row[3], 10, 32) + if err != nil { + return err + } + nonce := uint32(parsedInt) + + parsedInt, err = strconv.ParseUint(row[4], 10, 32) + if err != nil { + return err + } + ver := uint32(parsedInt) + parsedInt, err = strconv.ParseUint(row[7], 10, 32) + if err != nil { + return err + } + bits := parsedInt + + var timeField time.Time + timeField, err = time.Parse("2006-01-02 15:04:05", row[2]) + if err != nil { + return err + } + + var syncedTime time.Time + syncedTime, err = time.Parse("2006-01-02 15:04:05", row[8]) + if err != nil { + return err + } + + model := &BlockHeader{ + ID: row[0], + Height: height, + Time: uint32(timeField.Unix()), + Nonce: nonce, + Version: ver, + HashPreviousBlock: row[5], + HashMerkleRoot: row[6], + Bits: strconv.FormatUint(bits, 16), + Synced: utils.NullTime{NullTime: sql.NullTime{Valid: true, Time: syncedTime}}, + } + model.Model.CreatedAt = time.Now() + + // call the readModel callback function to add the model to the database + err = readModel(model) + if err != nil { + return err + } + } } diff --git a/model_utxos.go b/model_utxos.go index 4e4dfb7a..cbbbfc6d 100644 --- a/model_utxos.go +++ b/model_utxos.go @@ -31,7 +31,7 @@ type Utxo struct { XpubID string `json:"xpub_id" toml:"xpub_id" yaml:"xpub_id" gorm:"<-:create;type:char(64);index;comment:This is the related xPub" bson:"xpub_id"` Satoshis uint64 `json:"satoshis" toml:"satoshis" yaml:"satoshis" gorm:"<-:create;type:uint;comment:This is the amount of satoshis in the output" bson:"satoshis"` ScriptPubKey string `json:"script_pub_key" toml:"script_pub_key" yaml:"script_pub_key" gorm:"<-:create;type:text;comment:This is the script pub key" bson:"script_pub_key"` - Type string `json:"type" toml:"type" yaml:"type" gorm:"<-:create;type:text;comment:Type of output" bson:"type"` + Type string `json:"type" toml:"type" yaml:"type" gorm:"<-:create;type:varchar(32);comment:Type of output" bson:"type"` DraftID utils.NullString `json:"draft_id" toml:"draft_id" yaml:"draft_id" gorm:"<-;type:varchar(64);index;comment:Related draft id for reservations" bson:"draft_id,omitempty"` ReservedAt utils.NullTime `json:"reserved_at" toml:"reserved_at" yaml:"reserved_at" gorm:"<-;comment:When it was reserved" bson:"reserved_at,omitempty"` SpendingTxID utils.NullString `json:"spending_tx_id,omitempty" toml:"spending_tx_id" yaml:"spending_tx_id" gorm:"<-;type:char(64);index;comment:This is tx ID of the spend" bson:"spending_tx_id,omitempty"` @@ -402,6 +402,17 @@ func (m *Utxo) migratePostgreSQL(client datastore.ClientInterface, tableName str // migrateMySQL is specific migration SQL for MySQL func (m *Utxo) migrateMySQL(client datastore.ClientInterface, tableName string) error { - tx := client.Execute("CREATE INDEX idx_utxo_reserved ON `" + tableName + "` (xpub_id,type,draft_id,spending_tx_id)") - return tx.Error + idxName := "idx_" + tableName + "_reserved" + idxExists, err := client.IndexExists(tableName, idxName) + if err != nil { + return err + } + if !idxExists { + tx := client.Execute("CREATE INDEX `" + idxName + "` ON `" + tableName + "` (xpub_id,type,draft_id,spending_tx_id)") + if tx.Error != nil { + return tx.Error + } + } + + return nil } diff --git a/models_test.go b/models_test.go index c9ded932..261045c3 100644 --- a/models_test.go +++ b/models_test.go @@ -122,7 +122,7 @@ func TestModel_GetModelTableName(t *testing.T) { }) } -func (ts *EmbeddedDBTestSuite) createXpubModels(tc *TestingClient, t *testing.T, number int) *TestingClient { +func (ts *EmbeddedDBTestSuite) createXpubModels(tc *TestingClient, t *testing.T, number int) { for i := 0; i < number; i++ { _, xPublicKey, err := bitcoin.GenerateHDKeyPair(bitcoin.SecureSeedLength) require.NoError(t, err) @@ -133,8 +133,6 @@ func (ts *EmbeddedDBTestSuite) createXpubModels(tc *TestingClient, t *testing.T, err = xPub.Save(tc.ctx) require.NoError(t, err) } - - return tc } type xPubFieldsTest struct { diff --git a/monitor_event_handler.go b/monitor_event_handler.go index 84ca734d..dbf8cbed 100644 --- a/monitor_event_handler.go +++ b/monitor_event_handler.go @@ -6,11 +6,10 @@ import ( "fmt" "runtime" - "github.com/libsv/go-bc" - "github.com/BuxOrg/bux/chainstate" "github.com/centrifugal/centrifuge-go" "github.com/korovkin/limiter" + "github.com/libsv/go-bc" "github.com/mrz1836/go-whatsonchain" ) @@ -159,6 +158,7 @@ func (h *MonitorEventHandler) processMempoolPublish(_ *centrifuge.Client, e cent if h.monitor.SaveDestinations() { // Process transaction and save outputs + fmt.Printf("Should save the destination here...\n") } if tx == "" { @@ -216,6 +216,7 @@ func (h *MonitorEventHandler) onServerPublishLinear(c *centrifuge.Client, e cent if h.monitor.SaveDestinations() { // Process transaction and save outputs + fmt.Printf("Should save the destination here...\n") } if tx == "" { diff --git a/utils/download.go b/utils/download.go new file mode 100644 index 00000000..eed2d4f6 --- /dev/null +++ b/utils/download.go @@ -0,0 +1,65 @@ +package utils + +import ( + "archive/zip" + "context" + "io" + "io/ioutil" + "net/http" + "os" + "path/filepath" +) + +// DownloadAndUnzipFile download the zip file from the URL and put it's content in the file +func DownloadAndUnzipFile(ctx context.Context, file *os.File, URL string) error { + + client := http.Client{} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil) + if err != nil { + return err + } + + var resp *http.Response + if resp, err = client.Do(req); err != nil { + return err + } + defer func() { + _ = resp.Body.Close() + }() + + zipFileName := filepath.Base(file.Name()) + ".zip" + + var zipFile *os.File + zipFile, err = ioutil.TempFile("", zipFileName) + if err != nil { + return err + } + defer func() { + _ = os.Remove(zipFile.Name()) + }() + + // Write the body to file + _, err = io.Copy(zipFile, resp.Body) + if err != nil { + return err + } + + var reader *zip.ReadCloser + reader, err = zip.OpenReader(zipFile.Name()) + if err != nil { + return err + } + defer func() { + _ = reader.Close() + }() + + if len(reader.File) == 1 { + in, _ := reader.File[0].Open() + defer func() { + _ = in.Close() + }() + _, err = io.Copy(file, in) + } + + return err +}