From 775e6d764413ec89303cedcc9013d7b9328890bc Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 8 May 2018 14:20:40 +0100 Subject: [PATCH 1/9] socket: separate read/write network timeouts Splits DialInfo.Timeout (defaults to 60s when using mgo.Dial()) into ReadTimeout and WriteTimeout to address #160. Read/write timeout defaults to DialInfo.Timeout to preserve existing behaviour. --- cluster.go | 17 +++-- server.go | 44 ++++++------ session.go | 184 +++++++++++++++++++++++++++++++++++++++--------- session_test.go | 13 ++++ socket.go | 38 +++++++--- 5 files changed, 226 insertions(+), 70 deletions(-) diff --git a/cluster.go b/cluster.go index 4e54c5d81..4959a46ba 100644 --- a/cluster.go +++ b/cluster.go @@ -146,8 +146,17 @@ type isMasterResult struct { } func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error { + // I'm not really sure why the timeout was hard coded to these values (I + // assume because everything is passed as a func arg, and thus this info is + // unavailable here), but leaving them as is for backwards compatibility. + config := &DialInfo{ + Timeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + // Monotonic let's it talk to a slave and still hold the socket. - session := newSession(Monotonic, cluster, 10*time.Second) + session := newSession(Monotonic, cluster, config) session.setSocket(socket) var cmd = bson.D{{Name: "isMaster", Value: 1}} @@ -624,9 +633,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout // AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. -func (cluster *mongoCluster) AcquireSocketWithPoolTimeout( - mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration, -) (s *mongoSocket, err error) { +func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) { var started time.Time var syncCount uint for { @@ -670,7 +677,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout( continue } - s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout) + s, abended, err := server.AcquireSocketWithBlocking(info) if err == errPoolTimeout { // No need to remove servers from the topology if acquiring a socket fails for this reason. return nil, err diff --git a/server.go b/server.go index f34624f74..d8663efd8 100644 --- a/server.go +++ b/server.go @@ -124,21 +124,23 @@ var errServerClosed = errors.New("server was closed") // use in this server is greater than the provided limit, errPoolLimit is // returned. func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { - return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond) + info := &DialInfo{ + PoolLimit: poolLimit, + ReadTimeout: timeout, + WriteTimeout: timeout, + Timeout: timeout, + } + return server.acquireSocketInternal(info, false) } // AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_ // return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout // should elapse before a socket is available, it will return errPoolTimeout. -func (server *mongoServer) AcquireSocketWithBlocking( - poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration, -) (socket *mongoSocket, abended bool, err error) { - return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout) +func (server *mongoServer) AcquireSocketWithBlocking(info *DialInfo) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(info, true) } -func (server *mongoServer) acquireSocketInternal( - poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration, -) (socket *mongoSocket, abended bool, err error) { +func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock bool) (socket *mongoSocket, abended bool, err error) { for { server.Lock() abended = server.abended @@ -146,7 +148,7 @@ func (server *mongoServer) acquireSocketInternal( server.Unlock() return nil, abended, errServerClosed } - if poolLimit > 0 { + if info.poolLimit() > 0 { if shouldBlock { // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout // with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout, @@ -158,11 +160,11 @@ func (server *mongoServer) acquireSocketInternal( // https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine. waitDone := make(chan struct{}) timeoutHit := false - if poolTimeout > 0 { + if info.PoolTimeout > 0 { go func() { select { case <-waitDone: - case <-time.After(poolTimeout): + case <-time.After(info.PoolTimeout): // timeoutHit is part of the wait condition, so needs to be changed under mutex. server.Lock() defer server.Unlock() @@ -172,7 +174,7 @@ func (server *mongoServer) acquireSocketInternal( }() } timeSpentWaiting := time.Duration(0) - for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit { + for len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() && !timeoutHit { // We only count time spent in Wait(), and not time evaluating the entire loop, // so that in the happy non-blocking path where the condition above evaluates true // first time, we record a nice round zero wait time. @@ -191,7 +193,7 @@ func (server *mongoServer) acquireSocketInternal( // Record that we fetched a connection of of a socket list and how long we spent waiting stats.noticeSocketAcquisition(timeSpentWaiting) } else { - if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit { + if len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() { server.Unlock() return nil, false, errPoolLimit } @@ -202,15 +204,15 @@ func (server *mongoServer) acquireSocketInternal( socket = server.unusedSockets[n-1] server.unusedSockets[n-1] = nil // Help GC. server.unusedSockets = server.unusedSockets[:n-1] - info := server.info + serverInfo := server.info server.Unlock() - err = socket.InitialAcquire(info, timeout) + err = socket.InitialAcquire(serverInfo, info) if err != nil { continue } } else { server.Unlock() - socket, err = server.Connect(timeout) + socket, err = server.Connect(info) if err == nil { server.Lock() // We've waited for the Connect, see if we got @@ -231,20 +233,18 @@ func (server *mongoServer) acquireSocketInternal( // Connect establishes a new connection to the server. This should // generally be done through server.AcquireSocket(). -func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) { +func (server *mongoServer) Connect(info *DialInfo) (*mongoSocket, error) { server.RLock() master := server.info.Master dial := server.dial server.RUnlock() - logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout) + logf("Establishing new connection to %s (timeout=%s)...", server.Addr, info.Timeout) var conn net.Conn var err error switch { case !dial.isSet(): - // Cannot do this because it lacks timeout support. :-( - //conn, err = net.DialTCP("tcp", nil, server.tcpaddr) - conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout) + conn, err = net.DialTimeout("tcp", server.ResolvedAddr, info.Timeout) if tcpconn, ok := conn.(*net.TCPConn); ok { tcpconn.SetKeepAlive(true) } else if err == nil { @@ -264,7 +264,7 @@ func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) logf("Connection to %s established.", server.Addr) stats.conn(+1, master) - return newSocket(server, conn, timeout), nil + return newSocket(server, conn, info), nil } // Close forces closing all sockets that are alive, whether diff --git a/session.go b/session.go index 167a0375d..162d015c0 100644 --- a/session.go +++ b/session.go @@ -73,6 +73,24 @@ const ( Monotonic Mode = 1 // Strong mode is specific to mgo, and is same as Primary. Strong Mode = 2 + + // DefaultConnectionPoolSize defines the default maximum number of + // connections in the connection pool. + // + // To override this value set DialInfo.PoolLimit. + DefaultConnectionPoolSize = 4096 + + // DefaultReadTimeout is set to 60 seconds for backwards compatibility. + // + // See DialInfo.ReadTimeout + DefaultReadTimeout = time.Second * 60 + + // DefaultWriteTimeout is set to 60 seconds for backwards compatibility. + // + // See DialInfo.WriteTimeout + DefaultWriteTimeout = time.Second * 60 + + zeroDuration = time.Duration(0) ) // mgo.v3: Drop Strong mode, suffix all modes with "Mode". @@ -90,9 +108,6 @@ type Session struct { defaultdb string sourcedb string syncTimeout time.Duration - sockTimeout time.Duration - poolLimit int - poolTimeout time.Duration consistency Mode creds []Credential dialCred *Credential @@ -104,6 +119,8 @@ type Session struct { queryConfig query bypassValidation bool slaveOk bool + + dialInfo *DialInfo } // Database holds collections of documents @@ -196,7 +213,7 @@ const ( // Dial will timeout after 10 seconds if a server isn't reached. The returned // session will timeout operations after one minute by default if servers aren't // available. To customize the timeout, see DialWithTimeout, SetSyncTimeout, and -// SetSocketTimeout. +// DialInfo Read/WriteTimeout. // // This method is generally called just once for a given cluster. Further // sessions to the same cluster are then established using the New or Copy @@ -483,15 +500,38 @@ type DialInfo struct { Username string Password string - // PoolLimit defines the per-server socket pool limit. Defaults to 4096. - // See Session.SetPoolLimit for details. + // PoolLimit defines the per-server socket pool limit. Defaults to + // DefaultConnectionPoolSize. See Session.SetPoolLimit for details. PoolLimit int // PoolTimeout defines max time to wait for a connection to become available - // if the pool limit is reaqched. Defaults to zero, which means forever. - // See Session.SetPoolTimeout for details + // if the pool limit is reached. Defaults to zero, which means forever. See + // Session.SetPoolTimeout for details PoolTimeout time.Duration + // ReadTimeout defines the maximum duration to wait for a response from + // MongoDB. + // + // This effectively limits the maximum query execution time. If a MongoDB + // query duration exceeds this timeout, the caller will receive a timeout, + // however MongoDB will continue processing the query. This duration must be + // large enough to allow MongoDB to execute the query, and the response be + // received over the network connection. + // + // Only limits the network read - does not include unmarshalling / + // processing of the response. Defaults to DialInfo.Timeout. If 0, no + // timeout is set. + ReadTimeout time.Duration + + // WriteTimeout defines the maximum duration of a write to MongoDB over the + // network connection. + // + // This is can usually be low unless writing large documents, or over a high + // latency link. Only limits network write time - does not include + // marshalling/processing the request. Defaults to DialInfo.Timeout. If 0, + // no timeout is set. + WriteTimeout time.Duration + // The identifier of the client application which ran the operation. AppName string @@ -527,6 +567,80 @@ type DialInfo struct { Dial func(addr net.Addr) (net.Conn, error) } +// Copy returns a deep-copy of i. +func (i *DialInfo) Copy() *DialInfo { + var readPreference *ReadPreference + if i.ReadPreference != nil { + readPreference = &ReadPreference{ + Mode: i.ReadPreference.Mode, + } + readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets)) + copy(readPreference.TagSets, i.ReadPreference.TagSets) + } + + info := &DialInfo{ + Timeout: i.Timeout, + Database: i.Database, + ReplicaSetName: i.ReplicaSetName, + Source: i.Source, + Service: i.Service, + ServiceHost: i.ServiceHost, + Mechanism: i.Mechanism, + Username: i.Username, + Password: i.Password, + PoolLimit: i.PoolLimit, + PoolTimeout: i.PoolTimeout, + ReadTimeout: i.ReadTimeout, + WriteTimeout: i.WriteTimeout, + AppName: i.AppName, + ReadPreference: readPreference, + FailFast: i.FailFast, + Direct: i.Direct, + MinPoolSize: i.MinPoolSize, + MaxIdleTimeMS: i.MaxIdleTimeMS, + DialServer: i.DialServer, + Dial: i.Dial, + } + + info.Addrs = make([]string, len(i.Addrs)) + copy(info.Addrs, i.Addrs) + + return info +} + +// readTimeout returns the configured read timeout, or DefaultReadTimeout if +// unset. +func (i *DialInfo) readTimeout() time.Duration { + if i.ReadTimeout == zeroDuration { + return i.Timeout + } + return i.ReadTimeout +} + +// writeTimeout returns the configured write timeout, or DefaultWriteTimeout if +// unset. +func (i *DialInfo) writeTimeout() time.Duration { + if i.WriteTimeout == zeroDuration { + return i.Timeout + } + return i.WriteTimeout +} + +// roundTripTimeout returns the total time allocated for a single network read +// and write. +func (i *DialInfo) roundTripTimeout() time.Duration { + return i.readTimeout() + i.writeTimeout() +} + +// poolLimit returns the configured connection pool size, or +// DefaultConnectionPoolSize. +func (i *DialInfo) poolLimit() int { + if i == nil || i.PoolLimit == 0 { + return DefaultConnectionPoolSize + } + return i.PoolLimit +} + // ReadPreference defines the manner in which servers are chosen. type ReadPreference struct { // Mode determines the consistency of results. See Session.SetMode. @@ -556,7 +670,8 @@ func (addr *ServerAddr) TCPAddr() *net.TCPAddr { } // DialWithInfo establishes a new session to the cluster identified by info. -func DialWithInfo(info *DialInfo) (*Session, error) { +func DialWithInfo(dialInfo *DialInfo) (*Session, error) { + info := dialInfo.Copy() addrs := make([]string, len(info.Addrs)) for i, addr := range info.Addrs { p := strings.LastIndexAny(addr, "]:") @@ -567,7 +682,7 @@ func DialWithInfo(info *DialInfo) (*Session, error) { addrs[i] = addr } cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName, info.AppName) - session := newSession(Eventual, cluster, info.Timeout) + session := newSession(Eventual, cluster, info) session.defaultdb = info.Database if session.defaultdb == "" { session.defaultdb = "test" @@ -595,17 +710,9 @@ func DialWithInfo(info *DialInfo) (*Session, error) { } session.creds = []Credential{*session.dialCred} } - if info.PoolLimit > 0 { - session.poolLimit = info.PoolLimit - } cluster.minPoolSize = info.MinPoolSize cluster.maxIdleTimeMS = info.MaxIdleTimeMS - - if info.PoolTimeout > 0 { - session.poolTimeout = info.PoolTimeout - } - cluster.Release() // People get confused when we return a session that is not actually @@ -624,6 +731,8 @@ func DialWithInfo(info *DialInfo) (*Session, error) { session.SetMode(Strong, true) } + session.dialInfo = info + return session, nil } @@ -684,13 +793,12 @@ func extractURL(s string) (*urlInfo, error) { return info, nil } -func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) { +func newSession(consistency Mode, cluster *mongoCluster, info *DialInfo) (session *Session) { cluster.Acquire() session = &Session{ mgoCluster: cluster, - syncTimeout: timeout, - sockTimeout: timeout, - poolLimit: 4096, + syncTimeout: info.Timeout, + dialInfo: info, } debugf("New session %p on cluster %p", session, cluster) session.SetMode(consistency, true) @@ -719,9 +827,6 @@ func copySession(session *Session, keepCreds bool) (s *Session) { defaultdb: session.defaultdb, sourcedb: session.sourcedb, syncTimeout: session.syncTimeout, - sockTimeout: session.sockTimeout, - poolLimit: session.poolLimit, - poolTimeout: session.poolTimeout, consistency: session.consistency, creds: creds, dialCred: session.dialCred, @@ -733,6 +838,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) { queryConfig: session.queryConfig, bypassValidation: session.bypassValidation, slaveOk: session.slaveOk, + dialInfo: session.dialInfo, } s = &scopy debugf("New session %p on cluster %p (copy from %p)", s, cluster, session) @@ -2018,13 +2124,21 @@ func (s *Session) SetSyncTimeout(d time.Duration) { s.m.Unlock() } -// SetSocketTimeout sets the amount of time to wait for a non-responding -// socket to the database before it is forcefully closed. +// SetSocketTimeout is deprecated - use DialInfo read/write timeouts instead. +// +// SetSocketTimeout sets the amount of time to wait for a non-responding socket +// to the database before it is forcefully closed. // // The default timeout is 1 minute. func (s *Session) SetSocketTimeout(d time.Duration) { s.m.Lock() - s.sockTimeout = d + + // Set both the read and write timeout, as well as the DialInfo.Timeout for + // backwards compatibility, + s.dialInfo.Timeout = d + s.dialInfo.ReadTimeout = d + s.dialInfo.WriteTimeout = d + if s.masterSocket != nil { s.masterSocket.SetTimeout(d) } @@ -2058,7 +2172,7 @@ func (s *Session) SetCursorTimeout(d time.Duration) { // of used resources and number of goroutines before they are created. func (s *Session) SetPoolLimit(limit int) { s.m.Lock() - s.poolLimit = limit + s.dialInfo.PoolLimit = limit s.m.Unlock() } @@ -2068,7 +2182,7 @@ func (s *Session) SetPoolLimit(limit int) { // The default value is zero, which means to wait forever with no timeout. func (s *Session) SetPoolTimeout(timeout time.Duration) { s.m.Lock() - s.poolTimeout = timeout + s.dialInfo.PoolTimeout = timeout s.m.Unlock() } @@ -4356,9 +4470,11 @@ func (iter *Iter) acquireSocket() (*mongoSocket, error) { // with Eventual sessions, if a Refresh is done, or if a // monotonic session gets a write and shifts from secondary // to primary. Our cursor is in a specific server, though. + iter.session.m.Lock() - sockTimeout := iter.session.sockTimeout + sockTimeout := iter.session.dialInfo.roundTripTimeout() iter.session.m.Unlock() + socket.Release() socket, _, err = iter.server.AcquireSocket(0, sockTimeout) if err != nil { @@ -4950,7 +5066,11 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { // Still not good. We need a new socket. sock, err := s.cluster().AcquireSocketWithPoolTimeout( - s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout, + s.consistency, + slaveOk && s.slaveOk, + s.syncTimeout, + s.queryConfig.op.serverTags, + s.dialInfo, ) if err != nil { return nil, err diff --git a/session_test.go b/session_test.go index 14cb9b1a6..c3723b909 100644 --- a/session_test.go +++ b/session_test.go @@ -4969,6 +4969,19 @@ func (s *S) TestCollationQueries(c *C) { } } +func (s *S) TestDialTimeouts(c *C) { + info := &mgo.DialInfo{} + + c.Assert(info.readTimeout(), Equals, mgo.DefaultReadTimeout) + c.Assert(info.writeTimeout(), Equals, mgo.DefaultWriteTimeout) + + info.ReadTimeout = time.Second + c.Assert(info.readTimeout(), Equals, time.Second) + + info.WriteTimewut = time.Second + c.Assert(info.writeTimeout(), Equals, time.Second) +} + // -------------------------------------------------------------------------- // Some benchmarks that require a running database. diff --git a/socket.go b/socket.go index ae13e401f..f3f477604 100644 --- a/socket.go +++ b/socket.go @@ -42,7 +42,6 @@ type mongoSocket struct { sync.Mutex server *mongoServer // nil when cached conn net.Conn - timeout time.Duration addr string // For debugging only. nextRequestId uint32 replyFuncs map[uint32]replyFunc @@ -56,6 +55,8 @@ type mongoSocket struct { closeAfterIdle bool lastTimeUsed time.Time // for time based idle socket release sendMeta sync.Once + + dialInfo *DialInfo } type queryOpFlags uint32 @@ -181,15 +182,16 @@ type requestInfo struct { replyFunc replyFunc } -func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket { +func newSocket(server *mongoServer, conn net.Conn, info *DialInfo) *mongoSocket { socket := &mongoSocket{ conn: conn, addr: server.Addr, server: server, replyFuncs: make(map[uint32]replyFunc), + dialInfo: info, } socket.gotNonce.L = &socket.Mutex - if err := socket.InitialAcquire(server.Info(), timeout); err != nil { + if err := socket.InitialAcquire(server.Info(), info); err != nil { panic("newSocket: InitialAcquire returned error: " + err.Error()) } stats.socketsAlive(+1) @@ -223,7 +225,7 @@ func (socket *mongoSocket) ServerInfo() *mongoServerInfo { // InitialAcquire obtains the first reference to the socket, either // right after the connection is made or once a recycled socket is // being put back in use. -func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error { +func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, dialInfo *DialInfo) error { socket.Lock() if socket.references > 0 { panic("Socket acquired out of cache with references") @@ -235,7 +237,7 @@ func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout t } socket.references++ socket.serverInfo = serverInfo - socket.timeout = timeout + socket.dialInfo = dialInfo stats.socketsInUse(+1) stats.socketRefs(+1) socket.Unlock() @@ -288,7 +290,8 @@ func (socket *mongoSocket) Release() { // SetTimeout changes the timeout used on socket operations. func (socket *mongoSocket) SetTimeout(d time.Duration) { socket.Lock() - socket.timeout = d + socket.dialInfo.ReadTimeout = d + socket.dialInfo.WriteTimeout = d socket.Unlock() } @@ -301,24 +304,37 @@ const ( func (socket *mongoSocket) updateDeadline(which deadlineType) { var when time.Time - if socket.timeout > 0 { - when = time.Now().Add(socket.timeout) - } - whichstr := "" + var whichstr string switch which { case readDeadline | writeDeadline: + if socket.dialInfo.roundTripTimeout() == 0 { + return + } whichstr = "read/write" + when = time.Now().Add(socket.dialInfo.roundTripTimeout()) socket.conn.SetDeadline(when) + case readDeadline: + if socket.dialInfo.readTimeout() == 0 { + return + } whichstr = "read" + when = time.Now().Add(socket.dialInfo.readTimeout()) socket.conn.SetReadDeadline(when) + case writeDeadline: + if socket.dialInfo.writeTimeout() == 0 { + return + } whichstr = "write" + when = time.Now().Add(socket.dialInfo.writeTimeout()) socket.conn.SetWriteDeadline(when) + default: panic("invalid parameter to updateDeadline") } - debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when) + + debugf("Socket %p to %s: updated %s deadline to %s", socket, socket.addr, whichstr, when) } // Close terminates the socket use. From 95f23331132617a0e068d2770a7362118f9cc71d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 9 May 2018 10:49:08 +0100 Subject: [PATCH 2/9] cluster: remove AcquireSocket Only used by tests, replaced by the pool-aware acquire socket functions: * AcquireSocketWithPoolTimeout * AcquireSocketWithBlocking --- cluster.go | 7 ------- server_test.go | 6 +++--- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/cluster.go b/cluster.go index 4959a46ba..ba40ca819 100644 --- a/cluster.go +++ b/cluster.go @@ -623,13 +623,6 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { cluster.Unlock() } -// AcquireSocket returns a socket to a server in the cluster. If slaveOk is -// true, it will attempt to return a socket to a slave server. If it is -// false, the socket will necessarily be to a master server. -func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { - return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0) -} - // AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. diff --git a/server_test.go b/server_test.go index 1d21ef08b..b502a922f 100644 --- a/server_test.go +++ b/server_test.go @@ -29,8 +29,8 @@ package mgo_test import ( "time" - . "gopkg.in/check.v1" "github.com/globalsign/mgo" + . "gopkg.in/check.v1" ) func (s *S) TestServerRecoversFromAbend(c *C) { @@ -53,8 +53,8 @@ func (s *S) TestServerRecoversFromAbend(c *C) { c.Assert(err, IsNil) sock.Release() c.Check(abended, Equals, true) - // cluster.AcquireSocket should fix the abended problems - sock, err = cluster.AcquireSocket(mgo.Primary, false, time.Minute, time.Second, nil, 100) + // cluster.AcquireSocketWithPoolTimeout should fix the abended problems + sock, err = cluster.AcquireSocketWithPoolTimeout(mgo.Primary, false, time.Minute, time.Second, nil, 100) c.Assert(err, IsNil) sock.Release() sock, abended, err = server.AcquireSocket(100, time.Second) From 545befdcb3783dcb8bfab2c698a96ed098a017f9 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 9 May 2018 11:56:09 +0100 Subject: [PATCH 3/9] cluster: use configured timeouts for cluster operations * `mongoCluster.syncServer()` no longer uses hard-coded 5 seconds * `mongoCluster.isMaster()` no longer uses hard-coded 10 seconds --- cluster.go | 73 +++++++++++++++++++----------------------------------- server.go | 41 +++++++++++++----------------- session.go | 8 +++--- 3 files changed, 46 insertions(+), 76 deletions(-) diff --git a/cluster.go b/cluster.go index ba40ca819..f8371821f 100644 --- a/cluster.go +++ b/cluster.go @@ -55,27 +55,21 @@ type mongoCluster struct { masters mongoServers references int syncing bool - direct bool - failFast bool syncCount uint - setName string cachedIndex map[string]bool sync chan bool dial dialer - appName string minPoolSize int maxIdleTimeMS int + dialInfo *DialInfo } -func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { +func newCluster(userSeeds []string, info *DialInfo) *mongoCluster { cluster := &mongoCluster{ userSeeds: userSeeds, references: 1, - direct: direct, - failFast: failFast, - dial: dial, - setName: setName, - appName: appName, + dial: dialer{info.Dial, info.DialServer}, + dialInfo: info, } cluster.serverSynced.L = cluster.RWMutex.RLocker() cluster.sync = make(chan bool, 1) @@ -146,17 +140,8 @@ type isMasterResult struct { } func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error { - // I'm not really sure why the timeout was hard coded to these values (I - // assume because everything is passed as a func arg, and thus this info is - // unavailable here), but leaving them as is for backwards compatibility. - config := &DialInfo{ - Timeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - } - // Monotonic let's it talk to a slave and still hold the socket. - session := newSession(Monotonic, cluster, config) + session := newSession(Monotonic, cluster, cluster.dialInfo) session.setSocket(socket) var cmd = bson.D{{Name: "isMaster", Value: 1}} @@ -180,8 +165,8 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul } // Include the application name if set - if cluster.appName != "" { - meta["application"] = bson.M{"name": cluster.appName} + if cluster.dialInfo.AppName != "" { + meta["application"] = bson.M{"name": cluster.dialInfo.AppName} } cmd = append(cmd, bson.DocElem{ @@ -199,19 +184,7 @@ type possibleTimeout interface { Timeout() bool } -var syncSocketTimeout = 5 * time.Second - func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) { - var syncTimeout time.Duration - if raceDetector { - // This variable is only ever touched by tests. - globalMutex.Lock() - syncTimeout = syncSocketTimeout - globalMutex.Unlock() - } else { - syncTimeout = syncSocketTimeout - } - addr := server.Addr log("SYNC Processing ", addr, "...") @@ -219,7 +192,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI var result isMasterResult var tryerr error for retry := 0; ; retry++ { - if retry == 3 || retry == 1 && cluster.failFast { + if retry == 3 || retry == 1 && cluster.dialInfo.FailFast { return nil, nil, tryerr } if retry > 0 { @@ -231,16 +204,22 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI time.Sleep(syncShortDelay) } - // It's not clear what would be a good timeout here. Is it - // better to wait longer or to retry? - socket, _, err := server.AcquireSocket(0, syncTimeout) + // Don't ever hit the pool limit for syncing + config := cluster.dialInfo.Copy() + config.PoolLimit = 0 + + socket, _, err := server.AcquireSocket(config) if err != nil { tryerr = err logf("SYNC Failed to get socket to %s: %v", addr, err) continue } err = cluster.isMaster(socket, &result) + + // Restore the correct dial config before returning it to the pool + socket.dialInfo = cluster.dialInfo socket.Release() + if err != nil { tryerr = err logf("SYNC Command 'ismaster' to %s failed: %v", addr, err) @@ -250,9 +229,9 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI break } - if cluster.setName != "" && result.SetName != cluster.setName { - logf("SYNC Server %s is not a member of replica set %q", addr, cluster.setName) - return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.setName) + if cluster.dialInfo.ReplicaSetName != "" && result.SetName != cluster.dialInfo.ReplicaSetName { + logf("SYNC Server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName) + return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName) } if result.IsMaster { @@ -264,7 +243,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI } } else if result.Secondary { debugf("SYNC %s is a slave.", addr) - } else if cluster.direct { + } else if cluster.dialInfo.Direct { logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr) } else { logf("SYNC %s is neither a master nor a slave.", addr) @@ -395,7 +374,7 @@ func (cluster *mongoCluster) syncServersLoop() { break } cluster.references++ // Keep alive while syncing. - direct := cluster.direct + direct := cluster.dialInfo.Direct cluster.Unlock() cluster.syncServersIteration(direct) @@ -410,7 +389,7 @@ func (cluster *mongoCluster) syncServersLoop() { // Hold off before allowing another sync. No point in // burning CPU looking for down servers. - if !cluster.failFast { + if !cluster.dialInfo.FailFast { time.Sleep(syncShortDelay) } @@ -448,13 +427,11 @@ func (cluster *mongoCluster) syncServersLoop() { func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer { cluster.RLock() server := cluster.servers.Search(tcpaddr.String()) - minPoolSize := cluster.minPoolSize - maxIdleTimeMS := cluster.maxIdleTimeMS cluster.RUnlock() if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.dialInfo) } func resolveAddr(addr string) (*net.TCPAddr, error) { @@ -645,7 +622,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk boo // Initialize after fast path above. started = time.Now() syncCount = cluster.syncCount - } else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount { + } else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.dialInfo.FailFast && cluster.syncCount != syncCount { cluster.RUnlock() return nil, errors.New("no reachable servers") } diff --git a/server.go b/server.go index d8663efd8..3c98d2424 100644 --- a/server.go +++ b/server.go @@ -67,9 +67,8 @@ type mongoServer struct { pingCount uint32 closed bool abended bool - minPoolSize int - maxIdleTimeMS int poolWaiter *sync.Cond + dialInfo *DialInfo } type dialer struct { @@ -91,21 +90,20 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, info *DialInfo) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: syncChan, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. - minPoolSize: minPoolSize, - maxIdleTimeMS: maxIdleTimeMS, + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: syncChan, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + dialInfo: info, } server.poolWaiter = sync.NewCond(server) go server.pinger(true) - if maxIdleTimeMS != 0 { + if info.MaxIdleTimeMS != 0 { go server.poolShrinker() } return server @@ -123,13 +121,7 @@ var errServerClosed = errors.New("server was closed") // If the poolLimit argument is greater than zero and the number of sockets in // use in this server is greater than the provided limit, errPoolLimit is // returned. -func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { - info := &DialInfo{ - PoolLimit: poolLimit, - ReadTimeout: timeout, - WriteTimeout: timeout, - Timeout: timeout, - } +func (server *mongoServer) AcquireSocket(info *DialInfo) (socket *mongoSocket, abended bool, err error) { return server.acquireSocketInternal(info, false) } @@ -407,7 +399,8 @@ func (server *mongoServer) pinger(loop bool) { time.Sleep(delay) } op := op - socket, _, err := server.AcquireSocket(0, delay) + + socket, _, err := server.AcquireSocket(server.dialInfo) if err == nil { start := time.Now() _, _ = socket.SimpleQuery(&op) @@ -448,7 +441,7 @@ func (server *mongoServer) poolShrinker() { } server.Lock() unused := len(server.unusedSockets) - if unused < server.minPoolSize { + if unused < server.dialInfo.MinPoolSize { server.Unlock() continue } @@ -457,8 +450,8 @@ func (server *mongoServer) poolShrinker() { reclaimMap := map[*mongoSocket]struct{}{} // Because the acquisition and recycle are done at the tail of array, // the head is always the oldest unused socket. - for _, s := range server.unusedSockets[:unused-server.minPoolSize] { - if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + for _, s := range server.unusedSockets[:unused-server.dialInfo.MinPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.dialInfo.MaxIdleTimeMS) * time.Millisecond).After(now) { break } end++ diff --git a/session.go b/session.go index 162d015c0..a5ccfddc1 100644 --- a/session.go +++ b/session.go @@ -555,7 +555,7 @@ type DialInfo struct { // Defaults to 0. MinPoolSize int - //The maximum number of milliseconds that a connection can remain idle in the pool + // The maximum number of milliseconds that a connection can remain idle in the pool // before being removed and closed. MaxIdleTimeMS int @@ -681,7 +681,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) { } addrs[i] = addr } - cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName, info.AppName) + cluster := newCluster(addrs, info) session := newSession(Eventual, cluster, info) session.defaultdb = info.Database if session.defaultdb == "" { @@ -4472,11 +4472,11 @@ func (iter *Iter) acquireSocket() (*mongoSocket, error) { // to primary. Our cursor is in a specific server, though. iter.session.m.Lock() - sockTimeout := iter.session.dialInfo.roundTripTimeout() + info := iter.session.dialInfo iter.session.m.Unlock() socket.Release() - socket, _, err = iter.server.AcquireSocket(0, sockTimeout) + socket, _, err = iter.server.AcquireSocket(info) if err != nil { return nil, err } From 3b17aab33edc734e77887267d31879abf7bf2c16 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 9 May 2018 12:54:56 +0100 Subject: [PATCH 4/9] tests: use DialInfo for internal timeouts --- cluster_test.go | 2 -- export_test.go | 14 -------------- server_test.go | 14 ++++++++++---- session_internal_test.go | 23 ++++++++++++++++++++++- session_test.go | 13 ------------- 5 files changed, 32 insertions(+), 34 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index be11dc1a7..de99d414d 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1055,8 +1055,6 @@ func (s *S) TestSocketTimeoutOnDial(c *C) { timeout := 1 * time.Second - defer mgo.HackSyncSocketTimeout(timeout)() - s.Freeze("localhost:40001") started := time.Now() diff --git a/export_test.go b/export_test.go index 998c7a2dd..1b7d7e941 100644 --- a/export_test.go +++ b/export_test.go @@ -19,20 +19,6 @@ func HackPingDelay(newDelay time.Duration) (restore func()) { return } -func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) { - globalMutex.Lock() - defer globalMutex.Unlock() - - oldTimeout := syncSocketTimeout - restore = func() { - globalMutex.Lock() - syncSocketTimeout = oldTimeout - globalMutex.Unlock() - } - syncSocketTimeout = newTimeout - return -} - func (s *Session) Cluster() *mongoCluster { return s.cluster() } diff --git a/server_test.go b/server_test.go index b502a922f..43ddfa3b1 100644 --- a/server_test.go +++ b/server_test.go @@ -40,7 +40,13 @@ func (s *S) TestServerRecoversFromAbend(c *C) { // Peek behind the scenes cluster := session.Cluster() server := cluster.Server("127.0.0.1:40001") - sock, abended, err := server.AcquireSocket(100, time.Second) + + info := &mgo.DialInfo{ + Timeout: time.Second, + PoolLimit: 100, + } + + sock, abended, err := server.AcquireSocket(info) c.Assert(err, IsNil) c.Assert(sock, NotNil) sock.Release() @@ -49,15 +55,15 @@ func (s *S) TestServerRecoversFromAbend(c *C) { sock.Close() server.AbendSocket(sock) // Next acquire notices the connection was abnormally ended - sock, abended, err = server.AcquireSocket(100, time.Second) + sock, abended, err = server.AcquireSocket(info) c.Assert(err, IsNil) sock.Release() c.Check(abended, Equals, true) // cluster.AcquireSocketWithPoolTimeout should fix the abended problems - sock, err = cluster.AcquireSocketWithPoolTimeout(mgo.Primary, false, time.Minute, time.Second, nil, 100) + sock, err = cluster.AcquireSocketWithPoolTimeout(mgo.Primary, false, time.Minute, nil, info) c.Assert(err, IsNil) sock.Release() - sock, abended, err = server.AcquireSocket(100, time.Second) + sock, abended, err = server.AcquireSocket(info) c.Assert(err, IsNil) c.Check(abended, Equals, false) sock.Release() diff --git a/session_internal_test.go b/session_internal_test.go index ddce59cae..3e214b174 100644 --- a/session_internal_test.go +++ b/session_internal_test.go @@ -3,9 +3,11 @@ package mgo import ( "crypto/x509/pkix" "encoding/asn1" + "testing" + "time" + "github.com/globalsign/mgo/bson" . "gopkg.in/check.v1" - "testing" ) type S struct{} @@ -62,3 +64,22 @@ func (s *S) TestGetRFC2253NameStringMultiValued(c *C) { c.Assert(getRFC2253NameString(&RDNElements), Equals, "OU=Sales+CN=J. Smith,O=Widget Inc.,C=US") } + +func (s *S) TestDialTimeouts(c *C) { + info := &DialInfo{} + + c.Assert(info.readTimeout(), Equals, time.Duration(0)) + c.Assert(info.writeTimeout(), Equals, time.Duration(0)) + c.Assert(info.roundTripTimeout(), Equals, time.Duration(0)) + + info.Timeout = 60 * time.Second + c.Assert(info.readTimeout(), Equals, 60*time.Second) + c.Assert(info.writeTimeout(), Equals, 60*time.Second) + c.Assert(info.roundTripTimeout(), Equals, 120*time.Second) + + info.ReadTimeout = time.Second + c.Assert(info.readTimeout(), Equals, time.Second) + + info.WriteTimeout = time.Second + c.Assert(info.writeTimeout(), Equals, time.Second) +} diff --git a/session_test.go b/session_test.go index c3723b909..14cb9b1a6 100644 --- a/session_test.go +++ b/session_test.go @@ -4969,19 +4969,6 @@ func (s *S) TestCollationQueries(c *C) { } } -func (s *S) TestDialTimeouts(c *C) { - info := &mgo.DialInfo{} - - c.Assert(info.readTimeout(), Equals, mgo.DefaultReadTimeout) - c.Assert(info.writeTimeout(), Equals, mgo.DefaultWriteTimeout) - - info.ReadTimeout = time.Second - c.Assert(info.readTimeout(), Equals, time.Second) - - info.WriteTimewut = time.Second - c.Assert(info.writeTimeout(), Equals, time.Second) -} - // -------------------------------------------------------------------------- // Some benchmarks that require a running database. From c0b90526a9be39ff3f1fd9c23e16dbbd194a902e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 9 May 2018 17:28:32 +0100 Subject: [PATCH 5/9] server: fix fantastic serverTags nil slice bug When unmarshalling serverTags, it is now an empty slice, instead of a nil slice. `len(thing) == 0` works all the time, regardless. --- server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 3c98d2424..3c73302fc 100644 --- a/server.go +++ b/server.go @@ -565,7 +565,7 @@ func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServe if best == nil { best = next best.RLock() - if serverTags != nil && !next.info.Mongos && !best.hasTags(serverTags) { + if len(serverTags) != 0 && !next.info.Mongos && !best.hasTags(serverTags) { best.RUnlock() best = nil } @@ -574,7 +574,7 @@ func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServe next.RLock() swap := false switch { - case serverTags != nil && !next.info.Mongos && !next.hasTags(serverTags): + case len(serverTags) != 0 && !next.info.Mongos && !next.hasTags(serverTags): // Must have requested tags. case mode == Secondary && next.info.Master && !next.info.Mongos: // Must be a secondary or mongos. From fbe8acd49475c1066e287141af30fd929998e6dd Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 11 May 2018 13:23:12 +0100 Subject: [PATCH 6/9] cluster: remove unused duplicate pool config --- cluster.go | 26 ++++++++++++-------------- session.go | 2 -- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/cluster.go b/cluster.go index f8371821f..ff431cac5 100644 --- a/cluster.go +++ b/cluster.go @@ -48,20 +48,18 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - syncCount uint - cachedIndex map[string]bool - sync chan bool - dial dialer - minPoolSize int - maxIdleTimeMS int - dialInfo *DialInfo + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + syncCount uint + cachedIndex map[string]bool + sync chan bool + dial dialer + dialInfo *DialInfo } func newCluster(userSeeds []string, info *DialInfo) *mongoCluster { diff --git a/session.go b/session.go index a5ccfddc1..067505552 100644 --- a/session.go +++ b/session.go @@ -711,8 +711,6 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) { session.creds = []Credential{*session.dialCred} } - cluster.minPoolSize = info.MinPoolSize - cluster.maxIdleTimeMS = info.MaxIdleTimeMS cluster.Release() // People get confused when we return a session that is not actually From 2573a576faeec5018fd33f55c73b29d70101c9da Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 11 May 2018 13:26:31 +0100 Subject: [PATCH 7/9] session: avoid calculating default values in hot path Changes `DialWithInfo` to handle setting default values by setting the relevant `DialInfo` field, rather than calling the respective methods in the hot path for: * `PoolLimit` * `ReadTimeout` * `WriteTimeout` --- server.go | 6 +++--- session.go | 10 +++++++--- socket.go | 18 +++++++++--------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/server.go b/server.go index 3c73302fc..6f51ca5e3 100644 --- a/server.go +++ b/server.go @@ -140,7 +140,7 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo server.Unlock() return nil, abended, errServerClosed } - if info.poolLimit() > 0 { + if info.PoolLimit > 0 { if shouldBlock { // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout // with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout, @@ -166,7 +166,7 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo }() } timeSpentWaiting := time.Duration(0) - for len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() && !timeoutHit { + for len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit && !timeoutHit { // We only count time spent in Wait(), and not time evaluating the entire loop, // so that in the happy non-blocking path where the condition above evaluates true // first time, we record a nice round zero wait time. @@ -185,7 +185,7 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo // Record that we fetched a connection of of a socket list and how long we spent waiting stats.noticeSocketAcquisition(timeSpentWaiting) } else { - if len(server.liveSockets)-len(server.unusedSockets) >= info.poolLimit() { + if len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit { server.Unlock() return nil, false, errPoolLimit } diff --git a/session.go b/session.go index 067505552..be8c200ab 100644 --- a/session.go +++ b/session.go @@ -633,10 +633,10 @@ func (i *DialInfo) roundTripTimeout() time.Duration { } // poolLimit returns the configured connection pool size, or -// DefaultConnectionPoolSize. +// DefaultConnectionPoolLimit. func (i *DialInfo) poolLimit() int { - if i == nil || i.PoolLimit == 0 { - return DefaultConnectionPoolSize + if i.PoolLimit == 0 { + return DefaultConnectionPoolLimit } return i.PoolLimit } @@ -672,6 +672,10 @@ func (addr *ServerAddr) TCPAddr() *net.TCPAddr { // DialWithInfo establishes a new session to the cluster identified by info. func DialWithInfo(dialInfo *DialInfo) (*Session, error) { info := dialInfo.Copy() + info.PoolLimit = info.poolLimit() + info.ReadTimeout = info.readTimeout() + info.WriteTimeout = info.writeTimeout() + addrs := make([]string, len(info.Addrs)) for i, addr := range info.Addrs { p := strings.LastIndexAny(addr, "]:") diff --git a/socket.go b/socket.go index f3f477604..9dcedf219 100644 --- a/socket.go +++ b/socket.go @@ -304,37 +304,37 @@ const ( func (socket *mongoSocket) updateDeadline(which deadlineType) { var when time.Time - var whichstr string + var whichStr string switch which { case readDeadline | writeDeadline: if socket.dialInfo.roundTripTimeout() == 0 { return } - whichstr = "read/write" + whichStr = "read/write" when = time.Now().Add(socket.dialInfo.roundTripTimeout()) socket.conn.SetDeadline(when) case readDeadline: - if socket.dialInfo.readTimeout() == 0 { + if socket.dialInfo.ReadTimeout == zeroDuration { return } - whichstr = "read" - when = time.Now().Add(socket.dialInfo.readTimeout()) + whichStr = "read" + when = time.Now().Add(socket.dialInfo.ReadTimeout) socket.conn.SetReadDeadline(when) case writeDeadline: - if socket.dialInfo.writeTimeout() == 0 { + if socket.dialInfo.WriteTimeout == zeroDuration { return } - whichstr = "write" - when = time.Now().Add(socket.dialInfo.writeTimeout()) + whichStr = "write" + when = time.Now().Add(socket.dialInfo.WriteTimeout) socket.conn.SetWriteDeadline(when) default: panic("invalid parameter to updateDeadline") } - debugf("Socket %p to %s: updated %s deadline to %s", socket, socket.addr, whichstr, when) + debugf("Socket %p to %s: updated %s deadline to %s", socket, socket.addr, whichStr, when) } // Close terminates the socket use. From 716ee13c64795ad2821f3fb6d4b36ad43eec3148 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 11 May 2018 13:26:48 +0100 Subject: [PATCH 8/9] session: remove unused consts --- session.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/session.go b/session.go index be8c200ab..e92dec4ce 100644 --- a/session.go +++ b/session.go @@ -74,21 +74,11 @@ const ( // Strong mode is specific to mgo, and is same as Primary. Strong Mode = 2 - // DefaultConnectionPoolSize defines the default maximum number of + // DefaultConnectionPoolLimit defines the default maximum number of // connections in the connection pool. // // To override this value set DialInfo.PoolLimit. - DefaultConnectionPoolSize = 4096 - - // DefaultReadTimeout is set to 60 seconds for backwards compatibility. - // - // See DialInfo.ReadTimeout - DefaultReadTimeout = time.Second * 60 - - // DefaultWriteTimeout is set to 60 seconds for backwards compatibility. - // - // See DialInfo.WriteTimeout - DefaultWriteTimeout = time.Second * 60 + DefaultConnectionPoolLimit = 4096 zeroDuration = time.Duration(0) ) @@ -501,7 +491,7 @@ type DialInfo struct { Password string // PoolLimit defines the per-server socket pool limit. Defaults to - // DefaultConnectionPoolSize. See Session.SetPoolLimit for details. + // DefaultConnectionPoolLimit. See Session.SetPoolLimit for details. PoolLimit int // PoolTimeout defines max time to wait for a connection to become available @@ -509,8 +499,8 @@ type DialInfo struct { // Session.SetPoolTimeout for details PoolTimeout time.Duration - // ReadTimeout defines the maximum duration to wait for a response from - // MongoDB. + // ReadTimeout defines the maximum duration to wait for a response to be + // read from MongoDB. // // This effectively limits the maximum query execution time. If a MongoDB // query duration exceeds this timeout, the caller will receive a timeout, From 22be2e3e8bf23e39290e545cef2c78fac19b8c53 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 11 May 2018 14:01:21 +0100 Subject: [PATCH 9/9] session: update docs --- session.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index e92dec4ce..edd43f717 100644 --- a/session.go +++ b/session.go @@ -598,8 +598,7 @@ func (i *DialInfo) Copy() *DialInfo { return info } -// readTimeout returns the configured read timeout, or DefaultReadTimeout if -// unset. +// readTimeout returns the configured read timeout, or i.Timeout if it's not set func (i *DialInfo) readTimeout() time.Duration { if i.ReadTimeout == zeroDuration { return i.Timeout @@ -607,8 +606,8 @@ func (i *DialInfo) readTimeout() time.Duration { return i.ReadTimeout } -// writeTimeout returns the configured write timeout, or DefaultWriteTimeout if -// unset. +// writeTimeout returns the configured write timeout, or i.Timeout if it's not +// set func (i *DialInfo) writeTimeout() time.Duration { if i.WriteTimeout == zeroDuration { return i.Timeout