From 2cd250c4dd67d45393c229fecde245e8ef45e669 Mon Sep 17 00:00:00 2001 From: vinllen Date: Sat, 24 Oct 2020 14:30:35 +0800 Subject: [PATCH] when "incr_sync.mongo_fetch_method == oplog", support upsert if target is sharding. #380 --- ChangeLog | 7 + scripts/run_ut_test.py | 18 +- src/mongoshake/executor/db_writer_bulk.go | 30 +++- src/mongoshake/executor/db_writer_single.go | 31 +++- src/mongoshake/executor/db_writer_test.go | 183 +++++++++++++++++++- src/mongoshake/oplog/changestram_event.go | 4 + src/mongoshake/oplog/oplog.go | 1 + src/mongoshake/unit_test_common/include.go | 5 +- 8 files changed, 248 insertions(+), 31 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3cab2a0b..756382a6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2020-10-xx Alibaba Cloud. + * version: 2.4.16 + * IMPROVE: ignore error when meets namespace not found error in delete + operation in single writer. #467 + * IMPROVE: when "incr_sync.mongo_fetch_method == oplog", support upsert + if target is sharding. #380 + 2020-10-14 Alibaba Cloud. * version: 2.4.15 * BUGFIX: in kafka tunnel, solve json marshal failed when meets NaN, Inf, diff --git a/scripts/run_ut_test.py b/scripts/run_ut_test.py index 2a63ca80..2f08dc65 100644 --- a/scripts/run_ut_test.py +++ b/scripts/run_ut_test.py @@ -12,26 +12,26 @@ run unit test in recursive """ def run_ut(cur_path): - print os.path.abspath(os.path.curdir), cur_path + print(os.path.abspath(os.path.curdir), cur_path) only_files = [f for f in listdir(".") if isfile(join(".", f))] only_dirs = [f for f in listdir(".") if isdir(join(".", f))] ut_files = [f for f in only_files if "_test.go" in f] - print only_files, only_dirs, ut_files + print(only_files, only_dirs, ut_files) if len(ut_files) != 0: # with ut file, need run ut test - print "----------- run ut test on dir[%s] -----------" % os.path.abspath(os.path.curdir) + print("----------- run ut test on dir[%s] -----------" % os.path.abspath(os.path.curdir)) ret = subprocess.call(["go", "test"]) # subprocess.check_output(["/bin/sh", "-c", "go", "test"]) - print "********************************** %s *************************************" % ("OK" if ret == 0 else "FAIL") + print("********************************** %s *************************************" % ("OK" if ret == 0 else "FAIL")) if ret != 0: - print "run failed" + print("run failed") exit(ret) for dir in only_dirs: - print "cd dir[%s]" % dir + print("cd dir[%s]" % dir) # dfs os.chdir(dir) @@ -44,10 +44,10 @@ def run_ut(cur_path): root_path = os.path.join("..", "src/mongoshake") os.chdir(root_path) go_path=os.path.abspath("../..") - print "GOPATH=%s" % go_path + print("GOPATH=%s" % go_path) #subprocess.call(["export GOPATH=%s" % go_path]) os.environ['GOPATH'] = go_path run_ut(".") - print "-----------------------------------" - print "all is well ^_^" + print("-----------------------------------") + print("all is well ^_^") diff --git a/src/mongoshake/executor/db_writer_bulk.go b/src/mongoshake/executor/db_writer_bulk.go index d33e2ea8..c5ead14a 100644 --- a/src/mongoshake/executor/db_writer_bulk.go +++ b/src/mongoshake/executor/db_writer_bulk.go @@ -58,14 +58,21 @@ func (bw *BulkWriter) doUpdateOnInsert(database, collection string, metadata bso oplogs []*OplogRecord, upsert bool) error { var update []interface{} for _, log := range oplogs { - // insert must have _id - // if id, exist := log.original.partialLog.Object["_id"]; exist { - if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil { - // newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef) - newObject := log.original.partialLog.Object - update = append(update, bson.M{"_id": id}, newObject) + // newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef) + newObject := log.original.partialLog.Object + if upsert && len(log.original.partialLog.DocumentKey) > 0 { + update = append(update, log.original.partialLog.DocumentKey, newObject) } else { - LOG.Warn("Insert on duplicated update _id look up failed. %v", log) + if upsert { + LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog) + } + // insert must have _id + // if id, exist := log.original.partialLog.Object["_id"]; exist { + if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil { + update = append(update, bson.M{"_id": id}, newObject) + } else { + LOG.Warn("Insert on duplicated update _id look up failed. %v", log) + } } LOG.Debug("writer: updateOnInsert %v", log.original.partialLog) @@ -115,7 +122,14 @@ func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.M, // delete(newObject, versionMark) //} log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark) - update = append(update, log.original.partialLog.Query, log.original.partialLog.Object) + if upsert && len(log.original.partialLog.DocumentKey) > 0 { + update = append(update, log.original.partialLog.DocumentKey, log.original.partialLog.Object) + } else { + if upsert { + LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog) + } + update = append(update, log.original.partialLog.Query, log.original.partialLog.Object) + } LOG.Debug("writer: update %v", log.original.partialLog.Object) } diff --git a/src/mongoshake/executor/db_writer_single.go b/src/mongoshake/executor/db_writer_single.go index 2f2eb29d..18758be6 100644 --- a/src/mongoshake/executor/db_writer_single.go +++ b/src/mongoshake/executor/db_writer_single.go @@ -64,14 +64,19 @@ func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata b } var updates []*pair for _, log := range oplogs { - // insert must have _id - // if id, exist := log.original.partialLog.Object["_id"]; exist { - if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil { - // newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef) - newObject := log.original.partialLog.Object - updates = append(updates, &pair{id: id, data: newObject}) + newObject := log.original.partialLog.Object + if upsert && len(log.original.partialLog.DocumentKey) > 0 { + updates = append(updates, &pair{id: log.original.partialLog.DocumentKey, data: newObject}) } else { - return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog) + if upsert { + LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog) + } + // insert must have _id + if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil { + updates = append(updates, &pair{id: id, data: newObject}) + } else { + return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog) + } } LOG.Debug("writer: updateOnInsert %v", log.original.partialLog) @@ -80,7 +85,7 @@ func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata b collectionHandle := sw.session.DB(database).C(collection) if upsert { for i, update := range updates { - if _, err := collectionHandle.UpsertId(update.id, update.data); err != nil { + if _, err := collectionHandle.Upsert(update.id, update.data); err != nil { // error can be ignored if IgnoreError(err, "u", utils.TimestampToInt64(oplogs[i].original.partialLog.Timestamp) <= sw.fullFinishTs) { continue @@ -118,7 +123,15 @@ func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.M, // delete(newObject, versionMark) //} log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark) - _, err := collectionHandle.Upsert(log.original.partialLog.Query, log.original.partialLog.Object) + var err error + if upsert && len(log.original.partialLog.DocumentKey) > 0 { + _, err = collectionHandle.Upsert(log.original.partialLog.DocumentKey, log.original.partialLog.Object) + } else { + if upsert { + LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog) + } + _, err = collectionHandle.Upsert(log.original.partialLog.Query, log.original.partialLog.Object) + } if err != nil { // error can be ignored if IgnoreError(err, "u", utils.TimestampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs) { diff --git a/src/mongoshake/executor/db_writer_test.go b/src/mongoshake/executor/db_writer_test.go index 1ba3886d..23399e4e 100644 --- a/src/mongoshake/executor/db_writer_test.go +++ b/src/mongoshake/executor/db_writer_test.go @@ -3,10 +3,12 @@ package executor import ( "testing" "fmt" + "strings" "mongoshake/common" "mongoshake/oplog" "mongoshake/unit_test_common" + "mongoshake/collector/configure" "github.com/stretchr/testify/assert" "github.com/vinllen/mgo/bson" @@ -14,9 +16,10 @@ import ( ) const ( - testMongoAddress = unit_test_common.TestUrl - testDb = "writer_test" - testCollection = "a" + testMongoAddress = unit_test_common.TestUrl + testMongoShardingAddress = unit_test_common.TestUrlSharding + testDb = "writer_test" + testCollection = "a" ) func mockOplogRecord(oId, oX int, o2Id int) *OplogRecord { @@ -262,6 +265,93 @@ func TestSingleWriter(t *testing.T) { assert.Equal(t, 20, result[1].(bson.M)["x"], "should be equal") assert.Equal(t, 30, result[2].(bson.M)["x"], "should be equal") } + + { + fmt.Printf("TestSingleWriter case %d.\n", nr) + nr++ + + conf.Options.IncrSyncExecutorUpsert = true + + conn, err := utils.NewMongoConn(testMongoShardingAddress, "primary", true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault) + assert.Equal(t, nil, err, "should be equal") + + writer := NewDbWriter(conn.Session, bson.M{}, false, 0) + + // drop database + err = conn.Session.DB(testDb).DropDatabase() + + // enable sharding + err = conn.Session.DB("admin").Run(bson.D{{"enablesharding", testDb}}, nil) + assert.Equal(t, nil, err, "should be equal") + + // shard collection + ns := fmt.Sprintf("%s.%s", testDb, testCollection) + err = conn.Session.DB("admin").Run(bson.D{ + {"shardCollection", ns}, + {"key", bson.M{"x": 1}}, + {"unique", true}, + }, nil) + assert.Equal(t, nil, err, "should be equal") + + // 1-2 + inserts := []*OplogRecord{ + mockOplogRecord(1, 1, -1), + mockOplogRecord(2, 2, -1), + } + + err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true) + assert.NotEqual(t, nil, err, "should be equal") + assert.Equal(t, true, strings.Contains(err.Error(), "Failed to target upsert by query"), "should be equal") + fmt.Println(err) + + inserts[0].original.partialLog.DocumentKey = bson.M{ + "_id": 1, + "x": 1, + } + inserts[1].original.partialLog.DocumentKey = bson.M{ + "_id": 2, + "x": 2, + } + err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true) + assert.Equal(t, nil, err, "should be equal") + + // query + result := make([]interface{}, 0) + err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result) + assert.Equal(t, nil, err, "should be equal") + assert.Equal(t, 2, len(result), "should be equal") + assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal") + assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal") + + fmt.Println("---------------") + // 2-3 + inserts2 := []*OplogRecord{ + mockOplogRecord(2, 20, -1), + mockOplogRecord(3, 3, -1), + } + inserts2[0].original.partialLog.DocumentKey = bson.M{ + "_id": 2, + "x": 2, + } + inserts2[1].original.partialLog.DocumentKey = bson.M{ + "_id": 3, + "x": 3, + } + + // see https://github.com/alibaba/MongoShake/issues/380 + err = writer.doInsert(testDb, testCollection, bson.M{}, inserts2, true) + assert.NotEqual(t, nil, err, "should be equal") + assert.Equal(t, true, strings.Contains(err.Error(), "Must run update to shard key"), "should be equal") + + // query + result = make([]interface{}, 0) + err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result) + assert.Equal(t, nil, err, "should be equal") + assert.Equal(t, 3, len(result), "should be equal") + assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal") + assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal") + assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal") + } } func TestBulkWriter(t *testing.T) { @@ -589,6 +679,93 @@ func TestBulkWriter(t *testing.T) { assert.Equal(t, 20, result[1].(bson.M)["x"], "should be equal") assert.Equal(t, 30, result[2].(bson.M)["x"], "should be equal") } + + { + fmt.Printf("TestBulkWriter case %d.\n", nr) + nr++ + + conf.Options.IncrSyncExecutorUpsert = true + + conn, err := utils.NewMongoConn(testMongoShardingAddress, "primary", true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault) + assert.Equal(t, nil, err, "should be equal") + + writer := NewDbWriter(conn.Session, bson.M{}, true, 0) + + // drop database + err = conn.Session.DB(testDb).DropDatabase() + + // enable sharding + err = conn.Session.DB("admin").Run(bson.D{{"enablesharding", testDb}}, nil) + assert.Equal(t, nil, err, "should be equal") + + // shard collection + ns := fmt.Sprintf("%s.%s", testDb, testCollection) + err = conn.Session.DB("admin").Run(bson.D{ + {"shardCollection", ns}, + {"key", bson.M{"x": 1}}, + {"unique", true}, + }, nil) + assert.Equal(t, nil, err, "should be equal") + + // 1-2 + inserts := []*OplogRecord{ + mockOplogRecord(1, 1, -1), + mockOplogRecord(2, 2, -1), + } + + err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true) + assert.NotEqual(t, nil, err, "should be equal") + assert.Equal(t, true, strings.Contains(err.Error(), "Failed to target upsert by query"), "should be equal") + fmt.Println(err) + + inserts[0].original.partialLog.DocumentKey = bson.M{ + "_id": 1, + "x": 1, + } + inserts[1].original.partialLog.DocumentKey = bson.M{ + "_id": 2, + "x": 2, + } + err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true) + assert.Equal(t, nil, err, "should be equal") + + // query + result := make([]interface{}, 0) + err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result) + assert.Equal(t, nil, err, "should be equal") + assert.Equal(t, 2, len(result), "should be equal") + assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal") + assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal") + + fmt.Println("---------------") + // 2-3 + inserts2 := []*OplogRecord{ + mockOplogRecord(2, 20, -1), + mockOplogRecord(3, 3, -1), + } + inserts2[0].original.partialLog.DocumentKey = bson.M{ + "_id": 2, + "x": 2, + } + inserts2[1].original.partialLog.DocumentKey = bson.M{ + "_id": 3, + "x": 3, + } + + // see https://github.com/alibaba/MongoShake/issues/380 + err = writer.doInsert(testDb, testCollection, bson.M{}, inserts2, true) + assert.NotEqual(t, nil, err, "should be equal") + assert.Equal(t, true, strings.Contains(err.Error(), "Must run update to shard key"), "should be equal") + + // query + result = make([]interface{}, 0) + err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result) + assert.Equal(t, nil, err, "should be equal") + assert.Equal(t, 3, len(result), "should be equal") + assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal") + assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal") + assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal") + } } func TestRunCommand(t *testing.T) { diff --git a/src/mongoshake/oplog/changestram_event.go b/src/mongoshake/oplog/changestram_event.go index 031d349d..5677e955 100644 --- a/src/mongoshake/oplog/changestram_event.go +++ b/src/mongoshake/oplog/changestram_event.go @@ -87,6 +87,10 @@ func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error) { oplog.TxnNumber = event.TxnNumber // lsid oplog.Lsid = event.Lsid + // documentKey + if len(event.DocumentKey) > 0 { + oplog.DocumentKey = event.DocumentKey + } ns := event.Ns diff --git a/src/mongoshake/oplog/oplog.go b/src/mongoshake/oplog/oplog.go index 09de5f2a..84575f78 100644 --- a/src/mongoshake/oplog/oplog.go +++ b/src/mongoshake/oplog/oplog.go @@ -31,6 +31,7 @@ type ParsedLog struct { Lsid bson.M `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk TxnNumber uint64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session + DocumentKey bson.M `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id // Ui bson.Binary `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently } diff --git a/src/mongoshake/unit_test_common/include.go b/src/mongoshake/unit_test_common/include.go index 789bf072..db5d865c 100644 --- a/src/mongoshake/unit_test_common/include.go +++ b/src/mongoshake/unit_test_common/include.go @@ -1,5 +1,6 @@ package unit_test_common const ( - TestUrl = "mongodb://100.81.164.186:31881,100.81.164.186:31882,100.81.164.186:31883" -) \ No newline at end of file + TestUrl = "mongodb://100.81.164.186:31881,100.81.164.186:31882,100.81.164.186:31883" + TestUrlSharding = "mongodb://100.81.164.181:33010" +)