Skip to content

Commit

Permalink
Merge pull request #1650 from sjinks/beanstalk
Browse files Browse the repository at this point in the history
Phalcon\Queue\Beanstalk enhancements
  • Loading branch information
Phalcon committed Dec 11, 2013
2 parents 056440f + 113aaef commit 9af252e
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 19 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@
- Optimized Phalcon\Paginator\Adapter\NativeArray (#1653)
- Phalcon\Queue:
- Fixed bug in Phalcon\Queue\Beanstalk::read() (#1348, #1612)
- Bug fixes in beanstalkd protocol implementation
- Bug fixes in beanstalkd protocol implementation (#1650)
- Optimizations (#1621)
- Added peekDelayed() and peekburied() to Phalcon\Queue\Beanstalk (#1650)
- Added kick(), bury(), release(), touch() to Phalcon\Queue\Beanstalk\Job (#1650)
- Phalcon\Security:
- Phalcon\Security\Exception inherits from Phalcon\Exception, not from \Phalcon\DI\Exception
- Added Phalcon\Security::computeHmac() (#1347)
Expand Down
104 changes: 87 additions & 17 deletions ext/queue/beanstalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "WATCH")) {
if (PHALCON_IS_STRING(status, "WATCHING")) {
PHALCON_OBS_VAR(watching_tube);
phalcon_array_fetch_long(&watching_tube, response, 1, PH_NOISY);
RETURN_CCTOR(watching_tube);
Expand All @@ -378,15 +378,42 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){
RETURN_MM_FALSE;
}

static void phalcon_queue_beanstalk_peek_common(zval *return_value, zval *this_ptr, zval *response TSRMLS_DC)
{
zval *job_id, *length, *serialized = NULL, *body;

if (!phalcon_array_isset_long_fetch(&job_id, response, 1)) {
job_id = PHALCON_GLOBAL(z_null);
}

if (!phalcon_array_isset_long_fetch(&length, response, 2)) {
length = PHALCON_GLOBAL(z_null);
}

phalcon_call_method_params(serialized, &serialized, this_ptr, SL("read"), zend_inline_hash_func(SS("read")) TSRMLS_CC, 1, length);
if (EG(exception)) {
return;
}

MAKE_STD_ZVAL(body);
phalcon_unserialize(body, serialized TSRMLS_CC);
zval_ptr_dtor(&serialized);
if (Z_REFCOUNT_P(body) >= 1) {
Z_DELREF_P(body);
}

object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
phalcon_call_method_params(NULL, NULL, return_value, SL("__construct"), zend_inline_hash_func(SS("__construct")) TSRMLS_CC, 3, this_ptr, job_id, body);
}

/**
* Inspect the next ready job.
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){

zval *command, *response, *status, *job_id, *length;
zval *serialized_body, *body;
zval *command, *response, *status;

PHALCON_MM_GROW();

Expand All @@ -400,26 +427,69 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){
PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
PHALCON_OBS_VAR(job_id);
phalcon_array_fetch_long(&job_id, response, 1, PH_NOISY);

PHALCON_OBS_VAR(length);
phalcon_array_fetch_long(&length, response, 2, PH_NOISY);

PHALCON_INIT_VAR(serialized_body);
phalcon_call_method_p1(serialized_body, this_ptr, "read", length);

PHALCON_INIT_VAR(body);
phalcon_unserialize(body, serialized_body TSRMLS_CC);
object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
phalcon_call_method_p3_noret(return_value, "__construct", this_ptr, job_id, body);

phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Return the delayed job with the shortest delay left
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed){

zval *command, *response, *status;

PHALCON_MM_GROW();

PHALCON_INIT_VAR(command);
ZVAL_STRING(command, "peek-delayed", 1);
phalcon_call_method_p1_noret(this_ptr, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, this_ptr, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Return the next job in the list of buried jobs
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried){

zval *command, *response, *status;

PHALCON_MM_GROW();

PHALCON_INIT_VAR(command);
ZVAL_STRING(command, "peek-buried", 1);
phalcon_call_method_p1_noret(this_ptr, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, this_ptr, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Reads the latest status from the Beanstalkd server
*
Expand Down
4 changes: 4 additions & 0 deletions ext/queue/beanstalk.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, reserve);
PHP_METHOD(Phalcon_Queue_Beanstalk, choose);
PHP_METHOD(Phalcon_Queue_Beanstalk, watch);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried);
PHP_METHOD(Phalcon_Queue_Beanstalk, readStatus);
PHP_METHOD(Phalcon_Queue_Beanstalk, read);
PHP_METHOD(Phalcon_Queue_Beanstalk, write);
Expand Down Expand Up @@ -68,6 +70,8 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_method_entry){
PHP_ME(Phalcon_Queue_Beanstalk, choose, arginfo_phalcon_queue_beanstalk_choose, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, watch, arginfo_phalcon_queue_beanstalk_watch, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekReady, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekDelayed, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekBuried, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, readStatus, NULL, ZEND_ACC_PROTECTED)
PHP_ME(Phalcon_Queue_Beanstalk, read, arginfo_phalcon_queue_beanstalk_read, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, write, NULL, ZEND_ACC_PROTECTED)
Expand Down
147 changes: 146 additions & 1 deletion ext/queue/beanstalk/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody){
/**
* Removes a job from the server entirely
*
* @param string $id
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
Expand Down Expand Up @@ -132,6 +131,152 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
RETURN_MM_FALSE;
}

/**
* The release command puts a reserved job back into the ready queue (and marks
* its state as "ready") to be run by any client. It is normally used when the job
* fails because of a transitory error.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release){

zval *priority = NULL, *delay = NULL;
zval *id, *command, *queue, *response, *status;

phalcon_fetch_params(0, 0, 2, &priority, &delay);

PHALCON_MM_GROW();

if (!priority) {
PHALCON_INIT_VAR(priority);
ZVAL_LONG(priority, 100);
}

if (!delay) {
delay = PHALCON_GLOBAL(z_zero);
}

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SVSVSV(command, "release ", id, " ", priority, " ", delay);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "RELEASED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* The bury command puts a job into the "buried" state. Buried jobs are put into
* a FIFO linked list and will not be touched by the server again until a client
* kicks them with the "kick" command.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury){

zval *priority = NULL;
zval *id, *command, *queue, *response, *status;

phalcon_fetch_params(0, 0, 1, &priority);

PHALCON_MM_GROW();

if (!priority) {
PHALCON_INIT_VAR(priority);
ZVAL_LONG(priority, 100);
}

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SVSV(command, "bury ", id, " ", priority);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "BURIED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* The bury command puts a job into the "buried" state. Buried jobs are put into
* a FIFO linked list and will not be touched by the server again until a client
* kicks them with the "kick" command.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch){

zval *id, *command, *queue, *response, *status;

PHALCON_MM_GROW();

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SV(command, "touch ", id);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "TOUCHED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* Move the job to the ready queue if it is delayed or buried.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick){

zval *id, *command, *queue, *response, *status;

PHALCON_MM_GROW();

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SV(command, "kick-job ", id);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "KICKED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup) {

zval *id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
Expand Down
8 changes: 8 additions & 0 deletions ext/queue/beanstalk/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __construct);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getId);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup);

ZEND_BEGIN_ARG_INFO_EX(arginfo_phalcon_queue_beanstalk_job___construct, 0, 0, 3)
Expand All @@ -38,6 +42,10 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_job_method_entry){
PHP_ME(Phalcon_Queue_Beanstalk_Job, getId, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, getBody, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, delete, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, release, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, bury, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, touch, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, kick, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, __wakeup, NULL, ZEND_ACC_PUBLIC)
PHP_FE_END
};
Loading

0 comments on commit 9af252e

Please sign in to comment.