Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection pool & round robin strategy #661

Merged
merged 5 commits into from
Aug 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
CHANGES

2014-08-06
- Add connection pool and connection strategy

2014-07-26
- Release v1.3.0.0
- Prepare Elastica Release v1.3.0.0
Expand Down
87 changes: 40 additions & 47 deletions lib/Elastica/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

use Elastica\Bulk;
use Elastica\Bulk\Action;
use Elastica\Exception\ResponseException;
use Elastica\Exception\ClientException;
use Elastica\Exception\ConnectionException;
use Elastica\Exception\InvalidException;
use Elastica\Exception\RuntimeException;
Expand Down Expand Up @@ -43,11 +41,6 @@ class Client
'retryOnConflict' => 0,
);

/**
* @var \Elastica\Connection[] List of connections
*/
protected $_connections = array();

/**
* @var callback
*/
Expand All @@ -67,6 +60,11 @@ class Client
* @var LoggerInterface
*/
protected $_logger = null;
/**
*
* @var Connection\ConnectionPool
*/
protected $_connectionPool = null;

/**
* Creates a new Elastica client
Expand All @@ -86,22 +84,34 @@ public function __construct(array $config = array(), $callback = null)
*/
protected function _initConnections()
{
$connections = $this->getConfig('connections');

foreach ($connections as $connection) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($connection));
$connections = array();
foreach ($this->getConfig('connections') as $connection) {
$connections[] = Connection::create($this->_prepareConnectionParams($connection));
}

if (isset($this->_config['servers'])) {
foreach ($this->getConfig('servers') as $server) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($server));
$connections[] = Connection::create($this->_prepareConnectionParams($server));
}
}

// If no connections set, create default connection
if (empty($this->_connections)) {
$this->_connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
if (empty($connections)) {
$connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
}

if (!isset($this->_config['connectionStrategy'])) {
if ($this->getConfig('roundRobin') === true) {
$this->setConfigValue('connectionStrategy', 'RoundRobin');
} else {
$this->setConfigValue('connectionStrategy', 'Simple');
}
}

$strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));

$this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
}

/**
Expand Down Expand Up @@ -439,7 +449,7 @@ public function getCluster()
*/
public function addConnection(Connection $connection)
{
$this->_connections[] = $connection;
$this->_connectionPool->addConnection($connection);

return $this;
}
Expand All @@ -451,15 +461,7 @@ public function addConnection(Connection $connection)
*/
public function hasConnection()
{
foreach ($this->_connections as $connection)
{
if ($connection->isEnabled())
{
return true;
}
}

return false;
return $this->_connectionPool->hasConnection();
}

/**
Expand All @@ -468,37 +470,33 @@ public function hasConnection()
*/
public function getConnection()
{
$enabledConnection = null;

foreach ($this->_connections as $connection) {
if ($connection->isEnabled()) {
$enabledConnection = $connection;
break;
}
}

if (empty($enabledConnection)) {
throw new ClientException('No enabled connection');
}

return $enabledConnection;
return $this->_connectionPool->getConnection();
}

/**
* @return \Elastica\Connection[]
*/
public function getConnections()
{
return $this->_connections;
return $this->_connectionPool->getConnections();
}

/**
*
* @return \Connection\Strategy\StrategyInterface
*/
public function getConnectionStrategy()
{
return $this->_connectionPool->getStrategy();
}

/**
* @param \Elastica\Connection[] $connections
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Client
*/
public function setConnections(array $connections)
{
$this->_connections = $connections;
$this->_connectionPool->setConnections($connections);

return $this;
}
Expand Down Expand Up @@ -597,12 +595,7 @@ public function request($path, $method = Request::GET, $data = array(), array $q
return $response;

} catch (ConnectionException $e) {
$connection->setEnabled(false);

// Calls callback with connection as param to make it possible to persist invalid connections
if ($this->_callback) {
call_user_func($this->_callback, $connection, $e, $this);
}
$this->_connectionPool->onFail($connection, $e, $this);

// In case there is no valid connection left, throw exception which caused the disabling of the connection.
if (!$this->hasConnection())
Expand Down
121 changes: 121 additions & 0 deletions lib/Elastica/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?php

namespace Elastica\Connection;

use Elastica\Client;
use Elastica\Connection;
use Elastica\Connection\Strategy\StrategyInterface;
use Exception;

/**
* Description of ConnectionPool
*
* @author chabior
*/
class ConnectionPool
{
/**
* Connections array
*
* @var array|\Elastica\Connection[]
*/
protected $_connections;

/**
* Strategy for connection
*
* @var \Elastica\Connection\Strategy\StrategyInterface
*/
protected $_strategy;

/**
* Callback function called on connection fail
*
* @var callback
*/
protected $_callback;

/**
* @param array $connections
* @param \Elastica\Connection\Strategy\StrategyInterface $strategy
* @param callback $callback
*/
public function __construct(array $connections, StrategyInterface $strategy, $callback = null)
{
$this->_connections = $connections;

$this->_strategy = $strategy;

$this->_callback = $callback;
}

/**
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you insert a newline on top of all /** (also other files)?

* @param \Elastica\Connection $connection
*/
public function addConnection(Connection $connection)
{
$this->_connections[] = $connection;
}

/**
* @param array|\Elastica\Connection[] $connections
*/
public function setConnections(array $connections)
{
$this->_connections = $connections;
}

/**
* @return boolean
*/
public function hasConnection()
{
foreach ($this->_connections as $connection) {
if ($connection->isEnabled()) {
return true;
}
}

return false;
}

/**
* @return array
*/
public function getConnections()
{
return $this->_connections;
}

/**
* @return \Elastica\Connection
* @throws \Elastica\Exception\ClientException
*/
public function getConnection()
{
return $this->_strategy->getConnection($this->getConnections());
}

/**
* @param \Elastica\Connection $connection
* @param \Exception $e
* @param Client $client
*/
public function onFail(Connection $connection, Exception $e, Client $client)
{
$connection->setEnabled(false);

if ($this->_callback) {
call_user_func($this->_callback, $connection, $e, $client);
}
}

/**
*
* @return \Elastica\Connection\Strategy\StrategyInterface
*/
public function getStrategy()
{
return $this->_strategy;
}
}
49 changes: 49 additions & 0 deletions lib/Elastica/Connection/Strategy/CallbackStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Elastica\Connection\Strategy;

use Elastica\Exception\InvalidException;

/**
* Description of CallbackStrategy
*
* @author chabior
*/
class CallbackStrategy implements StrategyInterface
{

/**
* @var Closure
*/
protected $_callback;

/**
* @param Closure $callback
* @throws \Elastica\Exception\InvalidException
*/
public function __construct($callback)
{
if (!self::isValid($callback)) {
throw new InvalidException(sprintf('Callback should be a Closure, %s given!', gettype($callback)));
}

$this->_callback = $callback;
}

/**
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Connection
*/
public function getConnection($connections)
{
return $this->_callback->__invoke($connections);
}

/**
* @return boolean
*/
public static function isValid($callback)
{
return is_object($callback) && ($callback instanceof \Closure);
}
}
24 changes: 24 additions & 0 deletions lib/Elastica/Connection/Strategy/RoundRobin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace Elastica\Connection\Strategy;

/**
* Description of RoundRobin
*
* @author chabior
*/
class RoundRobin extends Simple
{

/**
* @param array|\Elastica\Connection[] $connections
* @return \Elastica\Connection
* @throws \Elastica\Exception\ClientException
*/
public function getConnection($connections)
{
shuffle($connections);

return parent::getConnection($connections);
}
}
Loading