Skip to content

Commit

Permalink
Merge pull request ruflin#661 from chabior/master
Browse files Browse the repository at this point in the history
Connection pool & round robin strategy
  • Loading branch information
ruflin committed Aug 12, 2014
2 parents 92155a3 + 1a810c4 commit 07214ca
Show file tree
Hide file tree
Showing 14 changed files with 789 additions and 47 deletions.
3 changes: 3 additions & 0 deletions changes.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
CHANGES

2014-08-06
- Add connection pool and connection strategy

2014-07-26
- Release v1.3.0.0
- Prepare Elastica Release v1.3.0.0
Expand Down
87 changes: 40 additions & 47 deletions lib/Elastica/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

use Elastica\Bulk;
use Elastica\Bulk\Action;
use Elastica\Exception\ResponseException;
use Elastica\Exception\ClientException;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\InvalidException;
use Elastica\Exception\RuntimeException;
Expand Down Expand Up @@ -43,11 +41,6 @@ class Client
'retryOnConflict' => 0,
);

/**
* @var \Elastica\Connection[] List of connections
*/
protected $_connections = array();

/**
* @var callback
*/
Expand All @@ -67,6 +60,11 @@ class Client
* @var LoggerInterface
*/
protected $_logger = null;
/**
*
* @var Connection\ConnectionPool
*/
protected $_connectionPool = null;

/**
* Creates a new Elastica client
Expand All @@ -86,22 +84,34 @@ public function __construct(array $config = array(), $callback = null)
*/
protected function _initConnections()
{
$connections = $this->getConfig('connections');

foreach ($connections as $connection) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($connection));
$connections = array();
foreach ($this->getConfig('connections') as $connection) {
$connections[] = Connection::create($this->_prepareConnectionParams($connection));
}

if (isset($this->_config['servers'])) {
foreach ($this->getConfig('servers') as $server) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($server));
$connections[] = Connection::create($this->_prepareConnectionParams($server));
}
}

// If no connections set, create default connection
if (empty($this->_connections)) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
if (empty($connections)) {
$connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
}

if (!isset($this->_config['connectionStrategy'])) {
if ($this->getConfig('roundRobin') === true) {
$this->setConfigValue('connectionStrategy', 'RoundRobin');
} else {
$this->setConfigValue('connectionStrategy', 'Simple');
}
}

$strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));

$this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
}

/**
Expand Down Expand Up @@ -439,7 +449,7 @@ public function getCluster()
*/
public function addConnection(Connection $connection)
{
$this->_connections[] = $connection;
$this->_connectionPool->addConnection($connection);

return $this;
}
Expand All @@ -451,15 +461,7 @@ public function addConnection(Connection $connection)
*/
public function hasConnection()
{
foreach ($this->_connections as $connection)
{
if ($connection->isEnabled())
{
return true;
}
}

return false;
return $this->_connectionPool->hasConnection();
}

/**
Expand All @@ -468,37 +470,33 @@ public function hasConnection()
*/
public function getConnection()
{
$enabledConnection = null;

foreach ($this->_connections as $connection) {
if ($connection->isEnabled()) {
$enabledConnection = $connection;
break;
}
}

if (empty($enabledConnection)) {
throw new ClientException('No enabled connection');
}

return $enabledConnection;
return $this->_connectionPool->getConnection();
}

/**
* @return \Elastica\Connection[]
*/
public function getConnections()
{
return $this->_connections;
return $this->_connectionPool->getConnections();
}

/**
*
* @return \Connection\Strategy\StrategyInterface
*/
public function getConnectionStrategy()
{
return $this->_connectionPool->getStrategy();
}

