Add signaling support for connection pool waiting #115

Merged · 3 commits · Apr 3, 2018
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,2 +1,2 @@
_harness

.vscode
22 changes: 13 additions & 9 deletions cluster.go
@@ -618,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
}

// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
warnedLimit := false
for {
cluster.RLock()
for {
@@ -662,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
continue
}

s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
if err == errPoolLimit {
if !warnedLimit {
warnedLimit = true
log("WARNING: Per-server connection limit reached.")
}
time.Sleep(100 * time.Millisecond)
continue
s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
}
if err != nil {
cluster.removeServer(server)
42 changes: 42 additions & 0 deletions cluster_test.go
@@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) {
}
defer session.Close()

// So we can measure the stats for the blocking operation
mgo.ResetStats()

// Put one socket in use.
c.Assert(session.Ping(), IsNil)

@@ -1603,6 +1606,11 @@ func (s *S) TestPoolLimitSimple(c *C) {
session.Refresh()
delay := <-done
c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
stats := mgo.GetStats()
c.Assert(stats.TimesSocketAcquired, Equals, 2)
c.Assert(stats.TimesWaitedForPool, Equals, 1)
c.Assert(stats.PoolTimeouts, Equals, 0)
c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true)
}
}

@@ -1649,6 +1657,40 @@ func (s *S) TestPoolLimitMany(c *C) {
c.Assert(delay < 6e9, Equals, true)
}

func (s *S) TestPoolLimitTimeout(c *C) {
if *fast {
c.Skip("-fast")
}

session, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session.Close()
session.SetPoolTimeout(1 * time.Second)
session.SetPoolLimit(1)

mgo.ResetStats()

// Put one socket in use.
c.Assert(session.Ping(), IsNil)

// Now block trying to get another one due to the pool limit.
copy := session.Copy()
defer copy.Close()
started := time.Now()
err = copy.Ping()
delay := time.Since(started)

c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err))
stats := mgo.GetStats()
c.Assert(stats.PoolTimeouts, Equals, 1)
c.Assert(stats.TimesSocketAcquired, Equals, 1)
c.Assert(stats.TimesWaitedForPool, Equals, 1)
c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true)
c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true)
}

func (s *S) TestSetModeEventualIterBug(c *C) {
session1, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
85 changes: 79 additions & 6 deletions server.go
@@ -57,6 +57,7 @@ type mongoServer struct {
abended bool
minPoolSize int
maxIdleTimeMS int
poolWaiter *sync.Cond
}

type dialer struct {
@@ -78,18 +79,19 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
sync: syncChan,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
server.poolWaiter = sync.NewCond(server)
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
@@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m
}

var errPoolLimit = errors.New("per-server connection limit reached")
var errPoolTimeout = errors.New("could not acquire connection within pool timeout")
var errServerClosed = errors.New("server was closed")

// AcquireSocket returns a socket for communicating with the server.
@@ -109,18 +112,80 @@ var errServerClosed = errors.New("server was closed")
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
}

// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
// should elapse before a socket is available, it will return errPoolTimeout.
func (server *mongoServer) AcquireSocketWithBlocking(
poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
}

func (server *mongoServer) acquireSocketInternal(
poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
abended = server.abended
if server.closed {
server.Unlock()
return nil, abended, errServerClosed
}
n := len(server.unusedSockets)
if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
server.Unlock()
return nil, false, errPoolLimit
if poolLimit > 0 {
if shouldBlock {
// Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout
// with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout,
// and fail if it is blown.
// Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition
// variable per waiter, which would involve loop traversal in the RecycleSocket
// method.
// We also can't use the approach of turning a condition variable into a channel outlined in
// https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
waitDone := make(chan struct{})
timeoutHit := false
if poolTimeout > 0 {
go func() {
select {
case <-waitDone:
case <-time.After(poolTimeout):
// timeoutHit is part of the wait condition, so needs to be changed under mutex.
server.Lock()

Review comment:
Also, I might be missing something here, but,
the server lock is locked in line 124.

The waiter on line 158 waits for a signal, but the signal is sent only if a socket is returned to the pool. The broadcast in line 151 won't be executed before an already used socket is released, as that code path tries to acquire a lock that is held by the parent goroutine (line 124) and released on Wait() (line 158), no?

This is only after a cursory reading of the code. Please correct me if I am wrong.

Author:
> the server lock is locked in line 124.

This lock on 148 is in a separate goroutine - there is a new goroutine spawned here to wait for the timeout to expire and signal the waiters if so.

> The broadcast in line 151 won't be executed before an already used socket is released, as that code path tries to acquire a lock that is held by the parent goroutine

Perhaps this is the most confusing part - the invocation of Wait on 158 actually releases the server mutex - the CondVar's L member is initialised to the server mutex on line 90, and the Wait method is defined to unlock L, wait for a signal, and then lock L again before returning control to the caller.

So, in the case where a socket is released in time -

  • The call to AcquireSocket locks the server mutex
  • The call to AcquireSocket observes that there are no free connections
  • The call to AcquireSocket unlocks the server mutex and waits for a signal (I believe this is atomic)
  • The call to RecycleSocket locks the server mutex
  • The call to RecycleSocket returns a connection to the free list
  • The call to RecycleSocket broadcasts
  • The call to AcquireSocket is woken from its Wait, but is still blocked waiting for the server mutex, so Wait does not yet return
  • The call to RecycleSocket unlocks the server mutex
  • The call to AcquireSocket locks the server mutex and returns control from Wait
  • The call to AcquireSocket observes that there are now free connections and continues as usual.

In the case where a socket is not released on time -

  • The call to AcquireSocket locks the server mutex
  • The call to AcquireSocket observes that there are no free connections
  • The call to AcquireSocket unlocks the server mutex and waits for a signal
  • The timer goroutine hits the time.After timeout
  • The timer goroutine takes the server mutex
  • The timer goroutine sets the timeout flag
  • The timer goroutine broadcasts
  • The call to AcquireSocket is woken from its Wait, but is still blocked waiting for the server mutex, so Wait does not yet return
  • The timer goroutine unlocks the server mutex
  • The call to AcquireSocket locks the server mutex and returns control from Wait
  • The call to AcquireSocket notices that the timeout flag has been set
  • The call to AcquireSocket unlocks the mutex and errors out

I mean, this is concurrent code with mutexes and semaphores, so I'm practically guaranteed to be wrong, but this is my understanding of how this works and hopefully this answers your question.

Review comment:
Right. I forgot that the Wait() releases the lock before yielding the goroutine.
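
Since the thread above is the clearest description of the mechanism in this PR, here is a minimal standalone sketch of the same pattern — a sync.Cond wait bounded by a timer goroutine that flips a flag under the mutex and broadcasts. The pool type and names below are illustrative only, not mgo's; just the wait/broadcast mechanics mirror acquireSocketInternal and RecycleSocket.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTimeout = errors.New("timed out waiting for a free slot")

type pool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	inUse int
	limit int
}

func newPool(limit int) *pool {
	p := &pool{limit: limit}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// acquire blocks until a slot is free or timeout elapses.
func (p *pool) acquire(timeout time.Duration) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	timedOut := false
	done := make(chan struct{})
	if timeout > 0 {
		go func() {
			select {
			case <-done:
			case <-time.After(timeout):
				p.mu.Lock() // timedOut is part of the wait condition, so change it under the mutex
				timedOut = true
				p.mu.Unlock()
				p.cond.Broadcast() // wake the waiter so it re-checks the condition
			}
		}()
	}
	for p.inUse >= p.limit && !timedOut {
		p.cond.Wait() // releases p.mu while blocked, re-locks before returning
	}
	close(done)
	if timedOut {
		return errTimeout
	}
	p.inUse++
	return nil
}

func (p *pool) release() {
	p.mu.Lock()
	p.inUse--
	p.mu.Unlock()
	p.cond.Broadcast() // every waiter re-checks its own condition
}

func main() {
	p := newPool(1)
	_ = p.acquire(0)                               // takes the only slot
	fmt.Println(p.acquire(200 * time.Millisecond)) // blocks ~200ms, then: timed out waiting for a free slot
	p.release()
}
```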

defer server.Unlock()
timeoutHit = true
server.poolWaiter.Broadcast()
}
}()
}
timeSpentWaiting := time.Duration(0)
for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
// We only count time spent in Wait(), and not time evaluating the entire loop,
// so that in the happy non-blocking path where the condition above evaluates true
// first time, we record a nice round zero wait time.
waitStart := time.Now()
// unlocks server mutex, waits, and locks again. Thus, the condition
// above is evaluated only when the lock is held.
server.poolWaiter.Wait()
timeSpentWaiting += time.Since(waitStart)
}
close(waitDone)
if timeoutHit {
server.Unlock()
stats.noticePoolTimeout(timeSpentWaiting)
return nil, false, errPoolTimeout
}
// Record that we fetched a connection out of the socket list and how long we spent waiting
stats.noticeSocketAcquisition(timeSpentWaiting)
} else {
if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
server.Unlock()
return nil, false, errPoolLimit
}
}
}
n := len(server.unusedSockets)
if n > 0 {
socket = server.unusedSockets[n-1]
server.unusedSockets[n-1] = nil // Help GC.
@@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
// If anybody is waiting for a connection, they should try now.
// Note that this _has_ to be a broadcast, not a signal; the signatures of AcquireSocket
// and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
// rather than that being an intrinsic property of the connection pool (I assume to ensure
// that there is always a connection available for replset topology discovery). Thus, once
// a connection is returned to the pool, _every_ waiter needs to check if the connection count
// is underneath their particular value for poolLimit.
server.poolWaiter.Broadcast()

Review comment:
Again, I am not 100% sure of my reading of the code (mgo was never super fun to read, and I haven't had a closer look at it in some time), but you take a lock on the server when you try to acquire a socket, and block waiting on a signal. The signal won't be raised if you first need to take a lock on the same mutex, no?

Before, the pool was unlocked before returning errPoolLimit, but that is no longer the case.

Author:
> mgo was never super fun to read

Yyyyyyup......

> The signal won't be raised if you first need to take a lock on the same mutex, no?

I think my answer above covers this - the acquiring code implicitly releases the mutex in the call to Wait and re-acquires it when Wait returns. So the release code should be able to grab the mutex and send the signal.

> Before, the pool was unlocked before returning errPoolLimit, but that is no longer the case.

I think I'm calling Unlock() on line 167:

} else {
	if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
		server.Unlock()
		return nil, false, errPoolLimit
	}
}

Is this not sufficient?

Review comment:
Yes, it is. I'd like to blame the low caffeine level in my blood for raising this issue ;)

server.Unlock()
}

25 changes: 24 additions & 1 deletion session.go
@@ -92,6 +92,7 @@ type Session struct {
syncTimeout time.Duration
sockTimeout time.Duration
poolLimit int
poolTimeout time.Duration
consistency Mode
creds []Credential
dialCred *Credential
@@ -486,6 +487,11 @@ type DialInfo struct {
// See Session.SetPoolLimit for details.
PoolLimit int

// PoolTimeout defines the maximum time to wait for a connection to become available
// if the pool limit is reached. Defaults to zero, which means forever.
// See Session.SetPoolTimeout for details.
PoolTimeout time.Duration

// The identifier of the client application which ran the operation.
AppName string

@@ -596,6 +602,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
cluster.minPoolSize = info.MinPoolSize
cluster.maxIdleTimeMS = info.MaxIdleTimeMS

if info.PoolTimeout > 0 {
session.poolTimeout = info.PoolTimeout
}

cluster.Release()

// People get confused when we return a session that is not actually
@@ -711,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) {
syncTimeout: session.syncTimeout,
sockTimeout: session.sockTimeout,
poolLimit: session.poolLimit,
poolTimeout: session.poolTimeout,
consistency: session.consistency,
creds: creds,
dialCred: session.dialCred,
@@ -2051,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) {
s.m.Unlock()
}

// SetPoolTimeout sets the maximum time connection attempts will wait to reuse
// an existing connection from the pool if the PoolLimit has been reached. If
// the value is exceeded, the attempt to use a session will fail with an error.
// The default value is zero, which means to wait forever with no timeout.
func (s *Session) SetPoolTimeout(timeout time.Duration) {
s.m.Lock()
s.poolTimeout = timeout
s.m.Unlock()
}

// SetBypassValidation sets whether the server should bypass the registered
// validation expressions executed when documents are inserted or modified,
// in the interest of preserving invariants in the collection being modified.
@@ -4908,7 +4929,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
}

// Still not good. We need a new socket.
sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
sock, err := s.cluster().AcquireSocketWithPoolTimeout(
s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout,
)
if err != nil {
return nil, err
}
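
As a quick orientation for users of the new options, a brief usage sketch follows (the address, pool values, and globalsign import path are assumptions for illustration, not taken from the diff): PoolTimeout can be set up front via DialInfo or adjusted later on the session.

```go
package main

import (
	"log"
	"time"

	"github.com/globalsign/mgo"
)

func main() {
	info := &mgo.DialInfo{
		Addrs:       []string{"localhost:27017"},
		Timeout:     10 * time.Second,
		PoolLimit:   64,
		PoolTimeout: 2 * time.Second, // fail instead of waiting forever for a pooled socket
	}
	session, err := mgo.DialWithInfo(info)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// The same knob can also be changed per session after dialing.
	session.SetPoolTimeout(500 * time.Millisecond)

	// Once the pool limit is hit and the timeout elapses, operations return
	// "could not acquire connection within pool timeout" instead of blocking.
	if err := session.Ping(); err != nil {
		log.Println("pool exhausted or server unreachable:", err)
	}
}
```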
45 changes: 36 additions & 9 deletions stats.go
@@ -28,6 +28,7 @@ package mgo

import (
"sync"
"time"
)

var stats *Stats
@@ -77,15 +78,19 @@ func ResetStats() {
//
// TODO outdated fields ?
type Stats struct {
Clusters int
MasterConns int
SlaveConns int
SentOps int
ReceivedOps int
ReceivedDocs int
SocketsAlive int
SocketsInUse int
SocketRefs int
Clusters int
MasterConns int
SlaveConns int
SentOps int
ReceivedOps int
ReceivedDocs int
SocketsAlive int
SocketsInUse int
SocketRefs int
TimesSocketAcquired int
TimesWaitedForPool int
TotalPoolWaitTime time.Duration
PoolTimeouts int
}

func (stats *Stats) cluster(delta int) {
@@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) {
statsMutex.Unlock()
}
}

func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) {
if stats != nil {
statsMutex.Lock()
stats.TimesSocketAcquired++
stats.TotalPoolWaitTime += waitTime
if waitTime > 0 {
stats.TimesWaitedForPool++
}
statsMutex.Unlock()
}
}

func (stats *Stats) noticePoolTimeout(waitTime time.Duration) {
if stats != nil {
statsMutex.Lock()
stats.TimesWaitedForPool++
stats.PoolTimeouts++
stats.TotalPoolWaitTime += waitTime
statsMutex.Unlock()
}
}
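
Finally, a short sketch of how a caller might surface the new counters (a hypothetical snippet, not part of the diff; it assumes mgo's existing SetStats/GetStats/ResetStats switches, which the tests above also rely on, and the globalsign import path):

```go
package main

import (
	"fmt"
	"time"

	"github.com/globalsign/mgo"
)

func main() {
	mgo.SetStats(true) // enable the stats collector
	mgo.ResetStats()

	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		panic(err)
	}
	defer session.Close()
	session.SetPoolLimit(1)
	session.SetPoolTimeout(500 * time.Millisecond)

	// ... run concurrent operations here ...

	s := mgo.GetStats()
	fmt.Printf("acquired=%d waitedForPool=%d poolTimeouts=%d totalPoolWait=%s\n",
		s.TimesSocketAcquired, s.TimesWaitedForPool, s.PoolTimeouts, s.TotalPoolWaitTime)
}
```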