Skip to content

Commit

Permalink
release v2.4.10: Modify the logic of transaction merging in change st…
Browse files Browse the repository at this point in the history
…ream mode to improve synchronization performance
  • Loading branch information
vinllen committed Aug 12, 2020
1 parent 928466e commit 5a1c74d
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 1 deletion.
5 changes: 5 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
2020-08-12 Alibaba Cloud.
* version: 2.4.10
* IMPROVE: modify the logic of transaction merging in change stream mode
to improve synchronization performance. #370

2020-08-06 Alibaba Cloud.
* version: 2.4.9
* BUGFIX: checkpoint doesn't work after restart. #403
Expand Down
10 changes: 9 additions & 1 deletion src/mongoshake/collector/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/vinllen/mgo/bson"
"mongoshake/common"
"time"
"reflect"
)


Expand Down Expand Up @@ -371,7 +372,8 @@ func (batcher *Batcher) BatchMore() ([][]*oplog.GenericOplog, bool, bool, bool)
}

// need merge transaction?
if genericLog.Parsed.Timestamp == batcher.previousOplog.Parsed.Timestamp {
// if genericLog.Parsed.Timestamp == batcher.previousOplog.Parsed.Timestamp {
if batcher.needMergeTransaction(genericLog.Parsed, batcher.previousOplog.Parsed) {
if len(batcher.transactionOplogs) == 0 && batcher.previousFlush == false {
// no transaction before, flush batchGroup
batcher.transactionOplogs = append(batcher.transactionOplogs, batcher.previousOplog.Parsed)
Expand Down Expand Up @@ -441,6 +443,12 @@ func (batcher *Batcher) gatherTransaction() *oplog.GenericOplog {
return gathered
}

func (bathcer *Batcher) needMergeTransaction(x, y *oplog.PartialLog) bool {
return x.Timestamp == y.Timestamp &&
x.Lsid != nil && len(x.Lsid.(bson.M)) >= 1 && reflect.DeepEqual(x.Lsid, y.Lsid) &&
x.TxnNumber == y.TxnNumber
}

// flush previous buffered oplog, true means should add barrier
func (batcher *Batcher) flushBufferOplogs() bool {
if batcher.previousOplog == fakeOplog {
Expand Down
157 changes: 157 additions & 0 deletions src/mongoshake/collector/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func mockOplogs(length int, ddlGiven []int, noopGiven []int, sameTsGiven []int,
Namespace: "a.b",
Operation: op,
Timestamp: bson.MongoTimestamp(startTs + int64(i)) << 32,
TxnNumber: 1,
Lsid: bson.M{
"id": "xx",
"uid": "xx2",
},
},
},
}
Expand Down Expand Up @@ -1278,6 +1283,158 @@ func TestGetTargetDelay(t *testing.T) {
}
}

func TestNeedMergeTransaction(t *testing.T) {
var nr int

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
},
}
assert.Equal(t, false, batcher.needMergeTransaction(x, y), "should be equal")
}

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
},
}
assert.Equal(t, false, batcher.needMergeTransaction(x, y), "should be equal")
}

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "xxx",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
},
}
assert.Equal(t, false, batcher.needMergeTransaction(x, y), "should be equal")
}

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
},
}
assert.Equal(t, true, batcher.needMergeTransaction(x, y), "should be equal")
}

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
TxnNumber: 1,
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
TxnNumber: 2,
},
}
assert.Equal(t, false, batcher.needMergeTransaction(x, y), "should be equal")
}

{
fmt.Printf("TestNeedMergeTransaction case %d.\n", nr)
nr++

batcher := &Batcher{}
x := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
TxnNumber: 2,
},
}
y := &oplog.PartialLog{
ParsedLog: oplog.ParsedLog{
Timestamp: 123,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
TxnNumber: 2,
},
}
assert.Equal(t, true, batcher.needMergeTransaction(x, y), "should be equal")
}
}

func TestGetBatchWithDelay(t *testing.T) {
// test getBatchWithDelay

Expand Down
2 changes: 2 additions & 0 deletions src/mongoshake/collector/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ func mockLog(ns string, ts bson.MongoTimestamp, withDefault bool) *oplog.ParsedL
Object: bson.D{},
Query: bson.M{},
UniqueIndexes: bson.M{},
Lsid: bson.M{},
}
case false:
return &oplog.ParsedLog{
Timestamp: ts,
Operation: "i",
Namespace: ns,
Object: bson.D{},
Lsid: bson.M{},
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions src/mongoshake/oplog/changestram_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error) {

// ts
oplog.Timestamp = event.ClusterTime
// transaction number
oplog.TxnNumber = event.TxnNumber
// lsid
oplog.Lsid = event.Lsid

ns := event.Ns

Expand Down
68 changes: 68 additions & 0 deletions src/mongoshake/oplog/changestram_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,4 +599,72 @@ func TestConvertEvent2Oplog(t *testing.T) {
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 0, len(list), "should be equal")
}

// test simple transaction
{
fmt.Printf("TestConvertEvent2Oplog case %d.\n", nr)
nr++

var err error
session, err = mgo.Dial(testUrl)
assert.Equal(t, nil, err, "should be equal")

err = session.DB("testDb").DropDatabase()
assert.Equal(t, nil, err, "should be equal")

// insert a:1
eventInsert1 := Event{
OperationType: "insert",
FullDocument: bson.D{
bson.DocElem{
Name: "_id",
Value: "1",
},
bson.DocElem{
Name: "a",
Value: "1",
},
},
Ns: bson.M{
"db": "testDb",
"coll": "testColl",
},
TxnNumber: 1,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
}
out, err := bson.Marshal(eventInsert1)
assert.Equal(t, nil, err, "should be equal")

err = runByte(out)
assert.Equal(t, nil, err, "should be equal")

all := getAllDoc("testDb", "testColl")
assert.Equal(t, 1, len(all), "should be equal")
assert.Equal(t, "1", all["1"]["a"], "should be equal")

// drop testDb
eventRename1 := Event{
OperationType: "dropDatabase",
Ns: bson.M{
"db": "testDb",
},
TxnNumber: 2,
Lsid: bson.M{
"id" : "70c47e76-7f48-46cb-ad07-cbeefd29d664",
"uid" : "Y5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fg=",
},
}
out, err = bson.Marshal(eventRename1)
assert.Equal(t, nil, err, "should be equal")

err = runByte(out)
assert.Equal(t, nil, err, "should be equal")

list, err := session.DB("testDb").CollectionNames()
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 0, len(list), "should be equal")
}
}
1 change: 1 addition & 0 deletions src/mongoshake/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ParsedLog struct {
UniqueIndexes bson.M `bson:"uk" json:"uk"`
Lsid interface{} `bson:"lsid" json:"lsid"` // mark the session id, used in transaction
FromMigrate bool `bson:"fromMigrate" json:"fromMigrate"` // move chunk
TxnNumber uint64 `bson:"txnNumber" json:"txnNumber"` // transaction number in session
}

type PartialLog struct {
Expand Down

0 comments on commit 5a1c74d

Please sign in to comment.