diff --git a/tomox/batchdb.go b/tomox/batchdb.go index 661174119e4b..e7cceed674e9 100644 --- a/tomox/batchdb.go +++ b/tomox/batchdb.go @@ -7,11 +7,11 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" ) const ( defaultCacheLimit = 1024 - defaultMaxPending = 1024 ) type BatchItem struct { @@ -20,7 +20,6 @@ type BatchItem struct { type BatchDatabase struct { db *ethdb.LDBDatabase - itemCacheLimit int emptyKey []byte cacheItems *lru.Cache // Cache for reading dryRunCache *lru.Cache @@ -44,12 +43,11 @@ func NewBatchDatabaseWithEncode(datadir string, cacheLimit int) *BatchDatabase { itemCacheLimit = cacheLimit } - cacheItems, _ := lru.New(defaultCacheLimit) - dryRunCache, _ := lru.New(defaultCacheLimit) + cacheItems, _ := lru.New(itemCacheLimit) + dryRunCache, _ := lru.New(itemCacheLimit) batchDB := &BatchDatabase{ db: db, - itemCacheLimit: itemCacheLimit, cacheItems: cacheItems, emptyKey: EmptyKey(), // pre alloc for comparison dryRunCache: dryRunCache, @@ -140,12 +138,12 @@ func (db *BatchDatabase) Put(key []byte, val interface{}, dryrun bool) error { return nil } + log.Debug("Debug DB put to cacheItems", "cacheKey", cacheKey, "val", val) db.cacheItems.Add(cacheKey, val) value, err := EncodeBytesItem(val) if err != nil { return err } - log.Debug("Debug DB put", "cacheKey", cacheKey, "val", val) return db.db.Put(key, value) } @@ -154,6 +152,7 @@ func (db *BatchDatabase) Delete(key []byte, dryrun bool) error { // for better performance, we can mark a Deleted flag, to do batch delete cacheKey := db.getCacheKey(key) + //mark it to nil in dryrun cache if dryrun { log.Debug("Debug DB delete from dry-run cache", "cacheKey", cacheKey) db.dryRunCache.Add(cacheKey, nil) @@ -176,28 +175,36 @@ func (db *BatchDatabase) SaveDryRunResult() error { for _, cacheKey := range db.dryRunCache.Keys() { key, err := hex.DecodeString(cacheKey.(string)) if err != nil { - log.Error("Can't save dry-run result", "err", err) + log.Error("Can't save dry-run result (hex.DecodeString)", "err", err) return err } val, ok := db.dryRunCache.Get(cacheKey) if !ok { - continue + err := errors.New("can't get item from dryrun cache") + log.Error("Can't save dry-run result (db.dryRunCache.Get)", "err", err) + return err } if val == nil { - db.db.Delete(key) + if err := db.db.Delete(key); err != nil { + log.Error("Can't save dry-run result (db.db.Delete)", "err", err) + return err + } continue } value, err := EncodeBytesItem(val) if err != nil { - log.Error("Can't save dry-run result", "err", err) + log.Error("Can't save dry-run result (EncodeBytesItem)", "err", err) return err } - batch.Put(key, value) + if err := batch.Put(key, value); err != nil { + log.Error("Can't save dry-run result (batch.Put)", "err", err) + return err + } log.Debug("Saved dry-run result to DB", "cacheKey", hex.EncodeToString(key), "value", ToJSON(val)) } // purge cache data db.dryRunCache.Purge() return batch.Write() -} \ No newline at end of file +} diff --git a/tomox/mongodb.go b/tomox/mongodb.go index 7176e53c0346..44a45a18f057 100644 --- a/tomox/mongodb.go +++ b/tomox/mongodb.go @@ -4,12 +4,14 @@ import ( "bytes" "encoding/hex" "time" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/tomochain/tomox-sdk/types" + "github.com/hashicorp/golang-lru" ) type MongoItem struct { @@ -25,12 +27,12 @@ type MongoDatabase struct { Session *mgo.Session dbName string emptyKey []byte - itemMaxPending int - pendingItems map[string]*MongoItem + dryRunCache *lru.Cache + cacheItems *lru.Cache // Cache for reading } // InitSession initializes a new session with mongodb -func NewMongoDatabase(session *mgo.Session, mongoURL string) (*MongoDatabase, error) { +func NewMongoDatabase(session *mgo.Session, mongoURL string, cacheLimit int) (*MongoDatabase, error) { dbName := "tomodex" mongoURL = "mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0" if session == nil { @@ -42,12 +44,18 @@ func NewMongoDatabase(session *mgo.Session, mongoURL string) (*MongoDatabase, er session = ns } + itemCacheLimit := defaultCacheLimit + if cacheLimit > 0 { + itemCacheLimit = cacheLimit + } + cacheItems, _ := lru.New(itemCacheLimit) + dryRunCache, _ := lru.New(itemCacheLimit) db := &MongoDatabase{ Session: session, dbName: dbName, - itemMaxPending: defaultMaxPending, - pendingItems: make(map[string]*MongoItem), + cacheItems: cacheItems, + dryRunCache: dryRunCache, } return db, nil @@ -61,15 +69,23 @@ func (db *MongoDatabase) getCacheKey(key []byte) string { return hex.EncodeToString(key) } -func (db *MongoDatabase) Has(key []byte) (bool, error) { +func (db *MongoDatabase) Has(key []byte, dryrun bool) (bool, error) { if db.IsEmptyKey(key) { return false, nil } cacheKey := db.getCacheKey(key) - // has in pending and is not deleted - if _, ok := db.pendingItems[cacheKey]; ok { + if dryrun { + if val, ok := db.dryRunCache.Get(cacheKey); ok { + if val == nil { + return false, nil + } + return true, nil + } + } + + if db.cacheItems.Contains(cacheKey) { return true, nil } @@ -103,7 +119,7 @@ func (db *MongoDatabase) Has(key []byte) (bool, error) { return false, nil } -func (db *MongoDatabase) Get(key []byte, val interface{}) (interface{}, error) { +func (db *MongoDatabase) Get(key []byte, val interface{}, dryrun bool) (interface{}, error) { if db.IsEmptyKey(key) { return nil, nil @@ -111,83 +127,104 @@ func (db *MongoDatabase) Get(key []byte, val interface{}) (interface{}, error) { cacheKey := db.getCacheKey(key) - if pendingItem, ok := db.pendingItems[cacheKey]; ok { - // we get value from the pending item - return pendingItem.Value, nil + if dryrun { + if value, ok := db.dryRunCache.Get(cacheKey); ok { + log.Debug("Debug get from dry-run cache", "cacheKey", cacheKey, "val", value) + return value, nil + } } - log.Debug("Cache info (DB get)", "pending map", db.pendingItems, "cacheKey", cacheKey) - sc := db.Session.Copy() - defer sc.Close() + if cached, ok := db.cacheItems.Get(cacheKey); ok && !dryrun { + return cached, nil + } else { + sc := db.Session.Copy() + defer sc.Close() - query := bson.M{"key": cacheKey} + query := bson.M{"key": cacheKey} - switch val.(type) { - case *OrderItem: - var oi *OrderItem - err := sc.DB(db.dbName).C("orders").Find(query).One(&oi) - if err != nil { - return nil, err - } - return oi, nil - default: - var i *MongoItemRecord - err := sc.DB(db.dbName).C("items").Find(query).One(&i) - if err != nil { - return nil, err + switch val.(type) { + case *OrderItem: + var oi *OrderItem + err := sc.DB(db.dbName).C("orders").Find(query).One(&oi) + if err != nil { + return nil, err + } + db.cacheItems.Add(cacheKey, oi) + return oi, nil + default: + var i *MongoItemRecord + err := sc.DB(db.dbName).C("items").Find(query).One(&i) + if err != nil { + return nil, err + } + err = DecodeBytesItem(common.Hex2Bytes(i.Value), val) + if err != nil { + return nil, err + } + db.cacheItems.Add(cacheKey, val) + return val, nil } - err = DecodeBytesItem(common.Hex2Bytes(i.Value), val) - if err != nil { - return nil, err - } - return val, nil } } -func (db *MongoDatabase) Put(key []byte, val interface{}) error { +func (db *MongoDatabase) Put(key []byte, val interface{}, dryrun bool) error { cacheKey := db.getCacheKey(key) + if dryrun { + log.Debug("Debug put to dry-run cache", "cacheKey", cacheKey, "val", val) + db.dryRunCache.Add(cacheKey, val) + return nil + } + + log.Debug("Debug DB put", "cacheKey", cacheKey, "val", val) + db.cacheItems.Add(cacheKey, val) + val, err := EncodeBytesItem(val) + if err != nil { + return err + } + switch val.(type) { case *types.Trade: - err := db.CommitTrade(val.(*types.Trade)) // Put trade into "trades" collection - - if err != nil { + // Put trade into "trades" collection + if err := db.CommitTrade(val.(*types.Trade)); err != nil { log.Error(err.Error()) return err } case *OrderItem: - err := db.CommitOrder(cacheKey, val.(*OrderItem)) // Put order into "orders" collection - - if err != nil { + // Put order into "orders" collection + if err := db.CommitOrder(cacheKey, val.(*OrderItem)); err != nil { log.Error(err.Error()) return err } - db.pendingItems[cacheKey] = &MongoItem{Value: val} default: - db.pendingItems[cacheKey] = &MongoItem{Value: val} - log.Debug("Cache info (DB put - trades & orders committed directly)", "pending map", db.pendingItems, "cacheKey", cacheKey) - } - - // Put everything (includes order) into "items" collection - if len(db.pendingItems) >= db.itemMaxPending { - return db.Commit() + // put general item into "items" collection + if err := db.CommitItem(cacheKey, val); err != nil { + log.Error(err.Error()) + return err + } } return nil } -func (db *MongoDatabase) Delete(key []byte, force bool) error { +func (db *MongoDatabase) Delete(key []byte, dryrun bool) error { cacheKey := db.getCacheKey(key) - // Delete from object m.pendingItems - delete(db.pendingItems, cacheKey) + //mark it to nil in dryrun cache + if dryrun { + log.Debug("Debug DB delete from dry-run cache", "cacheKey", cacheKey) + db.dryRunCache.Add(cacheKey, nil) + return nil + } + + log.Debug("Debug DB delete ", "cacheKey", cacheKey) + db.cacheItems.Remove(cacheKey) sc := db.Session.Copy() defer sc.Close() query := bson.M{"key": cacheKey} - found, err := db.Has(key) - + found, err := db.Has(key, dryrun) if err != nil { return err } @@ -209,38 +246,44 @@ func (db *MongoDatabase) Delete(key []byte, force bool) error { return nil } -func (db *MongoDatabase) Commit() error { +func (db *MongoDatabase) InitDryRunMode() { + log.Debug("Start dry-run mode, clear old data") + db.dryRunCache.Purge() +} + +//TODO: should use batch commit to avoid data inconsistency +func (db *MongoDatabase) SaveDryRunResult() error { sc := db.Session.Copy() defer sc.Close() - for cacheKey, item := range db.pendingItems { - valByte, err := EncodeBytesItem(item.Value) - + for _, cacheKey := range db.dryRunCache.Keys() { + key, err := hex.DecodeString(cacheKey.(string)) if err != nil { - log.Error(err.Error()) - continue + log.Error("Can't save dry-run result (hex.DecodeString)", "err", err) + return err } - - r := &MongoItemRecord{ - Key: cacheKey, - Value: common.Bytes2Hex(valByte), + val, ok := db.dryRunCache.Get(cacheKey) + if !ok { + err := errors.New("can't get item from dryrun cache") + log.Error("Can't save dry-run result (db.dryRunCache.Get)", "err", err) + return err } - - query := bson.M{"key": cacheKey} - - _, err = sc.DB(db.dbName).C("items").Upsert(query, r) - - if err != nil { + if val == nil { + //TODO: don't remove order item in mongo + if err := db.Delete(key,false); err != nil { + log.Error("Can't save dry-run result (db.Delete)", "err", err) + return err + } + continue + } + if err := db.Put(key, val, false); err != nil { + log.Error("Can't save dry-run result (db.Put)", "err", err) return err } - - log.Warn("Save", "cacheKey", cacheKey, "value", ToJSON(item.Value)) } - - // Reset the object db.pendingItems - db.pendingItems = make(map[string]*MongoItem) - + // purge cache data + db.dryRunCache.Purge() return nil } @@ -288,3 +331,20 @@ func (db *MongoDatabase) CommitTrade(t *types.Trade) error { return nil } + +func (db *MongoDatabase) CommitItem(cacheKey string, val interface{}) error { + sc := db.Session.Copy() + defer sc.Close() + + r := &MongoItemRecord{ + Key: cacheKey, + Value: common.Bytes2Hex(val.([]byte)), + } + + query := bson.M{"key": cacheKey} + if _, err := sc.DB(db.dbName).C("items").Upsert(query, r); err != nil { + return err + } + log.Debug("Save", "cacheKey", cacheKey, "value", ToJSON(common.Bytes2Hex(val.([]byte)))) + return nil +} diff --git a/tomox/tomox.go b/tomox/tomox.go index a705f8dfb61c..9a5f328847a1 100644 --- a/tomox/tomox.go +++ b/tomox/tomox.go @@ -111,7 +111,7 @@ func NewLDBEngine(cfg *Config) *BatchDatabase { } func NewMongoDBEngine(cfg *Config) *MongoDatabase { - mongoDB, err := NewMongoDatabase(nil, cfg.ConnectionUrl) + mongoDB, err := NewMongoDatabase(nil, cfg.ConnectionUrl, 0) if err != nil { log.Error(err.Error()) @@ -139,8 +139,7 @@ func New(cfg *Config) *TomoX { tomoX.db = NewLDBEngine(cfg) tomoX.sdkNode = false case "mongodb": - // TODO: add dry-run mode for mongodb - //tomoX.db = NewMongoDBEngine(cfg) + tomoX.db = NewMongoDBEngine(cfg) tomoX.sdkNode = true default: log.Crit("wrong database engine, only accept either leveldb or mongodb")