diff --git a/CHANGELOG b/CHANGELOG index db10f2326f..249dba74f2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -190,8 +190,10 @@ - Optimized Phalcon\Paginator\Adapter\NativeArray (#1653) - Phalcon\Queue: - Fixed bug in Phalcon\Queue\Beanstalk::read() (#1348, #1612) - - Bug fixes in beanstalkd protocol implementation + - Bug fixes in beanstalkd protocol implementation (#1650) - Optimizations (#1621) + - Added peekDelayed() and peekburied() to Phalcon\Queue\Beanstalk (#1650) + - Added kick(), bury(), release(), touch() to Phalcon\Queue\Beanstalk\Job (#1650) - Phalcon\Security: - Phalcon\Security\Exception inherits from Phalcon\Exception, not from \Phalcon\DI\Exception - Added Phalcon\Security::computeHmac() (#1347) diff --git a/ext/queue/beanstalk.c b/ext/queue/beanstalk.c index cc8b1e3024..6220549bef 100644 --- a/ext/queue/beanstalk.c +++ b/ext/queue/beanstalk.c @@ -369,7 +369,7 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){ PHALCON_OBS_VAR(status); phalcon_array_fetch_long(&status, response, 0, PH_NOISY); - if (PHALCON_IS_STRING(status, "WATCH")) { + if (PHALCON_IS_STRING(status, "WATCHING")) { PHALCON_OBS_VAR(watching_tube); phalcon_array_fetch_long(&watching_tube, response, 1, PH_NOISY); RETURN_CCTOR(watching_tube); @@ -378,6 +378,34 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){ RETURN_MM_FALSE; } +static void phalcon_queue_beanstalk_peek_common(zval *return_value, zval *this_ptr, zval *response TSRMLS_DC) +{ + zval *job_id, *length, *serialized = NULL, *body; + + if (!phalcon_array_isset_long_fetch(&job_id, response, 1)) { + job_id = PHALCON_GLOBAL(z_null); + } + + if (!phalcon_array_isset_long_fetch(&length, response, 2)) { + length = PHALCON_GLOBAL(z_null); + } + + phalcon_call_method_params(serialized, &serialized, this_ptr, SL("read"), zend_inline_hash_func(SS("read")) TSRMLS_CC, 1, length); + if (EG(exception)) { + return; + } + + MAKE_STD_ZVAL(body); + phalcon_unserialize(body, serialized TSRMLS_CC); + zval_ptr_dtor(&serialized); + if (Z_REFCOUNT_P(body) >= 1) { + Z_DELREF_P(body); + } + + object_init_ex(return_value, phalcon_queue_beanstalk_job_ce); + phalcon_call_method_params(NULL, NULL, return_value, SL("__construct"), zend_inline_hash_func(SS("__construct")) TSRMLS_CC, 3, this_ptr, job_id, body); +} + /** * Inspect the next ready job. * @@ -385,8 +413,7 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){ */ PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){ - zval *command, *response, *status, *job_id, *length; - zval *serialized_body, *body; + zval *command, *response, *status; PHALCON_MM_GROW(); @@ -400,26 +427,69 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){ PHALCON_OBS_VAR(status); phalcon_array_fetch_long(&status, response, 0, PH_NOISY); if (PHALCON_IS_STRING(status, "FOUND")) { - PHALCON_OBS_VAR(job_id); - phalcon_array_fetch_long(&job_id, response, 1, PH_NOISY); - - PHALCON_OBS_VAR(length); - phalcon_array_fetch_long(&length, response, 2, PH_NOISY); - - PHALCON_INIT_VAR(serialized_body); - phalcon_call_method_p1(serialized_body, this_ptr, "read", length); - - PHALCON_INIT_VAR(body); - phalcon_unserialize(body, serialized_body TSRMLS_CC); - object_init_ex(return_value, phalcon_queue_beanstalk_job_ce); - phalcon_call_method_p3_noret(return_value, "__construct", this_ptr, job_id, body); - + phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC); RETURN_MM(); } RETURN_MM_FALSE; } +/** + * Return the delayed job with the shortest delay left + * + * @return boolean|Phalcon\Queue\Beanstalk\Job + */ +PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed){ + + zval *command, *response, *status; + + PHALCON_MM_GROW(); + + PHALCON_INIT_VAR(command); + ZVAL_STRING(command, "peek-delayed", 1); + phalcon_call_method_p1_noret(this_ptr, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, this_ptr, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "FOUND")) { + phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC); + RETURN_MM(); + } + + RETURN_MM_FALSE; +} + +/** + * Return the next job in the list of buried jobs + * + * @return boolean|Phalcon\Queue\Beanstalk\Job + */ +PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried){ + + zval *command, *response, *status; + + PHALCON_MM_GROW(); + + PHALCON_INIT_VAR(command); + ZVAL_STRING(command, "peek-buried", 1); + phalcon_call_method_p1_noret(this_ptr, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, this_ptr, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "FOUND")) { + phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC); + RETURN_MM(); + } + + RETURN_MM_FALSE; +} + /** * Reads the latest status from the Beanstalkd server * diff --git a/ext/queue/beanstalk.h b/ext/queue/beanstalk.h index 928c45ec9a..6a1bfea089 100644 --- a/ext/queue/beanstalk.h +++ b/ext/queue/beanstalk.h @@ -28,6 +28,8 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, reserve); PHP_METHOD(Phalcon_Queue_Beanstalk, choose); PHP_METHOD(Phalcon_Queue_Beanstalk, watch); PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady); +PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed); +PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried); PHP_METHOD(Phalcon_Queue_Beanstalk, readStatus); PHP_METHOD(Phalcon_Queue_Beanstalk, read); PHP_METHOD(Phalcon_Queue_Beanstalk, write); @@ -68,6 +70,8 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_method_entry){ PHP_ME(Phalcon_Queue_Beanstalk, choose, arginfo_phalcon_queue_beanstalk_choose, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk, watch, arginfo_phalcon_queue_beanstalk_watch, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk, peekReady, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk, peekDelayed, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk, peekBuried, NULL, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk, readStatus, NULL, ZEND_ACC_PROTECTED) PHP_ME(Phalcon_Queue_Beanstalk, read, arginfo_phalcon_queue_beanstalk_read, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk, write, NULL, ZEND_ACC_PROTECTED) diff --git a/ext/queue/beanstalk/job.c b/ext/queue/beanstalk/job.c index 835e18695e..74fbaa7baa 100644 --- a/ext/queue/beanstalk/job.c +++ b/ext/queue/beanstalk/job.c @@ -104,7 +104,6 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody){ /** * Removes a job from the server entirely * - * @param string $id * @return boolean */ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){ @@ -132,6 +131,152 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){ RETURN_MM_FALSE; } +/** + * The release command puts a reserved job back into the ready queue (and marks + * its state as "ready") to be run by any client. It is normally used when the job + * fails because of a transitory error. + * + * @return boolean + */ +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release){ + + zval *priority = NULL, *delay = NULL; + zval *id, *command, *queue, *response, *status; + + phalcon_fetch_params(0, 0, 2, &priority, &delay); + + PHALCON_MM_GROW(); + + if (!priority) { + PHALCON_INIT_VAR(priority); + ZVAL_LONG(priority, 100); + } + + if (!delay) { + delay = PHALCON_GLOBAL(z_zero); + } + + id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC); + queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC); + + PHALCON_ALLOC_GHOST_ZVAL(command); + PHALCON_CONCAT_SVSVSV(command, "release ", id, " ", priority, " ", delay); + phalcon_call_method_p1_noret(queue, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, queue, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "RELEASED")) { + RETURN_MM_TRUE; + } + + RETURN_MM_FALSE; +} + +/** + * The bury command puts a job into the "buried" state. Buried jobs are put into + * a FIFO linked list and will not be touched by the server again until a client + * kicks them with the "kick" command. + * + * @return boolean + */ +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury){ + + zval *priority = NULL; + zval *id, *command, *queue, *response, *status; + + phalcon_fetch_params(0, 0, 1, &priority); + + PHALCON_MM_GROW(); + + if (!priority) { + PHALCON_INIT_VAR(priority); + ZVAL_LONG(priority, 100); + } + + id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC); + queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC); + + PHALCON_ALLOC_GHOST_ZVAL(command); + PHALCON_CONCAT_SVSV(command, "bury ", id, " ", priority); + phalcon_call_method_p1_noret(queue, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, queue, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "BURIED")) { + RETURN_MM_TRUE; + } + + RETURN_MM_FALSE; +} + +/** + * The bury command puts a job into the "buried" state. Buried jobs are put into + * a FIFO linked list and will not be touched by the server again until a client + * kicks them with the "kick" command. + * + * @return boolean + */ +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch){ + + zval *id, *command, *queue, *response, *status; + + PHALCON_MM_GROW(); + + id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC); + queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC); + + PHALCON_ALLOC_GHOST_ZVAL(command); + PHALCON_CONCAT_SV(command, "touch ", id); + phalcon_call_method_p1_noret(queue, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, queue, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "TOUCHED")) { + RETURN_MM_TRUE; + } + + RETURN_MM_FALSE; +} + +/** + * Move the job to the ready queue if it is delayed or buried. + * + * @return boolean + */ +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick){ + + zval *id, *command, *queue, *response, *status; + + PHALCON_MM_GROW(); + + id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC); + queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC); + + PHALCON_ALLOC_GHOST_ZVAL(command); + PHALCON_CONCAT_SV(command, "kick-job ", id); + phalcon_call_method_p1_noret(queue, "write", command); + + PHALCON_INIT_VAR(response); + phalcon_call_method(response, queue, "readstatus"); + + PHALCON_OBS_VAR(status); + phalcon_array_fetch_long(&status, response, 0, PH_NOISY); + if (PHALCON_IS_STRING(status, "KICKED")) { + RETURN_MM_TRUE; + } + + RETURN_MM_FALSE; +} + PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup) { zval *id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC); diff --git a/ext/queue/beanstalk/job.h b/ext/queue/beanstalk/job.h index a72a3ca1b7..d45fbfabc2 100644 --- a/ext/queue/beanstalk/job.h +++ b/ext/queue/beanstalk/job.h @@ -25,6 +25,10 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __construct); PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getId); PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody); PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete); +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release); +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury); +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch); +PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick); PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup); ZEND_BEGIN_ARG_INFO_EX(arginfo_phalcon_queue_beanstalk_job___construct, 0, 0, 3) @@ -38,6 +42,10 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_job_method_entry){ PHP_ME(Phalcon_Queue_Beanstalk_Job, getId, NULL, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk_Job, getBody, NULL, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk_Job, delete, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk_Job, release, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk_Job, bury, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk_Job, touch, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Phalcon_Queue_Beanstalk_Job, kick, NULL, ZEND_ACC_PUBLIC) PHP_ME(Phalcon_Queue_Beanstalk_Job, __wakeup, NULL, ZEND_ACC_PUBLIC) PHP_FE_END }; diff --git a/unit-tests/BeanstalkTest.php b/unit-tests/BeanstalkTest.php index 435c8d3bd1..d99019a02f 100644 --- a/unit-tests/BeanstalkTest.php +++ b/unit-tests/BeanstalkTest.php @@ -41,4 +41,56 @@ public function testBasic() $this->assertEquals($expected, $actual); } } + + public function testReleaseKickBury() + { + $queue = new Phalcon\Queue\Beanstalk(); + try { + @$queue->connect(); + } + catch (Exception $e) { + $this->markTestSkipped($e->getMessage()); + return; + } + + $this->assertTrue($queue->choose('beanstalk-test') !== false); + + $task = 'doSomething'; + + $this->assertTrue($queue->put($task) !== false); + + $this->assertTrue($queue->watch('beanstalk-test') !== false); + + $job = $queue->reserve(0); + + $this->assertTrue($job !== false); + $this->assertEquals($task, $job->getBody()); + + $this->assertTrue($job->touch()); + + // Release the job; it moves to the ready queue + $this->assertTrue($job->release()); + $job = $queue->reserve(0); + + $this->assertTrue($job !== false); + $this->assertEquals($task, $job->getBody()); + + // Bury the job + $this->assertTrue($job->bury()); + $job = $queue->peekBuried(); + + $this->assertTrue($job !== false); + $this->assertEquals($task, $job->getBody()); + + // Kick the job, it should move to the ready queue again + // kick-job is supported since 1.8 + if (false !== $job->kick()) { + $job = $queue->peekReady(); + + $this->assertTrue($job !== false); + $this->assertEquals($task, $job->getBody()); + } + + $this->assertTrue($job->delete()); + } }