From 08c5182e562a8880ef07ffb4295706d5bc1953a2 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sat, 10 Mar 2018 18:22:31 +1030 Subject: [PATCH 1/3] Add a test for globalsign/mgo#120 We've seen a deadlock happen occasionally where syncServers needs to acquire a socket to call isMaster, but the socket acquisition needs to know the server topology which isn't known yet. See issue globalsign/mgo#120 for a detailed breakdown. This replicates the issue by setting up a mongo "server" which closes sockets as soon as they're opened; about 20% of the time, this will trigger the deadlock because the acquired socket for isMaster() dies and needs to be reacquired. --- cluster_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/cluster_test.go b/cluster_test.go index 8945e0962..a0a197048 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) { wg.Wait() } +func (s *S) TestNoDeadlockOnClose(c *C) { + if *fast { + // Unfortunately I seem to need quite a high dial timeout to get this to work + // on my machine. 
+ c.Skip("-fast") + } + + var shouldStop int32 + atomic.StoreInt32(&shouldStop, 0) + + listener, err := net.Listen("tcp4", "127.0.0.1:") + c.Check(err, Equals, nil) + + go func() { + for atomic.LoadInt32(&shouldStop) == 0 { + sock, err := listener.Accept() + if err != nil { + // Probs just closed + continue + } + sock.Close() + } + }() + defer func() { + atomic.StoreInt32(&shouldStop, 1) + listener.Close() + }() + + session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second) + // If execution reaches here, the deadlock did not happen and all is OK + if session != nil { + session.Close() + } +} + func (s *S) TestSelectServers(c *C) { if !s.versionAtLeast(2, 2) { c.Skip("read preferences introduced in 2.2") From fc9aa45962e857ceb0d4dc4ad862d9435a5a07b4 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sat, 10 Mar 2018 18:27:53 +1030 Subject: [PATCH 2/3] Propose a fix for globalsign/mgo#120 As discussed in issue globalsign/mgo#120, isMaster() can cause a deadlock with the topology scanner if the connection it makes dies before running the command; mgo automagically attempts to make another socket in acquireSocket, but acquiring a socket requires the server topology, which is not known yet. This commit forces isMaster() to actually run on the intended socket. 
--- cluster.go | 2 +- session.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/cluster.go b/cluster.go index 204f507bd..9434647a7 100644 --- a/cluster.go +++ b/cluster.go @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.Run(cmd, result) + err := session.RunOnSocket(socket, cmd, result) session.Close() return err } diff --git a/session.go b/session.go index d1c88420e..0bbd51621 100644 --- a/session.go +++ b/session.go @@ -848,6 +848,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } +// RunOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (db *Database) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + socket.Acquire() + defer socket.Release() + return db.run(socket, cmd, result) +} + // Credential holds details to authenticate with a MongoDB server. type Credential struct { // Username and Password hold the basic details for authentication. @@ -2312,6 +2321,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } +// RunOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (s *Session) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").RunOnSocket(socket, cmd, result) +} + // SelectServers restricts communication to servers configured with the // given tags. 
For example, the following statement restricts servers // used for reading operations to those with both tag "disk" set to From 6474164de29f30136ed035c4f3507cf5a0b3e3c2 Mon Sep 17 00:00:00 2001 From: Damir Vandic Date: Wed, 21 Mar 2018 11:32:24 +0100 Subject: [PATCH 3/3] Make run on socket helper methods private --- cluster.go | 2 +- session.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cluster.go b/cluster.go index 9434647a7..91a4e9ec2 100644 --- a/cluster.go +++ b/cluster.go @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.RunOnSocket(socket, cmd, result) + err := session.runOnSocket(socket, cmd, result) session.Close() return err } diff --git a/session.go b/session.go index 0bbd51621..3a27caf30 100644 --- a/session.go +++ b/session.go @@ -848,10 +848,10 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } -// RunOnSocket does the same as Run, but guarantees that your command will be run +// runOnSocket does the same as Run, but guarantees that your command will be run // on the provided socket instance; if it's unhealthy, you will receive the error // from it. -func (db *Database) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { +func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { socket.Acquire() defer socket.Release() return db.run(socket, cmd, result) @@ -2321,11 +2321,11 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } -// RunOnSocket does the same as Run, but guarantees that your command will be run +// runOnSocket does the same as Run, but guarantees that your command will be run // on the provided socket instance; if it's unhealthy, you will receive the error // from it. 
-func (s *Session) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { - return s.DB("admin").RunOnSocket(socket, cmd, result) +func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").runOnSocket(socket, cmd, result) } // SelectServers restricts communication to servers configured with the