Skip to content

Commit

Permalink
when "incr_sync.mongo_fetch_method == oplog", support upsert if targe…
Browse files Browse the repository at this point in the history
…t is sharding. #380
  • Loading branch information
vinllen committed Oct 24, 2020
1 parent 549cfbe commit 2cd250c
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 31 deletions.
7 changes: 7 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
2020-10-xx Alibaba Cloud.
* version: 2.4.16
* IMPROVE: ignore the error when a "namespace not found" error occurs in a
delete operation in the single writer. #467
* IMPROVE: when "incr_sync.mongo_fetch_method == oplog", support upsert
when the target is a sharded cluster. #380

2020-10-14 Alibaba Cloud.
* version: 2.4.15
* BUGFIX: in kafka tunnel, fix JSON marshal failure when encountering NaN, Inf,
Expand Down
18 changes: 9 additions & 9 deletions scripts/run_ut_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
run unit test in recursive
"""
def run_ut(cur_path):
print os.path.abspath(os.path.curdir), cur_path
print(os.path.abspath(os.path.curdir), cur_path)

only_files = [f for f in listdir(".") if isfile(join(".", f))]
only_dirs = [f for f in listdir(".") if isdir(join(".", f))]
ut_files = [f for f in only_files if "_test.go" in f]
print only_files, only_dirs, ut_files
print(only_files, only_dirs, ut_files)

if len(ut_files) != 0:
# with ut file, need run ut test
print "----------- run ut test on dir[%s] -----------" % os.path.abspath(os.path.curdir)
print("----------- run ut test on dir[%s] -----------" % os.path.abspath(os.path.curdir))
ret = subprocess.call(["go", "test"])
# subprocess.check_output(["/bin/sh", "-c", "go", "test"])

print "********************************** %s *************************************" % ("OK" if ret == 0 else "FAIL")
print("********************************** %s *************************************" % ("OK" if ret == 0 else "FAIL"))
if ret != 0:
print "run failed"
print("run failed")
exit(ret)

for dir in only_dirs:
print "cd dir[%s]" % dir
print("cd dir[%s]" % dir)

# dfs
os.chdir(dir)
Expand All @@ -44,10 +44,10 @@ def run_ut(cur_path):
root_path = os.path.join("..", "src/mongoshake")
os.chdir(root_path)
go_path=os.path.abspath("../..")
print "GOPATH=%s" % go_path
print("GOPATH=%s" % go_path)
#subprocess.call(["export GOPATH=%s" % go_path])
os.environ['GOPATH'] = go_path
run_ut(".")

print "-----------------------------------"
print "all is well ^_^"
print("-----------------------------------")
print("all is well ^_^")
30 changes: 22 additions & 8 deletions src/mongoshake/executor/db_writer_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,21 @@ func (bw *BulkWriter) doUpdateOnInsert(database, collection string, metadata bso
oplogs []*OplogRecord, upsert bool) error {
var update []interface{}
for _, log := range oplogs {
// insert must have _id
// if id, exist := log.original.partialLog.Object["_id"]; exist {
if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
// newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef)
newObject := log.original.partialLog.Object
update = append(update, bson.M{"_id": id}, newObject)
// newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef)
newObject := log.original.partialLog.Object
if upsert && len(log.original.partialLog.DocumentKey) > 0 {
update = append(update, log.original.partialLog.DocumentKey, newObject)
} else {
LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
if upsert {
LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
}
// insert must have _id
// if id, exist := log.original.partialLog.Object["_id"]; exist {
if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
update = append(update, bson.M{"_id": id}, newObject)
} else {
LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
}
}

LOG.Debug("writer: updateOnInsert %v", log.original.partialLog)
Expand Down Expand Up @@ -115,7 +122,14 @@ func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.M,
// delete(newObject, versionMark)
//}
log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
update = append(update, log.original.partialLog.Query, log.original.partialLog.Object)
if upsert && len(log.original.partialLog.DocumentKey) > 0 {
update = append(update, log.original.partialLog.DocumentKey, log.original.partialLog.Object)
} else {
if upsert {
LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
}
update = append(update, log.original.partialLog.Query, log.original.partialLog.Object)
}

LOG.Debug("writer: update %v", log.original.partialLog.Object)
}
Expand Down
31 changes: 22 additions & 9 deletions src/mongoshake/executor/db_writer_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,19 @@ func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata b
}
var updates []*pair
for _, log := range oplogs {
// insert must have _id
// if id, exist := log.original.partialLog.Object["_id"]; exist {
if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
// newObject := utils.AdjustDBRef(log.original.partialLog.Object, conf.Options.DBRef)
newObject := log.original.partialLog.Object
updates = append(updates, &pair{id: id, data: newObject})
newObject := log.original.partialLog.Object
if upsert && len(log.original.partialLog.DocumentKey) > 0 {
updates = append(updates, &pair{id: log.original.partialLog.DocumentKey, data: newObject})
} else {
return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog)
if upsert {
LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
}
// insert must have _id
if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
updates = append(updates, &pair{id: id, data: newObject})
} else {
return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog)
}
}

LOG.Debug("writer: updateOnInsert %v", log.original.partialLog)
Expand All @@ -80,7 +85,7 @@ func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata b
collectionHandle := sw.session.DB(database).C(collection)
if upsert {
for i, update := range updates {
if _, err := collectionHandle.UpsertId(update.id, update.data); err != nil {
if _, err := collectionHandle.Upsert(update.id, update.data); err != nil {
// error can be ignored
if IgnoreError(err, "u", utils.TimestampToInt64(oplogs[i].original.partialLog.Timestamp) <= sw.fullFinishTs) {
continue
Expand Down Expand Up @@ -118,7 +123,15 @@ func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.M,
// delete(newObject, versionMark)
//}
log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
_, err := collectionHandle.Upsert(log.original.partialLog.Query, log.original.partialLog.Object)
var err error
if upsert && len(log.original.partialLog.DocumentKey) > 0 {
_, err = collectionHandle.Upsert(log.original.partialLog.DocumentKey, log.original.partialLog.Object)
} else {
if upsert {
LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
}
_, err = collectionHandle.Upsert(log.original.partialLog.Query, log.original.partialLog.Object)
}
if err != nil {
// error can be ignored
if IgnoreError(err, "u", utils.TimestampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs) {
Expand Down
183 changes: 180 additions & 3 deletions src/mongoshake/executor/db_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@ package executor
import (
"testing"
"fmt"
"strings"

"mongoshake/common"
"mongoshake/oplog"
"mongoshake/unit_test_common"
"mongoshake/collector/configure"

"github.com/stretchr/testify/assert"
"github.com/vinllen/mgo/bson"
"github.com/vinllen/mgo"
)

const (
testMongoAddress = unit_test_common.TestUrl
testDb = "writer_test"
testCollection = "a"
testMongoAddress = unit_test_common.TestUrl
testMongoShardingAddress = unit_test_common.TestUrlSharding
testDb = "writer_test"
testCollection = "a"
)

func mockOplogRecord(oId, oX int, o2Id int) *OplogRecord {
Expand Down Expand Up @@ -262,6 +265,93 @@ func TestSingleWriter(t *testing.T) {
assert.Equal(t, 20, result[1].(bson.M)["x"], "should be equal")
assert.Equal(t, 30, result[2].(bson.M)["x"], "should be equal")
}

{
fmt.Printf("TestSingleWriter case %d.\n", nr)
nr++

conf.Options.IncrSyncExecutorUpsert = true

conn, err := utils.NewMongoConn(testMongoShardingAddress, "primary", true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

writer := NewDbWriter(conn.Session, bson.M{}, false, 0)

// drop database
err = conn.Session.DB(testDb).DropDatabase()

// enable sharding
err = conn.Session.DB("admin").Run(bson.D{{"enablesharding", testDb}}, nil)
assert.Equal(t, nil, err, "should be equal")

// shard collection
ns := fmt.Sprintf("%s.%s", testDb, testCollection)
err = conn.Session.DB("admin").Run(bson.D{
{"shardCollection", ns},
{"key", bson.M{"x": 1}},
{"unique", true},
}, nil)
assert.Equal(t, nil, err, "should be equal")

// 1-2
inserts := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
}

err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true)
assert.NotEqual(t, nil, err, "should be equal")
assert.Equal(t, true, strings.Contains(err.Error(), "Failed to target upsert by query"), "should be equal")
fmt.Println(err)

inserts[0].original.partialLog.DocumentKey = bson.M{
"_id": 1,
"x": 1,
}
inserts[1].original.partialLog.DocumentKey = bson.M{
"_id": 2,
"x": 2,
}
err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true)
assert.Equal(t, nil, err, "should be equal")

// query
result := make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 2, len(result), "should be equal")
assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal")
assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal")

fmt.Println("---------------")
// 2-3
inserts2 := []*OplogRecord{
mockOplogRecord(2, 20, -1),
mockOplogRecord(3, 3, -1),
}
inserts2[0].original.partialLog.DocumentKey = bson.M{
"_id": 2,
"x": 2,
}
inserts2[1].original.partialLog.DocumentKey = bson.M{
"_id": 3,
"x": 3,
}

// see https://github.com/alibaba/MongoShake/issues/380
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts2, true)
assert.NotEqual(t, nil, err, "should be equal")
assert.Equal(t, true, strings.Contains(err.Error(), "Must run update to shard key"), "should be equal")

// query
result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 3, len(result), "should be equal")
assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal")
assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal")
assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal")
}
}

func TestBulkWriter(t *testing.T) {
Expand Down Expand Up @@ -589,6 +679,93 @@ func TestBulkWriter(t *testing.T) {
assert.Equal(t, 20, result[1].(bson.M)["x"], "should be equal")
assert.Equal(t, 30, result[2].(bson.M)["x"], "should be equal")
}

{
fmt.Printf("TestBulkWriter case %d.\n", nr)
nr++

conf.Options.IncrSyncExecutorUpsert = true

conn, err := utils.NewMongoConn(testMongoShardingAddress, "primary", true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

writer := NewDbWriter(conn.Session, bson.M{}, true, 0)

// drop database
err = conn.Session.DB(testDb).DropDatabase()

// enable sharding
err = conn.Session.DB("admin").Run(bson.D{{"enablesharding", testDb}}, nil)
assert.Equal(t, nil, err, "should be equal")

// shard collection
ns := fmt.Sprintf("%s.%s", testDb, testCollection)
err = conn.Session.DB("admin").Run(bson.D{
{"shardCollection", ns},
{"key", bson.M{"x": 1}},
{"unique", true},
}, nil)
assert.Equal(t, nil, err, "should be equal")

// 1-2
inserts := []*OplogRecord{
mockOplogRecord(1, 1, -1),
mockOplogRecord(2, 2, -1),
}

err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true)
assert.NotEqual(t, nil, err, "should be equal")
assert.Equal(t, true, strings.Contains(err.Error(), "Failed to target upsert by query"), "should be equal")
fmt.Println(err)

inserts[0].original.partialLog.DocumentKey = bson.M{
"_id": 1,
"x": 1,
}
inserts[1].original.partialLog.DocumentKey = bson.M{
"_id": 2,
"x": 2,
}
err = writer.doUpdate(testDb, testCollection, bson.M{}, inserts, true)
assert.Equal(t, nil, err, "should be equal")

// query
result := make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 2, len(result), "should be equal")
assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal")
assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal")

fmt.Println("---------------")
// 2-3
inserts2 := []*OplogRecord{
mockOplogRecord(2, 20, -1),
mockOplogRecord(3, 3, -1),
}
inserts2[0].original.partialLog.DocumentKey = bson.M{
"_id": 2,
"x": 2,
}
inserts2[1].original.partialLog.DocumentKey = bson.M{
"_id": 3,
"x": 3,
}

// see https://github.com/alibaba/MongoShake/issues/380
err = writer.doInsert(testDb, testCollection, bson.M{}, inserts2, true)
assert.NotEqual(t, nil, err, "should be equal")
assert.Equal(t, true, strings.Contains(err.Error(), "Must run update to shard key"), "should be equal")

// query
result = make([]interface{}, 0)
err = conn.Session.DB(testDb).C(testCollection).Find(bson.M{}).Sort("_id").All(&result)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 3, len(result), "should be equal")
assert.Equal(t, 1, result[0].(bson.M)["x"], "should be equal")
assert.Equal(t, 2, result[1].(bson.M)["x"], "should be equal")
assert.Equal(t, 3, result[2].(bson.M)["x"], "should be equal")
}
}

func TestRunCommand(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions src/mongoshake/oplog/changestram_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error) {
oplog.TxnNumber = event.TxnNumber
// lsid
oplog.Lsid = event.Lsid
// documentKey
if len(event.DocumentKey) > 0 {
oplog.DocumentKey = event.DocumentKey
}

ns := event.Ns

Expand Down
1 change: 1 addition & 0 deletions src/mongoshake/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ParsedLog struct {
Lsid bson.M `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction
FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk
TxnNumber uint64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session
DocumentKey bson.M `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id
// Ui bson.Binary `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently
}

Expand Down
5 changes: 3 additions & 2 deletions src/mongoshake/unit_test_common/include.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package unit_test_common

const (
TestUrl = "mongodb://100.81.164.186:31881,100.81.164.186:31882,100.81.164.186:31883"
)
TestUrl = "mongodb://100.81.164.186:31881,100.81.164.186:31882,100.81.164.186:31883"
TestUrlSharding = "mongodb://100.81.164.181:33010"
)

0 comments on commit 2cd250c

Please sign in to comment.