Skip to content

Commit

Permalink
GODRIVER-3049 Cont. resolving breaks
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Dec 6, 2023
1 parent 5b711e8 commit 9ac559f
Show file tree
Hide file tree
Showing 9 changed files with 469 additions and 694 deletions.
2 changes: 1 addition & 1 deletion mongo/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type batchCursor interface {

// Batch will return a DocumentSequence for the current batch of documents. The returned
// DocumentSequence is only valid until the next call to Next or Close.
Batch() *bsoncore.DocumentSequence
Batch() *bsoncore.Iterator

// Server returns a pointer to the cursor's server.
Server() driver.Server
Expand Down
42 changes: 24 additions & 18 deletions mongo/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Cursor struct {
Current bson.Raw

bc batchCursor
batch *bsoncore.DocumentSequence
batch *bsoncore.Iterator
batchLength int
bsonOpts *options.BSONOptions
registry *bsoncodec.Registry
Expand Down Expand Up @@ -71,9 +71,10 @@ func newCursorWithSession(
c.closeImplicitSession()
}

// Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch
// will be pulled up by the first Next/TryNext call.
c.batchLength = c.bc.Batch().DocumentCount()
// Initialize just the batchLength here so RemainingBatchLength will return an
// accurate result. The actual batch will be pulled up by the first
// Next/TryNext call.
c.batchLength = c.bc.Batch().Count()
return c, nil
}

Expand All @@ -91,32 +92,37 @@ func NewCursorFromDocuments(documents []interface{}, err error, registry *bsonco
}

// Convert documents slice to a sequence-style byte array.
var docsBytes []byte
for _, doc := range documents {
values := make([]bsoncore.Value, len(documents))
for i, doc := range documents {
switch t := doc.(type) {
case nil:
return nil, ErrNilDocument
case []byte:
// Slight optimization so we'll just use MarshalBSON and not go through the codec machinery.
doc = bson.Raw(t)
}
var marshalErr error
docsBytes, marshalErr = bson.MarshalAppendWithRegistry(registry, docsBytes, doc)
if marshalErr != nil {
return nil, marshalErr
bytes, err := bson.MarshalWithRegistry(registry, doc)
if err != nil {
return nil, err
}

values[i] = bsoncore.Value{
Type: bson.TypeEmbeddedDocument,
Data: bytes,
}
}

c := &Cursor{
bc: driver.NewBatchCursorFromDocuments(docsBytes),
bc: driver.NewBatchCursorFromList(bsoncore.BuildArray(nil, values...)),
registry: registry,
err: err,
}

// Initialize batch and batchLength here. The underlying batch cursor will be preloaded with the
// provided contents, and thus already has a batch before calls to Next/TryNext.
c.batch = c.bc.Batch()
c.batchLength = c.bc.Batch().DocumentCount()
c.batchLength = c.bc.Batch().Count()

return c, nil
}

Expand Down Expand Up @@ -159,12 +165,12 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
if ctx == nil {
ctx = context.Background()
}
doc, err := c.batch.Next()
val, err := c.batch.Next()
switch {
case err == nil:
// Consume the next document in the current batch.
c.batchLength--
c.Current = bson.Raw(doc)
c.Current = bson.Raw(val.Data)
return true
case errors.Is(err, io.EOF): // Need to do a getMore
default:
Expand Down Expand Up @@ -202,12 +208,12 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {

// Use the new batch to update the batch and batchLength fields. Consume the first document in the batch.
c.batch = c.bc.Batch()
c.batchLength = c.batch.DocumentCount()
doc, err = c.batch.Next()
c.batchLength = c.batch.Count()
val, err = c.batch.Next()
switch {
case err == nil:
c.batchLength--
c.Current = bson.Raw(doc)
c.Current = bson.Raw(val.Data)
return true
case errors.Is(err, io.EOF): // Empty batch so we continue
default:
Expand Down Expand Up @@ -344,7 +350,7 @@ func (c *Cursor) RemainingBatchLength() int {

// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
// the next empty index in the slice, and an error if one occurs.
func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,
func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.Iterator,
index int) (reflect.Value, int, error) {

docs, err := batch.Documents()
Expand Down
6 changes: 3 additions & 3 deletions mongo/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
)

type testBatchCursor struct {
batches []*bsoncore.DocumentSequence
batch *bsoncore.DocumentSequence
batches []*bsoncore.Iterator
batch *bsoncore.Iterator
closed bool
}

func newTestBatchCursor(numBatches, batchSize int) *testBatchCursor {
batches := make([]*bsoncore.DocumentSequence, 0, numBatches)
batches := make([]*bsoncore.Iterator, 0, numBatches)

counter := 0
for batch := 0; batch < numBatches; batch++ {
Expand Down
30 changes: 14 additions & 16 deletions mongo/integration/mtest/sent_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type SentMessage struct {
// The documents sent for an insert, update, or delete command. This is separated into its own field because it's
// sent as part of the command document in OP_QUERY and as a document sequence outside the command document in
// OP_MSG.
DocumentSequence *bsoncore.DocumentSequence
Batch *bsoncore.Iterator
}

type sentMsgParseFn func([]byte) (*SentMessage, error)
Expand Down Expand Up @@ -87,26 +87,25 @@ func parseOpQuery(wm []byte) (*SentMessage, error) {

// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
// document. Pull these sequences out into an ArrayStyle DocumentSequence.
var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
cmdElems, _ := commandDoc.Elements()
for _, elem := range cmdElems {
switch elem.Key() {
case "documents", "updates", "deletes":
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.ArrayStyle,
Data: elem.Value().Array(),
batch = &bsoncore.Iterator{
List: elem.Value().Array(),
}
}
if docSequence != nil {
if batch != nil {
// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
break
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
rpDoc = rpVal.Document()
}

var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
if len(wm) != 0 {
// If there are bytes remaining in the wire message, they must correspond to a DocumentSequence section.
if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
Expand All @@ -169,16 +168,15 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
return nil, errors.New("failed to read document sequence")
}

docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.SequenceStyle,
Data: data,
batch = &bsoncore.Iterator{
List: data,
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down
189 changes: 0 additions & 189 deletions x/bsonx/bsoncore/document_sequence.go

This file was deleted.

Loading

0 comments on commit 9ac559f

Please sign in to comment.