Skip to content

Commit

Permalink
Merge branch 'master' into GODRIVER-2726
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez authored Jan 16, 2024
2 parents 58c42a9 + df800a9 commit ea758f5
Show file tree
Hide file tree
Showing 23 changed files with 1,448 additions and 1,126 deletions.
5 changes: 2 additions & 3 deletions bson/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@
// 2. int8, int16, and int32 marshal to a BSON int32.
// 3. int marshals to a BSON int32 if the value is between math.MinInt32 and math.MaxInt32, inclusive, and a BSON int64
// otherwise.
// 4. int64 marshals to BSON int64.
// 4. int64 marshals to BSON int64 (unless [Encoder.IntMinSize] is set).
// 5. uint8 and uint16 marshal to a BSON int32.
// 6. uint, uint32, and uint64 marshal to a BSON int32 if the value is between math.MinInt32 and math.MaxInt32,
// inclusive, and BSON int64 otherwise.
// 6. uint, uint32, and uint64 marshal to a BSON int64 (unless [Encoder.IntMinSize] is set).
// 7. BSON null and undefined values will unmarshal into the zero value of a field (e.g. unmarshalling a BSON null or
// undefined value into a string will yield the empty string.).
//
Expand Down
30 changes: 30 additions & 0 deletions bson/encoder_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,33 @@ func ExampleEncoder_multipleExtendedJSONDocuments() {
// {"x":{"$numberInt":"3"},"y":{"$numberInt":"4"}}
// {"x":{"$numberInt":"4"},"y":{"$numberInt":"5"}}
}

func ExampleEncoder_IntMinSize() {
// Create an encoder that will marshal integers as the minimum BSON int size
// (either 32 or 64 bits) that can represent the integer value.
type foo struct {
Bar uint32
}

buf := new(bytes.Buffer)
vw, err := bsonrw.NewBSONValueWriter(buf)
if err != nil {
panic(err)
}

enc, err := bson.NewEncoder(vw)
if err != nil {
panic(err)
}

enc.IntMinSize()

err = enc.Encode(foo{2})
if err != nil {
panic(err)
}

fmt.Println(bson.Raw(buf.Bytes()).String())
// Output:
// {"bar": {"$numberInt":"2"}}
}
1 change: 0 additions & 1 deletion internal/cmd/compilecheck/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require go.mongodb.org/mongo-driver v1.11.7
require (
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions internal/cmd/compilecheck/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down
103 changes: 0 additions & 103 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,109 +609,6 @@ func TestClient(t *testing.T) {
assert.Equal(t, 0, closed, "expected no connections to be closed")
})

mt.Run("RTT90 is monitored", func(mt *mtest.T) {
mt.Parallel()

// Reset the client with a dialer that delays all network round trips by 300ms and set the
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples.
mt.ResetClient(options.Client().
SetDialer(newSlowConnDialer(slowConnDialerDelay)).
SetHeartbeatInterval(reducedHeartbeatInterval))

// Assert that RTT90s are eventually >300ms.
topo := getTopologyFromClient(mt.Client)
assert.Soon(mt, func(ctx context.Context) {
for {
// Stop loop if callback has been canceled.
select {
case <-ctx.Done():
return
default:
}

time.Sleep(100 * time.Millisecond)

// Wait for all of the server's RTT90s to be >300ms.
done := true
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.RTTMonitor().P90() <= 300*time.Millisecond {
done = false
}
}
if done {
return
}
}
}, 10*time.Second)
})

