diff --git a/changes.txt b/changes.txt index 525a6336fc..51826c3b0a 100755 --- a/changes.txt +++ b/changes.txt @@ -1,5 +1,8 @@ CHANGES +2014-08-06 + - Add connection pool and connection strategy + 2014-07-26 - Release v1.3.0.0 - Prepare Elastica Release v1.3.0.0 diff --git a/lib/Elastica/Client.php b/lib/Elastica/Client.php index a582118041..20ba8632fe 100644 --- a/lib/Elastica/Client.php +++ b/lib/Elastica/Client.php @@ -4,8 +4,6 @@ use Elastica\Bulk; use Elastica\Bulk\Action; -use Elastica\Exception\ResponseException; -use Elastica\Exception\ClientException; use Elastica\Exception\ConnectionException; use Elastica\Exception\InvalidException; use Elastica\Exception\RuntimeException; @@ -43,11 +41,6 @@ class Client 'retryOnConflict' => 0, ); - /** - * @var \Elastica\Connection[] List of connections - */ - protected $_connections = array(); - /** * @var callback */ @@ -67,6 +60,11 @@ class Client * @var LoggerInterface */ protected $_logger = null; + /** + * + * @var Connection\ConnectionPool + */ + protected $_connectionPool = null; /** * Creates a new Elastica client @@ -86,22 +84,34 @@ public function __construct(array $config = array(), $callback = null) */ protected function _initConnections() { - $connections = $this->getConfig('connections'); - - foreach ($connections as $connection) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($connection)); + $connections = array(); + + foreach ($this->getConfig('connections') as $connection) { + $connections[] = Connection::create($this->_prepareConnectionParams($connection)); } if (isset($this->_config['servers'])) { foreach ($this->getConfig('servers') as $server) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($server)); + $connections[] = Connection::create($this->_prepareConnectionParams($server)); } } // If no connections set, create default connection - if (empty($this->_connections)) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); + if (empty($connections)) { + $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); } + + if (!isset($this->_config['connectionStrategy'])) { + if ($this->getConfig('roundRobin') === true) { + $this->setConfigValue('connectionStrategy', 'RoundRobin'); + } else { + $this->setConfigValue('connectionStrategy', 'Simple'); + } + } + + $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy')); + + $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback); } /** @@ -439,7 +449,7 @@ public function getCluster() */ public function addConnection(Connection $connection) { - $this->_connections[] = $connection; + $this->_connectionPool->addConnection($connection); return $this; } @@ -451,15 +461,7 @@ public function addConnection(Connection $connection) */ public function hasConnection() { - foreach ($this->_connections as $connection) - { - if ($connection->isEnabled()) - { - return true; - } - } - - return false; + return $this->_connectionPool->hasConnection(); } /** @@ -468,20 +470,7 @@ public function hasConnection() */ public function getConnection() { - $enabledConnection = null; - - foreach ($this->_connections as $connection) { - if ($connection->isEnabled()) { - $enabledConnection = $connection; - break; - } - } - - if (empty($enabledConnection)) { - throw new ClientException('No enabled connection'); - } - - return $enabledConnection; + return $this->_connectionPool->getConnection(); } /** @@ -489,16 +478,25 @@ public function getConnection() */ public function getConnections() { - return $this->_connections; + return $this->_connectionPool->getConnections(); + } + + /** + * + * @return \Connection\Strategy\StrategyInterface + */ + public function getConnectionStrategy() + { + return $this->_connectionPool->getStrategy(); } /** - * @param \Elastica\Connection[] $connections + * @param array|\Elastica\Connection[] $connections * @return \Elastica\Client */ public function setConnections(array $connections) { - $this->_connections = $connections; + $this->_connectionPool->setConnections($connections); return $this; } @@ -597,12 +595,7 @@ public function request($path, $method = Request::GET, $data = array(), array $q return $response; } catch (ConnectionException $e) { - $connection->setEnabled(false); - - // Calls callback with connection as param to make it possible to persist invalid connections - if ($this->_callback) { - call_user_func($this->_callback, $connection, $e, $this); - } + $this->_connectionPool->onFail($connection, $e, $this); // In case there is no valid connection left, throw exception which caused the disabling of the connection. if (!$this->hasConnection()) diff --git a/lib/Elastica/Connection/ConnectionPool.php b/lib/Elastica/Connection/ConnectionPool.php new file mode 100644 index 0000000000..66b2adaa22 --- /dev/null +++ b/lib/Elastica/Connection/ConnectionPool.php @@ -0,0 +1,121 @@ +_connections = $connections; + + $this->_strategy = $strategy; + + $this->_callback = $callback; + } + + /** + * @param \Elastica\Connection $connection + */ + public function addConnection(Connection $connection) + { + $this->_connections[] = $connection; + } + + /** + * @param array|\Elastica\Connection[] $connections + */ + public function setConnections(array $connections) + { + $this->_connections = $connections; + } + + /** + * @return boolean + */ + public function hasConnection() + { + foreach ($this->_connections as $connection) { + if ($connection->isEnabled()) { + return true; + } + } + + return false; + } + + /** + * @return array + */ + public function getConnections() + { + return $this->_connections; + } + + /** + * @return \Elastica\Connection + * @throws \Elastica\Exception\ClientException + */ + public function getConnection() + { + return $this->_strategy->getConnection($this->getConnections()); + } + + /** + * @param \Elastica\Connection $connection + * @param \Exception $e + * @param Client $client + */ + public function onFail(Connection $connection, Exception $e, Client $client) + { + $connection->setEnabled(false); + + if ($this->_callback) { + call_user_func($this->_callback, $connection, $e, $client); + } + } + + /** + * + * @return \Elastica\Connection\Strategy\StrategyInterface + */ + public function getStrategy() + { + return $this->_strategy; + } +} diff --git a/lib/Elastica/Connection/Strategy/CallbackStrategy.php b/lib/Elastica/Connection/Strategy/CallbackStrategy.php new file mode 100644 index 0000000000..7ea7cf9eae --- /dev/null +++ b/lib/Elastica/Connection/Strategy/CallbackStrategy.php @@ -0,0 +1,49 @@ +_callback = $callback; + } + + /** + * @param array|\Elastica\Connection[] $connections + * @return \Elastica\Connection + */ + public function getConnection($connections) + { + return $this->_callback->__invoke($connections); + } + + /** + * @return boolean + */ + public static function isValid($callback) + { + return is_object($callback) && ($callback instanceof \Closure); + } +} diff --git a/lib/Elastica/Connection/Strategy/RoundRobin.php b/lib/Elastica/Connection/Strategy/RoundRobin.php new file mode 100644 index 0000000000..9cbf8c86e9 --- /dev/null +++ b/lib/Elastica/Connection/Strategy/RoundRobin.php @@ -0,0 +1,24 @@ +isEnabled()) { + return $connection; + } + } + + throw new ClientException('No enabled connection'); + } +} diff --git a/lib/Elastica/Connection/Strategy/StrategyFactory.php b/lib/Elastica/Connection/Strategy/StrategyFactory.php new file mode 100644 index 0000000000..fbdb44e664 --- /dev/null +++ b/lib/Elastica/Connection/Strategy/StrategyFactory.php @@ -0,0 +1,41 @@ +createPool(); + + $this->assertEquals($this->getConnections(), $pool->getConnections()); + } + + public function testSetConnections() + { + $pool = $this->createPool(); + + $connections = $this->getConnections(5); + + $pool->setConnections($connections); + + $this->assertEquals($connections, $pool->getConnections()); + } + + public function testHasConnection() + { + $pool = $this->createPool(); + + $this->assertTrue($pool->hasConnection()); + } + + public function testFailHasConnections() + { + $pool = $this->createPool(); + + $pool->setConnections(array()); + + $this->assertFalse($pool->hasConnection()); + } + + public function testGetConnection() + { + $pool = $this->createPool(); + + $this->assertTrue($pool->getConnection() instanceof \Elastica\Connection); + } + + protected function getConnections($quantity = 1) + { + $params = array(); + $connections = array(); + + for ($i = 0; $i<$quantity; $i++) { + $connections[] = new \Elastica\Connection($params); + } + + return $connections; + } + + protected function createPool() + { + $connections = $this->getConnections(); + $strategy = \Elastica\Connection\Strategy\StrategyFactory::create('Simple'); + + $pool = new \Elastica\Connection\ConnectionPool($connections, $strategy); + + return $pool; + } +} diff --git a/test/lib/Elastica/Test/Connection/Strategy/CallbackStrategyTest.php b/test/lib/Elastica/Test/Connection/Strategy/CallbackStrategyTest.php new file mode 100644 index 0000000000..7f5aa84d49 --- /dev/null +++ b/test/lib/Elastica/Test/Connection/Strategy/CallbackStrategyTest.php @@ -0,0 +1,69 @@ +getConnection(array()); + + $this->assertEquals(1, $count); + } + + public function testIsValid() + { + $callback = function(){}; + + $isValid = CallbackStrategy::isValid($callback); + + $this->assertTrue($isValid); + } + + public function testFailIsValid() + { + $callback = new \stdClass(); + + $isValid = CallbackStrategy::isValid($callback); + + $this->assertFalse($isValid); + } + + public function testConnection() + { + $count = 0; + + $config = array('connectionStrategy' => function ($connections) use(&$count) { + ++$count; + return current($connections); + }); + + $client = new \Elastica\Client($config); + $resonse = $client->request('/_aliases'); + + $this->assertEquals(1, $count); + + $this->assertTrue($resonse->isOk()); + + $strategy = $client->getConnectionStrategy(); + + $condition = ($strategy instanceof CallbackStrategy); + + $this->assertTrue($condition); + } +} diff --git a/test/lib/Elastica/Test/Connection/Strategy/EmptyStrategy.php b/test/lib/Elastica/Test/Connection/Strategy/EmptyStrategy.php new file mode 100644 index 0000000000..85acf54701 --- /dev/null +++ b/test/lib/Elastica/Test/Connection/Strategy/EmptyStrategy.php @@ -0,0 +1,18 @@ + 'RoundRobin'); + $client = new Client($config); + $resonse = $client->request('/_aliases'); + /* @var $resonse Response */ + + $this->_checkResponse($resonse); + + $this->_checkStrategy($client); + } + + public function testOldStrategySetted() + { + $config = array('roundRobin' => true); + $client = new Client($config); + + $this->_checkStrategy($client); + } + + /** + * @expectedException \Elastica\Exception\ConnectionException + */ + public function testFailConnection() + { + $config = array('connectionStrategy' => 'RoundRobin', 'host' => '255.255.255.0'); + $client = new Client($config); + + $this->_checkStrategy($client); + + $client->request('/_aliases'); + + } + + public function testWithOneFailConnection() + { + $connections = array( + new \Elastica\Connection(array('host' => '255.255.255.0')), + new \Elastica\Connection(array('host' => 'localhost')), + ); + + $count = 0; + $callback = function($connection, $exception, $client) use(&$count) { + ++$count; + }; + + $client = new Client(array('connectionStrategy' => 'RoundRobin'), $callback); + $client->setConnections($connections); + + $resonse = $client->request('/_aliases'); + /* @var $resonse Response */ + + $this->_checkResponse($resonse); + + $this->_checkStrategy($client); + + + $this->assertLessThan(count($connections), $count); + } + + public function testWithNoValidConnection() + { + $connections = array( + new \Elastica\Connection(array('host' => '255.255.255.0', 'timeout' => 2)), + new \Elastica\Connection(array('host' => '45.45.45.45', 'port' => '80', 'timeout' => 2)), + new \Elastica\Connection(array('host' => '10.123.213.123', 'timeout' => 2)), + ); + + $count = 0; + $client = new Client(array('roundRobin' => true), function() use (&$count) { + ++$count; + }); + + $client->setConnections($connections); + + try { + $client->request('/_aliases'); + $this->fail('Should throw exception as no connection valid'); + } catch (\Elastica\Exception\ConnectionException $e) { + $this->assertEquals(count($connections), $count); + $this->_checkStrategy($client); + } + + } + + protected function _checkStrategy($client) + { + $strategy = $client->getConnectionStrategy(); + + $condition = ($strategy instanceof RoundRobin); + + $this->assertTrue($condition); + } + + protected function _checkResponse($resonse) + { + $this->assertTrue($resonse->isOk()); + } + +} diff --git a/test/lib/Elastica/Test/Connection/Strategy/SimpleTest.php b/test/lib/Elastica/Test/Connection/Strategy/SimpleTest.php new file mode 100644 index 0000000000..bd22f82faa --- /dev/null +++ b/test/lib/Elastica/Test/Connection/Strategy/SimpleTest.php @@ -0,0 +1,104 @@ +request('/_aliases'); + /* @var $resonse \Elastica\Response */ + + $this->_checkResponse($resonse); + + $this->_checkStrategy($client); + } + + /** + * @expectedException \Elastica\Exception\ConnectionException + */ + public function testFailConnection() + { + $config = array('host' => '255.255.255.0'); + $client = new Client($config); + + $this->_checkStrategy($client); + + $client->request('/_aliases'); + + } + + public function testWithOneFailConnection() + { + $connections = array( + new \Elastica\Connection(array('host' => '255.255.255.0')), + new \Elastica\Connection(array('host' => 'localhost')), + ); + + $count = 0; + $callback = function($connection, $exception, $client) use(&$count) { + ++$count; + }; + + $client = new Client(array(), $callback); + $client->setConnections($connections); + + $resonse = $client->request('/_aliases'); + /* @var $resonse Response */ + + $this->_checkResponse($resonse); + + $this->_checkStrategy($client); + + + $this->assertLessThan(count($connections), $count); + } + + public function testWithNoValidConnection() + { + $connections = array( + new \Elastica\Connection(array('host' => '255.255.255.0', 'timeout' => 2)), + new \Elastica\Connection(array('host' => '45.45.45.45', 'port' => '80', 'timeout' => 2)), + new \Elastica\Connection(array('host' => '10.123.213.123', 'timeout' => 2)), + ); + + $count = 0; + $client = new Client(array(), function() use (&$count) { + ++$count; + }); + + $client->setConnections($connections); + + try { + $client->request('/_aliases'); + $this->fail('Should throw exception as no connection valid'); + } catch (\Elastica\Exception\ConnectionException $e) { + $this->assertEquals(count($connections), $count); + } + + } + + protected function _checkStrategy($client) + { + $strategy = $client->getConnectionStrategy(); + + $condition = ($strategy instanceof Simple); + + $this->assertTrue($condition); + } + + protected function _checkResponse($resonse) + { + $this->assertTrue($resonse->isOk()); + } +} diff --git a/test/lib/Elastica/Test/Connection/Strategy/StrategyFactoryTest.php b/test/lib/Elastica/Test/Connection/Strategy/StrategyFactoryTest.php new file mode 100644 index 0000000000..bab7fab8d4 --- /dev/null +++ b/test/lib/Elastica/Test/Connection/Strategy/StrategyFactoryTest.php @@ -0,0 +1,72 @@ +assertTrue($condition); + } + + public function testCreateByName() + { + $strategyName = 'Simple'; + + $strategy = StrategyFactory::create($strategyName); + + $this->assertTrue($strategy instanceof Simple); + } + + public function testCreateByClass() + { + $strategy = new EmptyStrategy(); + + $this->assertEquals($strategy, StrategyFactory::create($strategy)); + } + + public function testCreateByClassName() + { + $strategyName = '\\Elastica\Test\Connection\Strategy\\EmptyStrategy'; + + $strategy = StrategyFactory::create($strategyName); + + $condition = $strategy instanceof $strategyName; + + $this->assertTrue($condition); + } + /** + * @expectedException \InvalidArgumentException + */ + public function testFailCreate() + { + $strategy = new \stdClass(); + + StrategyFactory::create($strategy); + } +}