diff --git a/cluster.go b/cluster.go index 204f507bd..91a4e9ec2 100644 --- a/cluster.go +++ b/cluster.go @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.Run(cmd, result) + err := session.runOnSocket(socket, cmd, result) session.Close() return err } diff --git a/cluster_test.go b/cluster_test.go index 8945e0962..a0a197048 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) { wg.Wait() } +func (s *S) TestNoDeadlockOnClose(c *C) { + if *fast { + // Unfortunately I seem to need quite a high dial timeout to get this to work + // on my machine. + c.Skip("-fast") + } + + var shouldStop int32 + atomic.StoreInt32(&shouldStop, 0) + + listener, err := net.Listen("tcp4", "127.0.0.1:") + c.Check(err, Equals, nil) + + go func() { + for atomic.LoadInt32(&shouldStop) == 0 { + sock, err := listener.Accept() + if err != nil { + // Probs just closed + continue + } + sock.Close() + } + }() + defer func() { + atomic.StoreInt32(&shouldStop, 1) + listener.Close() + }() + + session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second) + // If execution reaches here, the deadlock did not happen and all is OK + if session != nil { + session.Close() + } +} + func (s *S) TestSelectServers(c *C) { if !s.versionAtLeast(2, 2) { c.Skip("read preferences introduced in 2.2") diff --git a/session.go b/session.go index d1c88420e..3a27caf30 100644 --- a/session.go +++ b/session.go @@ -848,6 +848,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } +// runOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + socket.Acquire() + defer socket.Release() + return db.run(socket, cmd, result) +} + // Credential holds details to authenticate with a MongoDB server. type Credential struct { // Username and Password hold the basic details for authentication. @@ -2312,6 +2321,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } +// runOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").runOnSocket(socket, cmd, result) +} + // SelectServers restricts communication to servers configured with the // given tags. For example, the following statement restricts servers // used for reading operations to those with both tag "disk" set to