// Test that if Timeout is set and the RTT90 is greater than the remaining timeout for an operation, the
// operation is not sent to the server, fails with a timeout error, and no connections are closed.
mt.Run("RTT90 used to prevent sending requests", func(mt *mtest.T) {
mt.Parallel()

// Assert that we can call Ping with a 250ms timeout.
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
err := mt.Client.Ping(ctx, nil)
assert.Nil(mt, err, "Ping error: %v", err)

// Reset the client with a dialer that delays all network round trips by 300ms, set the
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples, and
// set a Timeout of 0 (infinite) on the Client to ensure that RTT90 is used as a sending
// threshold.
tpm := eventtest.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetDialer(newSlowConnDialer(slowConnDialerDelay)).
SetHeartbeatInterval(reducedHeartbeatInterval).
SetTimeout(0))

// Assert that RTT90s are eventually >275ms.
topo := getTopologyFromClient(mt.Client)
assert.Soon(mt, func(ctx context.Context) {
for {
// Stop loop if callback has been canceled.
select {
case <-ctx.Done():
return
default:
}

time.Sleep(100 * time.Millisecond)

// Wait for all of the server's RTT90s to be >275ms.
done := true
for _, desc := range topo.Description().Servers {
server, err := topo.FindServer(desc)
assert.Nil(mt, err, "FindServer error: %v", err)
if server.RTTMonitor().P90() <= 275*time.Millisecond {
done = false
}
}
if done {
return
}
}
}, 10*time.Second)

// Once we've waited for the RTT90 for the servers to be >275ms, run 10 Ping operations
// with a timeout of 275ms and expect that they return timeout errors.
for i := 0; i < 10; i++ {
ctx, cancel = context.WithTimeout(context.Background(), 275*time.Millisecond)
err := mt.Client.Ping(ctx, nil)
cancel()
assert.NotNil(mt, err, "expected Ping to return an error")
assert.True(mt, mongo.IsTimeout(err), "expected a timeout error, got: %v", err)
}

// Assert that the Ping timeouts result in no connections being closed.
closed := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionClosed }))
assert.Equal(t, 0, closed, "expected no connections to be closed")
})

// Test that OP_MSG is used for authentication-related commands on 3.6+ (WV 6+). Do not test when API version is
// set, as handshakes will always use OP_MSG.
opMsgOpts := mtest.NewOptions().ClientType(mtest.Proxy).MinServerVersion("3.6").Auth(true).RequireAPIVersion(false)
Expand Down
30 changes: 14 additions & 16 deletions internal/integration/mtest/sent_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type SentMessage struct {
// The documents sent for an insert, update, or delete command. This is separated into its own field because it's
// sent as part of the command document in OP_QUERY and as a document sequence outside the command document in
// OP_MSG.
DocumentSequence *bsoncore.DocumentSequence
Batch *bsoncore.Iterator
}

type sentMsgParseFn func([]byte) (*SentMessage, error)
Expand Down Expand Up @@ -87,26 +87,25 @@ func parseOpQuery(wm []byte) (*SentMessage, error) {

// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
// document. Pull these sequences out into an ArrayStyle DocumentSequence.
var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
cmdElems, _ := commandDoc.Elements()
for _, elem := range cmdElems {
switch elem.Key() {
case "documents", "updates", "deletes":
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.ArrayStyle,
Data: elem.Value().Array(),
batch = &bsoncore.Iterator{
List: elem.Value().Array(),
}
}
if docSequence != nil {
if batch != nil {
// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
break
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
rpDoc = rpVal.Document()
}

var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
if len(wm) != 0 {
// If there are bytes remaining in the wire message, they must correspond to a DocumentSequence section.
if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
Expand All @@ -169,16 +168,15 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
return nil, errors.New("failed to read document sequence")
}

docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.SequenceStyle,
Data: data,
batch = &bsoncore.Iterator{
List: data,
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down
18 changes: 18 additions & 0 deletions internal/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-c
}
}
}
return nil
case "wait":
waitMS, err := convertValueToMilliseconds(args.Lookup("ms"))
if err != nil {
return err
}

time.Sleep(waitMS)

return nil
case "runOnThread":
operationRaw, err := args.LookupErr("operation")
Expand Down Expand Up @@ -484,3 +493,12 @@ func verifyIndexExists(ctx context.Context, dbName, collName, indexName string,
}
return nil
}

func convertValueToMilliseconds(val bson.RawValue) (time.Duration, error) {
int32Val, ok := val.Int32OK()
if !ok {
return 0, fmt.Errorf("failed to convert value of type %s to int32", val.Type)
}

return time.Duration(int32Val) * time.Millisecond, nil
}
2 changes: 1 addition & 1 deletion mongo/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type batchCursor interface {

// Batch will return a DocumentSequence for the current batch of documents. The returned
// DocumentSequence is only valid until the next call to Next or Close.
Batch() *bsoncore.DocumentSequence
Batch() *bsoncore.Iterator

// Server returns a pointer to the cursor's server.
Server() driver.Server
Expand Down
Loading

0 comments on commit ea758f5

Please sign in to comment.