diff --git a/CRM/Queue/Queue/SqlParallel.php b/CRM/Queue/Queue/SqlParallel.php new file mode 100644 index 000000000000..9eb12ac883db --- /dev/null +++ b/CRM/Queue/Queue/SqlParallel.php @@ -0,0 +1,210 @@ + [$this->getName(), 'String'], + ]); + } + + /** + * Check if the queue exists. + * + * @return bool + */ + public function existsQueue() { + return ($this->numberOfItems() > 0); + } + + /** + * Add a new item to the queue. + * + * @param mixed $data + * Serializable PHP object or array. + * @param array $options + * Queue-dependent options; for example, if this is a + * priority-queue, then $options might specify the item's priority. + */ + public function createItem($data, $options = []) { + $dao = new CRM_Queue_DAO_QueueItem(); + $dao->queue_name = $this->getName(); + $dao->submit_time = CRM_Utils_Time::getTime('YmdHis'); + $dao->data = serialize($data); + $dao->weight = CRM_Utils_Array::value('weight', $options, 0); + $dao->save(); + } + + /** + * Determine number of items remaining in the queue. + * + * @return int + */ + public function numberOfItems() { + return CRM_Core_DAO::singleValueQuery(" + SELECT count(*) + FROM civicrm_queue_item + WHERE queue_name = %1 + ", [ + 1 => [$this->getName(), 'String'], + ]); + } + + /** + * Get the next item. + * + * @param int $lease_time + * Seconds. + * + * @return object + * With key 'data' that matches the inputted data. + */ + public function claimItem($lease_time = 3600) { + + $result = NULL; + $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;'); + $sql = "SELECT id, queue_name, submit_time, release_time, data + FROM civicrm_queue_item + WHERE queue_name = %1 + AND release_time IS NULL + ORDER BY weight ASC, id ASC + LIMIT 1 + "; + $params = [ + 1 => [$this->getName(), 'String'], + ]; + $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); + if (is_a($dao, 'DB_Error')) { + // FIXME - Adding code to allow tests to pass + CRM_Core_Error::fatal(); + } + + if ($dao->fetch()) { + $nowEpoch = CRM_Utils_Time::getTimeRaw(); + CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ + '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], + '2' => [$dao->id, 'Integer'], + ]); + // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?) + // work-around: inconsistent date-formatting causes unintentional breakage + # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time)); + # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time); + # $dao->save(); + $dao->data = unserialize($dao->data); + $result = $dao; + } + + $dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;'); + + return $result; + } + + /** + * Get the next item, even if there's an active lease + * + * @param int $lease_time + * Seconds. + * + * @return object + * With key 'data' that matches the inputted data. + */ + public function stealItem($lease_time = 3600) { + $sql = " + SELECT id, queue_name, submit_time, release_time, data + FROM civicrm_queue_item + WHERE queue_name = %1 + ORDER BY weight ASC, id ASC + LIMIT 1 + "; + $params = [ + 1 => [$this->getName(), 'String'], + ]; + $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); + if ($dao->fetch()) { + $nowEpoch = CRM_Utils_Time::getTimeRaw(); + CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ + '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], + '2' => [$dao->id, 'Integer'], + ]); + $dao->data = unserialize($dao->data); + return $dao; + } + } + + /** + * Remove an item from the queue. + * + * @param CRM_Core_DAO $dao + * The item returned by claimItem. + */ + public function deleteItem($dao) { + $dao->delete(); + $dao->free(); + } + + /** + * Return an item that could not be processed. + * + * @param CRM_Core_DAO $dao + * The item returned by claimItem. + */ + public function releaseItem($dao) { + $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1"; + $params = [ + 1 => [$dao->id, 'Integer'], + ]; + CRM_Core_DAO::executeQuery($sql, $params); + $dao->free(); + } + +}