Skip to content

Commit

Permalink
Fix for deadlock in cluster: isMaster() globalsign#120 (globalsign#121)
Browse files Browse the repository at this point in the history
Proposed fix for deadlock in globalsign#120
  • Loading branch information
szank authored Mar 22, 2018
2 parents 7f9dd4f + 56f05b4 commit 00e7550
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul
})
})

err := session.Run(cmd, result)
err := session.runOnSocket(socket, cmd, result)
session.Close()
return err
}
Expand Down
35 changes: 35 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) {
wg.Wait()
}

func (s *S) TestNoDeadlockOnClose(c *C) {
if *fast {
// Unfortunately I seem to need quite a high dial timeout to get this to work
// on my machine.
c.Skip("-fast")
}

var shouldStop int32
atomic.StoreInt32(&shouldStop, 0)

listener, err := net.Listen("tcp4", "127.0.0.1:")
c.Check(err, Equals, nil)

go func() {
for atomic.LoadInt32(&shouldStop) == 0 {
sock, err := listener.Accept()
if err != nil {
// Probs just closed
continue
}
sock.Close()
}
}()
defer func() {
atomic.StoreInt32(&shouldStop, 1)
listener.Close()
}()

session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second)
// If execution reaches here, the deadlock did not happen and all is OK
if session != nil {
session.Close()
}
}

func (s *S) TestSelectServers(c *C) {
if !s.versionAtLeast(2, 2) {
c.Skip("read preferences introduced in 2.2")
Expand Down
16 changes: 16 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error {
return db.run(socket, cmd, result)
}

// runOnSocket does the same as Run, but guarantees that your command will be run
// on the provided socket instance; if it's unhealthy, you will receive the error
// from it.
func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error {
socket.Acquire()
defer socket.Release()
return db.run(socket, cmd, result)
}

// Credential holds details to authenticate with a MongoDB server.
type Credential struct {
// Username and Password hold the basic details for authentication.
Expand Down Expand Up @@ -2312,6 +2321,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error {
return s.DB("admin").Run(cmd, result)
}

// runOnSocket does the same as Run, but guarantees that your command will be run
// on the provided socket instance; if it's unhealthy, you will receive the error
// from it.
func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error {
return s.DB("admin").runOnSocket(socket, cmd, result)
}

// SelectServers restricts communication to servers configured with the
// given tags. For example, the following statement restricts servers
// used for reading operations to those with both tag "disk" set to
Expand Down

0 comments on commit 00e7550

Please sign in to comment.