Skip to content

Commit

Permalink
make the releaser simpler
Browse files Browse the repository at this point in the history
Because the acquire and recycle are done at the tail of array,
the head is always the oldest unused socket.

Signed-off-by: Wang Xu <gnawux@gmail.com>
  • Loading branch information
gnawux committed Jan 24, 2018
1 parent 6f7ed6c commit c22c3bc
Showing 1 changed file with 34 additions and 39 deletions.
73 changes: 34 additions & 39 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize int, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
go server.pinger(true)
Expand Down Expand Up @@ -217,8 +217,8 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
if !server.closed {
now := time.Now()
server.unusedSockets = append(server.unusedSockets, &timedMongoSocket{
lastTimeUsed:&now,
soc:socket,
lastTimeUsed: &now,
soc: socket,
})
}
server.Unlock()
Expand Down Expand Up @@ -358,44 +358,39 @@ func (server *mongoServer) pinger(loop bool) {
}

func (server *mongoServer) releaser() {
for {

time.Sleep(1 * time.Minute)
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.RLock()
if len(server.unusedSockets) < server.minPoolSize {
server.RUnlock()
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
tmpSlice := make([]*timedMongoSocket, 0, len(server.unusedSockets) - server.minPoolSize)
for _, s := range server.unusedSockets {
if len(tmpSlice) == cap(tmpSlice) {
now := time.Now()
end := 0
// Because the acquirision and recycle are done at the tail of array,
// the head is always the oldest unused socket.
for idx, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
end = idx
break
}
if time.Since(*(s.lastTimeUsed)) > time.Duration(server.maxIdleTimeMS) * time.Millisecond {
tmpSlice = append(tmpSlice, s)
}
}
server.RUnlock()
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*timedMongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
stats.conn(-1*end, server.info.Master)
}
server.Unlock()

if len(tmpSlice) > 0 {
server.Lock()
for _, s := range tmpSlice {
for i, unused := range server.unusedSockets {
if s.soc == unused.soc {
copy(server.unusedSockets[i:], server.unusedSockets[i+1:])
n := len(server.unusedSockets) - 1
server.unusedSockets[n] = nil
server.unusedSockets = server.unusedSockets[:n]
stats.conn(-1, server.info.Master)
s.soc.Close()
break
}
}
}
server.Unlock()
for _, s := range tbr {
s.soc.Close()
}
}
}
Expand Down

0 comments on commit c22c3bc

Please sign in to comment.