Add signaling support for connection pool waiting #115
```diff
@@ -1,2 +1,2 @@
 _harness
+.vscode
```
```diff
@@ -57,6 +57,7 @@ type mongoServer struct {
 	abended       bool
 	minPoolSize   int
 	maxIdleTimeMS int
+	poolWaiter    *sync.Cond
 }

 type dialer struct {
```
```diff
@@ -78,18 +79,19 @@ type mongoServerInfo struct {

 var defaultServerInfo mongoServerInfo

-func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
+func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
 	server := &mongoServer{
 		Addr:          addr,
 		ResolvedAddr:  tcpaddr.String(),
 		tcpaddr:       tcpaddr,
-		sync:          sync,
+		sync:          syncChan,
 		dial:          dial,
 		info:          &defaultServerInfo,
 		pingValue:     time.Hour, // Push it back before an actual ping.
 		minPoolSize:   minPoolSize,
 		maxIdleTimeMS: maxIdleTimeMS,
 	}
+	server.poolWaiter = sync.NewCond(server)
 	go server.pinger(true)
 	if maxIdleTimeMS != 0 {
 		go server.poolShrinker()
```
```diff
@@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m
 }

 var errPoolLimit = errors.New("per-server connection limit reached")
+var errPoolTimeout = errors.New("could not acquire connection within pool timeout")
 var errServerClosed = errors.New("server was closed")
```
```diff
@@ -109,18 +112,80 @@ var errServerClosed = errors.New("server was closed")
 // AcquireSocket returns a socket for communicating with the server.
 // use in this server is greater than the provided limit, errPoolLimit is
 // returned.
 func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
+	return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
+}
+
+// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
+// return errPoolLimit. Instead, it will block, waiting for a socket to become available. If poolTimeout
+// should elapse before a socket is available, it will return errPoolTimeout.
+func (server *mongoServer) AcquireSocketWithBlocking(
+	poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
+) (socket *mongoSocket, abended bool, err error) {
+	return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
+}
+
+func (server *mongoServer) acquireSocketInternal(
+	poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
+) (socket *mongoSocket, abended bool, err error) {
 	for {
 		server.Lock()
 		abended = server.abended
 		if server.closed {
 			server.Unlock()
 			return nil, abended, errServerClosed
 		}
-		n := len(server.unusedSockets)
-		if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
-			server.Unlock()
-			return nil, false, errPoolLimit
+		if poolLimit > 0 {
+			if shouldBlock {
+				// Beautiful. Go's condition variables don't have a WaitWithTimeout, so I've
+				// implemented the timeout with a wait + broadcast. The broadcast will cause
+				// the loop here to re-check the timeout, and fail if it has been blown.
+				// Yes, this is a spurious wakeup, but we can't do a directed signal without
+				// having one condition variable per waiter, which would involve loop traversal
+				// in the RecycleSocket method.
+				// We also can't use the approach of turning a condition variable into a channel
+				// outlined in https://github.com/golang/go/issues/16620, since the lock needs
+				// to be held in _this_ goroutine.
+				waitDone := make(chan struct{})
+				timeoutHit := false
+				if poolTimeout > 0 {
+					go func() {
+						select {
+						case <-waitDone:
+						case <-time.After(poolTimeout):
+							// timeoutHit is part of the wait condition, so it needs to be
+							// changed under the mutex.
+							server.Lock()
+							defer server.Unlock()
+							timeoutHit = true
+							server.poolWaiter.Broadcast()
+						}
+					}()
+				}
+				timeSpentWaiting := time.Duration(0)
+				for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
+					// We only count time spent in Wait(), and not time spent evaluating the
+					// entire loop, so that in the happy non-blocking path, where the loop
+					// condition above is false the first time around, we record a nice round
+					// zero wait time.
+					waitStart := time.Now()
+					// Wait() unlocks the server mutex, waits, and locks it again. Thus, the
+					// loop condition above is only ever evaluated while the lock is held.
+					server.poolWaiter.Wait()
+					timeSpentWaiting += time.Since(waitStart)
+				}
+				close(waitDone)
+				if timeoutHit {
+					server.Unlock()
+					stats.noticePoolTimeout(timeSpentWaiting)
+					return nil, false, errPoolTimeout
+				}
+				// Record that we fetched a connection off the socket list, and how long we
+				// spent waiting for it.
+				stats.noticeSocketAcquisition(timeSpentWaiting)
+			} else {
+				if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
+					server.Unlock()
+					return nil, false, errPoolLimit
+				}
+			}
 		}
+		n := len(server.unusedSockets)
 		if n > 0 {
 			socket = server.unusedSockets[n-1]
 			server.unusedSockets[n-1] = nil // Help GC.
```
```diff
@@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
 		socket.lastTimeUsed = time.Now()
 		server.unusedSockets = append(server.unusedSockets, socket)
 	}
+	// If anybody is waiting for a connection, they should try now.
+	// Note that this _has_ to be a broadcast, not a signal; the signatures of AcquireSocket
+	// and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
+	// rather than that being an intrinsic property of the connection pool (I assume to ensure
+	// that there is always a connection available for replset topology discovery). Thus, once
+	// a connection is returned to the pool, _every_ waiter needs to check whether the connection
+	// count is underneath its particular value for poolLimit.
+	server.poolWaiter.Broadcast()
 	server.Unlock()
 }
```

Again, I am not 100% sure of my reading of the code (mgo was never super fun to read, and I haven't had a closer look at it in some time), but you take a lock on the server when you try to acquire a socket, and then block waiting on a signal. The signal won't be raised if raising it first needs to take a lock on the same mutex, no? Before, the pool was unlocked before returning an errPoolLimit, but that is no longer the case.

Yyyyyyup... I think my answer above covers this: the acquiring code implicitly releases the mutex in the call to `Wait`. I think I'm calling `Broadcast` here while still holding the mutex, and releasing it immediately afterwards. Is this not sufficient?

Yes, it is. I'd like to blame the low caffeine level in my blood for raising this issue ;)
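To make the broadcast-versus-signal argument in the comment above concrete: each waiter carries its own `poolLimit`, so only the waiters themselves can decide whether a freed connection lets them proceed. A minimal sketch with a hypothetical `pool` type (not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a toy connection pool where each acquirer supplies its own limit,
// mirroring how AcquireSocket takes poolLimit as an argument.
type pool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	inUse int
}

func newPool() *pool {
	p := &pool{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// acquire blocks until the in-use count is below the caller's limit.
func (p *pool) acquire(limit int) {
	p.mu.Lock()
	for p.inUse >= limit {
		p.cond.Wait() // releases p.mu while asleep, re-locks on wakeup
	}
	p.inUse++
	p.mu.Unlock()
}

// release returns a slot. It must Broadcast: with per-caller limits, a
// single Signal could wake a waiter whose limit is still exceeded while
// a waiter that could now proceed stays asleep.
func (p *pool) release() {
	p.mu.Lock()
	p.inUse--
	p.cond.Broadcast()
	p.mu.Unlock()
}

func main() {
	p := newPool()
	p.acquire(1) // take the only slot allowed under a limit of 1

	done := make(chan struct{})
	go func() {
		p.acquire(1) // blocks: one connection is already in use
		close(done)
	}()

	p.release() // Broadcast lets the waiter re-check its own limit
	<-done
	fmt.Println("in use:", p.inUse) // 1
}
```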
Also, I might be missing something here, but: the server lock is taken on line 124. The waiter on line 158 waits for a signal, but the signal is sent only when a socket is returned to the pool. The broadcast on line 151 won't be executed before an already-used socket is released, as that code path tries to acquire a lock that is held by the parent goroutine (line 124) and only released in Wait() (line 158), no?
This is only after a cursory reading of the code. Please correct me if I am wrong.
This lock on 148 is in a separate goroutine: a new goroutine is spawned here to wait for the timeout to expire, and to signal the waiters if it does.

Perhaps this is the most confusing part: the invocation of `Wait` on 158 actually releases the server mutex. The CondVar's `L` member is initialised to the server mutex on line 90, and the `Wait` method is defined to unlock `L`, wait for a signal, and then lock `L` again before returning control to the caller.

So, in the case where a socket is released in time:

1. `AcquireSocket` locks the server mutex
2. `AcquireSocket` observes that there are no free connections
3. `AcquireSocket` unlocks the server mutex and waits for a signal (I believe this is atomic)
4. `RecycleSocket` locks the server mutex
5. `RecycleSocket` returns a connection to the free list
6. `RecycleSocket` broadcasts
7. `AcquireSocket` is woken from its `Wait`, but is still blocked waiting for the server mutex, so `Wait` does not yet return
8. `RecycleSocket` unlocks the server mutex
9. `AcquireSocket` locks the server mutex and returns control from `Wait`
10. `AcquireSocket` observes that there are now free connections and continues as usual

In the case where a socket is not released in time:

1. `AcquireSocket` locks the server mutex
2. `AcquireSocket` observes that there are no free connections
3. `AcquireSocket` unlocks the server mutex and waits for a signal
4. The `time.After` timeout fires; the timer goroutine locks the server mutex, sets the timeout flag, broadcasts, and unlocks
5. `AcquireSocket` is woken from its `Wait`, but is still blocked waiting for the server mutex, so `Wait` does not yet return
6. `AcquireSocket` locks the server mutex and returns control from `Wait`
7. `AcquireSocket` notices that the timeout flag has been set
8. `AcquireSocket` unlocks the mutex and errors out

I mean, this is concurrent code with mutexes and semaphores, so I'm practically guaranteed to be wrong, but this is my understanding of how this works, and hopefully it answers your question.
Right. I forgot that the Wait() releases the lock before yielding the goroutine.
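For readers who want to see the sequencing from the thread above in action, here is a stripped-down, runnable model of the acquire/recycle handshake (made-up names; not code from this PR):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	free := 0

	// Model of RecycleSocket: it can take the mutex *because* the waiter's
	// Wait() released it, then returns a connection and broadcasts.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mu.Lock()
		free++
		fmt.Println("recycler: returned a connection, broadcasting")
		cond.Broadcast()
		mu.Unlock() // only now can the woken waiter re-acquire the mutex
	}()

	// Model of AcquireSocket.
	mu.Lock()
	for free == 0 {
		fmt.Println("acquirer: no free connections, waiting")
		cond.Wait() // atomically unlocks mu, sleeps, re-locks before returning
	}
	fmt.Println("acquirer: got a connection")
	free--
	mu.Unlock()
}
```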