/**
* @param \Elastica\Connection[] $connections
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Client
*/
public function setConnections(array $connections)
{
$this->_connections = $connections;
$this->_connectionPool->setConnections($connections);

return $this;
}
Expand Down Expand Up @@ -597,12 +595,7 @@ public function request($path, $method = Request::GET, $data = array(), array $q
return $response;

} catch (ConnectionException $e) {
$connection->setEnabled(false);

// Calls callback with connection as param to make it possible to persist invalid connections
if ($this->_callback) {
call_user_func($this->_callback, $connection, $e, $this);
}
$this->_connectionPool->onFail($connection, $e, $this);

// In case there is no valid connection left, throw exception which caused the disabling of the connection.
if (!$this->hasConnection())
Expand Down
121 changes: 121 additions & 0 deletions lib/Elastica/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?php

namespace Elastica\Connection;

use Elastica\Client;
use Elastica\Connection;
use Elastica\Connection\Strategy\StrategyInterface;
use Exception;

/**
* Description of ConnectionPool
*
* @author chabior
*/
class ConnectionPool
{
/**
* Connections array
*
* @var array|\Elastica\Connection[]
*/
protected $_connections;

/**
* Strategy for connection
*
* @var \Elastica\Connection\Strategy\StrategyInterface
*/
protected $_strategy;

/**
* Callback function called on connection fail
*
* @var callback
*/
protected $_callback;

/**
* @param array $connections
* @param \Elastica\Connection\Strategy\StrategyInterface $strategy
* @param callback $callback
*/
public function __construct(array $connections, StrategyInterface $strategy, $callback = null)
{
$this->_connections = $connections;

$this->_strategy = $strategy;

$this->_callback = $callback;
}

/**
* @param \Elastica\Connection $connection
*/
public function addConnection(Connection $connection)
{
$this->_connections[] = $connection;
}

/**
* @param array|\Elastica\Connection[] $connections
*/
public function setConnections(array $connections)
{
$this->_connections = $connections;
}

/**
* @return boolean
*/
public function hasConnection()
{
foreach ($this->_connections as $connection) {
if ($connection->isEnabled()) {
return true;
}
}

return false;
}

/**
* @return array
*/
public function getConnections()
{
return $this->_connections;
}

/**
* @return \Elastica\Connection
* @throws \Elastica\Exception\ClientException
*/
public function getConnection()
{
return $this->_strategy->getConnection($this->getConnections());
}

/**
* @param \Elastica\Connection $connection
* @param \Exception $e
* @param Client $client
*/
public function onFail(Connection $connection, Exception $e, Client $client)
{
$connection->setEnabled(false);

if ($this->_callback) {
call_user_func($this->_callback, $connection, $e, $client);
}
}

/**
*
* @return \Elastica\Connection\Strategy\StrategyInterface
*/
public function getStrategy()
{
return $this->_strategy;
}
}
49 changes: 49 additions & 0 deletions lib/Elastica/Connection/Strategy/CallbackStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Elastica\Connection\Strategy;

use Elastica\Exception\InvalidException;

/**
* Description of CallbackStrategy
*
* @author chabior
*/
class CallbackStrategy implements StrategyInterface
{

/**
* @var Closure
*/
protected $_callback;

/**
* @param Closure $callback
* @throws \Elastica\Exception\InvalidException
*/
public function __construct($callback)
{
if (!self::isValid($callback)) {
throw new InvalidException(sprintf('Callback should be a Closure, %s given!', gettype($callback)));
}

$this->_callback = $callback;
}

/**
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Connection
*/
public function getConnection($connections)
{
return $this->_callback->__invoke($connections);
}

/**
* @return boolean
*/
public static function isValid($callback)
{
return is_object($callback) && ($callback instanceof \Closure);
}
}
24 changes: 24 additions & 0 deletions lib/Elastica/Connection/Strategy/RoundRobin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace Elastica\Connection\Strategy;

/**
* Description of RoundRobin
*
* @author chabior
*/
class RoundRobin extends Simple
{

/**
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Connection
* @throws \Elastica\Exception\ClientException
*/
public function getConnection($connections)
{
shuffle($connections);

return parent::getConnection($connections);
}
}
Loading

0 comments on commit 07214ca

Please sign in to comment.