From 6f15000309093b54f7f59f07af297f576fd3a498 Mon Sep 17 00:00:00 2001 From: "Manuel F. Lara" Date: Wed, 30 Sep 2015 15:26:05 +0200 Subject: [PATCH] Parallel cURL implementation If a new "num_threads" option greater than 1 is passed when using curl as the consumer, that many cURL requests will be executed in parallel using curl_multi_exec --- lib/ConsumerStrategies/AbstractConsumer.php | 8 ++ lib/ConsumerStrategies/CurlConsumer.php | 82 ++++++++++++++------ lib/Producers/MixpanelBaseProducer.php | 4 +- test/ConsumerStrategies/CurlConsumerTest.php | 2 + 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/lib/ConsumerStrategies/AbstractConsumer.php b/lib/ConsumerStrategies/AbstractConsumer.php index 7d2a67e..3f701fd 100644 --- a/lib/ConsumerStrategies/AbstractConsumer.php +++ b/lib/ConsumerStrategies/AbstractConsumer.php @@ -48,6 +48,14 @@ protected function _handleError($code, $msg) { } } + /** + * Number of requests/batches that will be processed in parallel. + * @return int + */ + public function getNumParallelRequests() { + return 1; + } + /** * Persist a batch of messages in whatever way the implementer sees fit * @param array $batch an array of messages to consume diff --git a/lib/ConsumerStrategies/CurlConsumer.php b/lib/ConsumerStrategies/CurlConsumer.php index b8b08f9..f00696f 100644 --- a/lib/ConsumerStrategies/CurlConsumer.php +++ b/lib/ConsumerStrategies/CurlConsumer.php @@ -42,6 +42,12 @@ class ConsumerStrategies_CurlConsumer extends ConsumerStrategies_AbstractConsume protected $_fork = null; + /** + * @var int number of cURL requests to run in parallel. 1 by default + */ + protected $_num_threads; + + /** * Creates a new CurlConsumer and assigns properties from the $options array * @param array $options @@ -56,6 +62,7 @@ function __construct($options) { $this->_timeout = array_key_exists('timeout', $options) ? $options['timeout'] : 30; $this->_protocol = array_key_exists('use_ssl', $options) && $options['use_ssl'] == true ? 
"https" : "http"; $this->_fork = array_key_exists('fork', $options) ? ($options['fork'] == true) : false; + $this->_num_threads = array_key_exists('num_threads', $options) ? max(1, intval($options['num_threads'])) : 1; // ensure the environment is workable for the given settings if ($this->_fork == true) { @@ -88,7 +95,7 @@ public function persist($batch) { if ($this->_fork) { return $this->_execute_forked($url, $data); } else { - return $this->_execute($url, $data); + return $this->_execute($url, $batch); } } else { return true; @@ -102,35 +109,57 @@ public function persist($batch) { * @param $data * @return bool */ - protected function _execute($url, $data) { + protected function _execute($url, $batch) { if ($this->_debug()) { $this->_log("Making blocking cURL call to $url"); } - $ch = curl_init(); - curl_setopt($ch, CURLOPT_URL, $url); - curl_setopt($ch, CURLOPT_HEADER, 0); - curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->_connect_timeout); - curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout); - curl_setopt($ch, CURLOPT_POST, 1); - curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); - curl_setopt($ch, CURLOPT_POSTFIELDS, $data); - $response = curl_exec($ch); - if (false === $response) { - $curl_error = curl_error($ch); - $curl_errno = curl_errno($ch); - curl_close($ch); - $this->_handleError($curl_errno, $curl_error); - return false; - } else { - curl_close($ch); - if (trim($response) == "1") { - return true; - } else { + $mh = curl_multi_init(); + $chs = array(); + + $batch_size = ceil(count($batch) / $this->_num_threads); + for ($i=0; $i<$this->_num_threads && !empty($batch); $i++) { + $ch = curl_init(); + $chs[] = $ch; + $data = "data=" . 
$this->_encode(array_splice($batch, 0, $batch_size)); + curl_setopt($ch, CURLOPT_URL, $url); + curl_setopt($ch, CURLOPT_HEADER, 0); + curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->_connect_timeout); + curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout); + curl_setopt($ch, CURLOPT_POST, 1); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); + curl_setopt($ch, CURLOPT_POSTFIELDS, $data); + curl_multi_add_handle($mh,$ch); + } + + do { + curl_multi_exec($mh, $running); + curl_multi_select($mh); + } while ($running > 0); + + $info = curl_multi_info_read($mh); + + $error = false; + foreach ($chs as $ch) { + $response = curl_multi_getcontent($ch); + if (false === $response) { + $this->_handleError(curl_errno($ch), curl_error($ch)); + $error = true; + } + elseif ("1" != trim($response)) { $this->_handleError(0, $response); - return false; + $error = true; } + curl_multi_remove_handle($mh, $ch); } + + if (CURLE_OK != $info['result']) { + $this->_handleError($info['result'], "cURL error with code=".$info['result']); + $error = true; + } + + curl_multi_close($mh); + return !$error; } @@ -218,4 +247,11 @@ public function getTimeout() } + /** + * Number of requests/batches that will be processed in parallel using curl_multi_exec. 
+ * @return int + */ + public function getNumThreads() { + return $this->_num_threads; + } } \ No newline at end of file diff --git a/lib/Producers/MixpanelBaseProducer.php b/lib/Producers/MixpanelBaseProducer.php index 14b5b67..b96acd4 100644 --- a/lib/Producers/MixpanelBaseProducer.php +++ b/lib/Producers/MixpanelBaseProducer.php @@ -107,12 +107,14 @@ public function __destruct() { public function flush($desired_batch_size = 50) { $queue_size = count($this->_queue); $succeeded = true; + $num_threads = $this->_consumer->getNumThreads(); + if ($this->_debug()) { $this->_log("Flush called - queue size: ".$queue_size); } while($queue_size > 0 && $succeeded) { - $batch_size = min(array($queue_size, $desired_batch_size, $this->_options['max_batch_size'])); + $batch_size = min(array($queue_size, $desired_batch_size*$num_threads, $this->_options['max_batch_size']*$num_threads)); $batch = array_splice($this->_queue, 0, $batch_size); $succeeded = $this->_persist($batch); diff --git a/test/ConsumerStrategies/CurlConsumerTest.php b/test/ConsumerStrategies/CurlConsumerTest.php index 0505719..aa2b886 100644 --- a/test/ConsumerStrategies/CurlConsumerTest.php +++ b/test/ConsumerStrategies/CurlConsumerTest.php @@ -67,6 +67,7 @@ function callback() { } "connect_timeout" => 1, "use_ssl" => true, "fork" => false, + "num_threads" => 5, "error_callback" => 'callback' )); @@ -75,6 +76,7 @@ function callback() { } $this->assertEquals($consumer->getTimeout(), 2); $this->assertEquals($consumer->getConnectTimeout(), 1); $this->assertEquals($consumer->getProtocol(), "https"); + $this->assertEquals($consumer->getNumThreads(), 5); } }