Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include a new SqlParallel queue type that enables multiple queue runners to process in parallel #15422

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions CRM/Queue/Queue/SqlParallel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
<?php
/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

/**
* A queue implementation which stores items in the CiviCRM SQL database
*/
class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {

/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
* known to be an existing queue).
*
* @param array $queueSpec
* Array with keys:
* - type: string, required, e.g. "interactive", "immediate", "stomp",
* "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
* - reset: bool, optional; if a queue is found, then it should be
* flushed; default to TRUE
* - (additional keys depending on the queue provider).
*/
public function __construct($queueSpec) {
parent::__construct($queueSpec);
}

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Add a new item to the queue.
*
* @param mixed $data
* Serializable PHP object or array.
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
*/
public function createItem($data, $options = []) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
$dao->data = serialize($data);
$dao->weight = CRM_Utils_Array::value('weight', $options, 0);
$dao->save();
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Get the next item.
*
* @param int $lease_time
* Seconds.
*
* @return object
* With key 'data' that matches the inputted data.
*/
public function claimItem($lease_time = 3600) {

$result = NULL;
$dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
$sql = "SELECT id, queue_name, submit_time, release_time, data
FROM civicrm_queue_item
WHERE queue_name = %1
AND release_time IS NULL
ORDER BY weight ASC, id ASC
LIMIT 1
";
$params = [
1 => [$this->getName(), 'String'],
];
$dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
if (is_a($dao, 'DB_Error')) {
// FIXME - Adding code to allow tests to pass
CRM_Core_Error::fatal();
}

if ($dao->fetch()) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
'1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
'2' => [$dao->id, 'Integer'],
]);
// (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
// work-around: inconsistent date-formatting causes unintentional breakage
# $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
# $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
# $dao->save();
$dao->data = unserialize($dao->data);
$result = $dao;
}

$dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;');

return $result;
}

/**
* Get the next item, even if there's an active lease
*
* @param int $lease_time
* Seconds.
*
* @return object
* With key 'data' that matches the inputted data.
*/
public function stealItem($lease_time = 3600) {
$sql = "
SELECT id, queue_name, submit_time, release_time, data
FROM civicrm_queue_item
WHERE queue_name = %1
ORDER BY weight ASC, id ASC
LIMIT 1
";
$params = [
1 => [$this->getName(), 'String'],
];
$dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
if ($dao->fetch()) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
'1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
'2' => [$dao->id, 'Integer'],
]);
$dao->data = unserialize($dao->data);
return $dao;
}
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
$dao->free();
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
$dao->free();
}

}