Skip to content

Commit

Permalink
Don't check checkIn err
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Oct 26, 2024
1 parent d80942e commit 02d5b7f
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 208 deletions.
117 changes: 3 additions & 114 deletions testdata/client-side-operations-timeout/connection-churn.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 1
},
"data": {
"failCommands": [
Expand Down Expand Up @@ -202,7 +202,7 @@
"name": "insertOne",
"object": "collection",
"arguments": {
"timeoutMS": 50,
"timeoutMS": 500,
"document": {
"_id": 3,
"x": 1
Expand Down Expand Up @@ -337,117 +337,6 @@
}
]
},
{
"description": "write op with unsuccessful pending read",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"maxPoolSize": 1
},
"useMultipleMongoses": false,
"observeEvents": [
"commandFailedEvent",
"connectionClosedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "coll"
}
}
]
}
},
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
},
"data": {
"failCommands": [
"insert"
],
"blockConnection": true,
"blockTimeMS": 750
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"timeoutMS": 50,
"document": {
"_id": 3,
"x": 1
}
},
"expectError": {
"isTimeoutError": true
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"timeoutMS": 50,
"document": {
"_id": 3,
"x": 1
}
},
"expectError": {
"isTimeoutError": true
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandFailedEvent": {
"commandName": "insert"
}
}
]
},
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionClosedEvent": {
"reason": "error"
}
}
]
}
]
},
{
"description": "read op with successful pending read",
"operations": [
Expand Down Expand Up @@ -624,7 +513,7 @@
"name": "insertOne",
"object": "collection",
"arguments": {
"timeoutMS": 50,
"timeoutMS": 500,
"document": {
"_id": 3,
"x": 1
Expand Down
73 changes: 3 additions & 70 deletions testdata/client-side-operations-timeout/connection-churn.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ tests:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 1 }
data:
failCommands: ["insert"]
blockConnection: true
Expand All @@ -127,7 +127,7 @@ tests:
- name: insertOne
object: *collection
arguments:
timeoutMS: 50
timeoutMS: 500
document: { _id: 3, x: 1 }
expectError:
isTimeoutError: true
Expand Down Expand Up @@ -204,73 +204,6 @@ tests:
eventType: cmap
events: [] # Expect no connection closure.

- description: "write op with unsuccessful pending read"
operations:
- name: createEntities
object: testRunner
arguments:
entities:
- client:
id: &client client
uriOptions:
# For single-threaded drivers, ensure the operating connection
# is checked out to complete the read.
maxPoolSize: 1
useMultipleMongoses: false
observeEvents:
- commandFailedEvent
- connectionClosedEvent
- database:
id: &database database
client: *client
databaseName: *databaseName
- collection:
id: &collection collection
database: *database
collectionName: *collectionName

# Create a failpoint to block first op
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
data:
failCommands: ["insert"]
blockConnection: true
blockTimeMS: 750

# Execute op with timeout < block time
- name: insertOne
object: *collection
arguments:
timeoutMS: 50
document: { _id: 3, x: 1 }
expectError:
isTimeoutError: true

# The pending read should fail.
- name: insertOne
object: *collection
arguments:
timeoutMS: 50
document: { _id: 3, x: 1 }
expectError:
isTimeoutError: true

expectEvents:
- client: *client
events:
- commandFailedEvent:
commandName: insert
- client: *client
eventType: cmap
events:
- connectionClosedEvent:
reason: error

- description: "read op with successful pending read"
operations:
- name: createEntities
Expand Down Expand Up @@ -383,7 +316,7 @@ tests:
- name: insertOne
object: *collection
arguments:
timeoutMS: 50
timeoutMS: 500
document: { _id: 3, x: 1 }
expectError:
isTimeoutError: true
Expand Down
8 changes: 8 additions & 0 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type connection struct {
// read before returning the connection to the pool.
awaitRemainingBytes *int32
remainingTime *time.Duration
pendingReadMU sync.Mutex
}

// newConnection handles the creation of a connection. It does not connect the connection.
Expand All @@ -104,6 +105,7 @@ func newConnection(addr address.Address, opts ...ConnectionOption) *connection {
connectContextMade: make(chan struct{}),
cancellationListener: newContextDoneListener(),
connectListener: newNonBlockingContextDoneListener(),
pendingReadMU: sync.Mutex{},
}
// Connections to non-load balanced deployments should eagerly set the generation numbers so errors encountered
// at any point during connection establishment can be processed without the connection being considered stale.
Expand Down Expand Up @@ -409,11 +411,13 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {

dst, errMsg, err := c.read(ctx)
if err != nil {
c.pendingReadMU.Lock()
if c.awaitRemainingBytes == nil {
// If the connection was not marked as awaiting response, close the
// connection because we don't know what the connection state is.
c.close()
}
c.pendingReadMU.Unlock()
message := errMsg
if errors.Is(err, io.EOF) {
message = "socket was unexpectedly closed"
Expand Down Expand Up @@ -479,8 +483,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
n, err := io.ReadFull(c.nc, sizeBuf[:])
if err != nil {
if l := int32(n); l == 0 && isCSOTTimeout(err) {
c.pendingReadMU.Lock()
c.awaitRemainingBytes = &l
c.remainingTime = ptrutil.Ptr(BGReadTimeout)
c.pendingReadMU.Unlock()
}
return nil, "incomplete read of message header", err
}
Expand All @@ -496,8 +502,10 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
if err != nil {
remainingBytes := size - 4 - int32(n)
if remainingBytes > 0 && isCSOTTimeout(err) {
c.pendingReadMU.Lock()
c.awaitRemainingBytes = &remainingBytes
c.remainingTime = ptrutil.Ptr(BGReadTimeout)
c.pendingReadMU.Unlock()
}
return dst, "incomplete read of full message", err
}
Expand Down
Loading

0 comments on commit 02d5b7f

Please sign in to comment.