Skip to content

Commit

Permalink
Merge branch 'development' of github.com:globalsign/mgo into connecti…
Browse files Browse the repository at this point in the history
…on_pool_waiter
  • Loading branch information
KJTsanaktsidis committed Mar 4, 2018
2 parents 2c43232 + 860240e commit 2189357
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 24 deletions.
36 changes: 20 additions & 16 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,23 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
Expand Down Expand Up @@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
minPoolSize := cluster.minPoolSize
maxIdleTimeMS := cluster.maxIdleTimeMS
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
}

func resolveAddr(addr string) (*net.TCPAddr, error) {
Expand Down
71 changes: 63 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type mongoServer struct {
closed bool
abended bool
poolWaiter *sync.Cond
minPoolSize int
maxIdleTimeMS int
}

type dialer struct {
Expand All @@ -77,18 +79,23 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: syncChan,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: syncChan,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
server.poolWaiter = sync.NewCond(server)
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
}
return server
}

Expand Down Expand Up @@ -277,6 +284,7 @@ func (server *mongoServer) close(waitForIdle bool) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
server.Lock()
if !server.closed {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
// If anybody is waiting for a connection, they should try now.
Expand Down Expand Up @@ -410,6 +418,53 @@ func (server *mongoServer) pinger(loop bool) {
}
}

func (server *mongoServer) poolShrinker() {
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
now := time.Now()
end := 0
reclaimMap := map[*mongoSocket]struct{}{}
// Because the acquisition and recycle are done at the tail of array,
// the head is always the oldest unused socket.
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
break
}
end++
reclaimMap[s] = struct{}{}
}
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*mongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
remainSockets := []*mongoSocket{}
for _, s := range server.liveSockets {
if _, ok := reclaimMap[s]; !ok {
remainSockets = append(remainSockets, s)
}
}
server.liveSockets = remainSockets
stats.conn(-1*end, server.info.Master)
}
server.Unlock()

for _, s := range tbr {
s.Close()
}
}
}

type mongoServerSlice []*mongoServer

func (s mongoServerSlice) Len() int {
Expand Down
40 changes: 40 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ const (
// Defines the per-server socket pool limit. Defaults to 4096.
// See Session.SetPoolLimit for details.
//
// minPoolSize=<limit>
//
// Defines the per-server socket pool minium size. Defaults to 0.
//
// maxIdleTimeMS=<millisecond>
//
// The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
// closed due to inactivity.
//
// appName=<appName>
//
// The identifier of this client application. This parameter is used to
Expand Down Expand Up @@ -323,6 +333,8 @@ func ParseURL(url string) (*DialInfo, error) {
appName := ""
readPreferenceMode := Primary
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
for _, opt := range uinfo.options {
switch opt.key {
case "authSource":
Expand Down Expand Up @@ -369,6 +381,22 @@ func ParseURL(url string) (*DialInfo, error) {
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
}
readPreferenceTagSets = append(readPreferenceTagSets, doc)
case "minPoolSize":
minPoolSize, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for minPoolSize: " + opt.value)
}
if minPoolSize < 0 {
return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
}
case "maxIdleTimeMS":
maxIdleTimeMS, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
}
if maxIdleTimeMS < 0 {
return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
}
case "connect":
if opt.value == "direct" {
direct = true
Expand Down Expand Up @@ -403,6 +431,8 @@ func ParseURL(url string) (*DialInfo, error) {
TagSets: readPreferenceTagSets,
},
ReplicaSetName: setName,
MinPoolSize: minPoolSize,
MaxIdleTimeMS: maxIdleTimeMS,
}
return &info, nil
}
Expand Down Expand Up @@ -481,6 +511,14 @@ type DialInfo struct {
// cluster and establish connections with further servers too.
Direct bool

// MinPoolSize defines The minimum number of connections in the connection pool.
// Defaults to 0.
MinPoolSize int

//The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed.
MaxIdleTimeMS int

// DialServer optionally specifies the dial function for establishing
// connections with the MongoDB servers.
DialServer func(addr *ServerAddr) (net.Conn, error)
Expand Down Expand Up @@ -563,6 +601,8 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
if info.PoolTimeout > 0 {
session.poolTimeout = info.PoolTimeout
}
cluster.minPoolSize = info.MinPoolSize
cluster.maxIdleTimeMS = info.MaxIdleTimeMS
cluster.Release()

// People get confused when we return a session that is not actually
Expand Down
86 changes: 86 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"flag"
"fmt"
"math"
"math/rand"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
}
}

func (s *S) TestMinPoolSize(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?minPoolSize=0", 0, false},
{"localhost:40001?minPoolSize=1", 1, false},
{"localhost:40001?minPoolSize=-1", -1, true},
{"localhost:40001?minPoolSize=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MinPoolSize, Equals, test.size)
}
}
}

func (s *S) TestMaxIdleTimeMS(c *C) {
tests := []struct {
url string
size int
fail bool
}{
{"localhost:40001?maxIdleTimeMS=0", 0, false},
{"localhost:40001?maxIdleTimeMS=1", 1, false},
{"localhost:40001?maxIdleTimeMS=-1", -1, true},
{"localhost:40001?maxIdleTimeMS=-.", 0, true},
}
for _, test := range tests {
info, err := mgo.ParseURL(test.url)
if test.fail {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(info.MaxIdleTimeMS, Equals, test.size)
}
}
}

func (s *S) TestPoolShrink(c *C) {
if *fast {
c.Skip("-fast")
}
oldSocket := mgo.GetStats().SocketsAlive

session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
c.Assert(err, IsNil)
defer session.Close()

parallel := 10
res := make(chan error, parallel+1)
wg := &sync.WaitGroup{}
for i := 1; i < parallel; i++ {
wg.Add(1)
go func() {
s := session.Copy()
defer s.Close()
result := struct{}{}
err := s.Run("ping", &result)

//sleep random time to make the allocate and release in different sequence
time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
res <- err
wg.Done()
}()
}
wg.Wait()
stats := mgo.GetStats()
c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)

// give some time for shrink the pool, the tick is set to 1 minute
c.Log("Sleeping... 1 minute to for pool shrinking")
time.Sleep(60 * time.Second)

stats = mgo.GetStats()
c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
}

func (s *S) TestURLReadPreferenceTags(c *C) {
type test struct {
url string
Expand Down
1 change: 1 addition & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type mongoSocket struct {
dead error
serverInfo *mongoServerInfo
closeAfterIdle bool
lastTimeUsed time.Time // for time based idle socket release
sendMeta sync.Once
}

Expand Down

0 comments on commit 2189357

Please sign in to comment.