Skip to content

Commit

Permalink
Merge pull request #1621 from sjinks/beanstalk
Browse files Browse the repository at this point in the history
[1.3.0] Beanstalk
  • Loading branch information
Phalcon committed Dec 2, 2013
2 parents 720344b + 701e365 commit d931cf8
Showing 1 changed file with 154 additions and 107 deletions.
261 changes: 154 additions & 107 deletions ext/queue/beanstalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#include "Zend/zend_exceptions.h"
#include "Zend/zend_interfaces.h"

#include "ext/standard/file.h"
#include "main/php_streams.h"

#include "kernel/main.h"
#include "kernel/memory.h"

Expand Down Expand Up @@ -102,49 +105,61 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, __construct){

PHP_METHOD(Phalcon_Queue_Beanstalk, connect){

zval *connection = NULL, *parameters, *host, *port, *error_num;
zval *error_str, *no_timeout, *microseconds;

PHALCON_MM_GROW();
zval *connection = NULL, *parameters, *host, *port;

PHALCON_OBS_VAR(connection);
phalcon_read_property_this(&connection, this_ptr, SL("_connection"), PH_NOISY_CC);
connection = phalcon_fetch_nproperty_this(this_ptr, SL("_connection"), PH_NOISY_CC);
if (Z_TYPE_P(connection) == IS_RESOURCE) {
PHALCON_MM_GROW();
phalcon_call_method_noret(this_ptr, "disconnect");
PHALCON_MM_RESTORE();
}

PHALCON_OBS_VAR(parameters);
phalcon_read_property_this(&parameters, this_ptr, SL("_parameters"), PH_NOISY_CC);

PHALCON_OBS_VAR(host);
phalcon_array_fetch_string(&host, parameters, SL("host"), PH_NOISY);
parameters = phalcon_fetch_nproperty_this(this_ptr, SL("_parameters"), PH_NOISY_CC);

PHALCON_OBS_VAR(port);
phalcon_array_fetch_string(&port, parameters, SL("port"), PH_NOISY);

PHALCON_INIT_VAR(error_num);

PHALCON_INIT_VAR(error_str);
Z_SET_ISREF_P(error_num);
Z_SET_ISREF_P(error_str);

PHALCON_INIT_NVAR(connection);
phalcon_call_func_p4(connection, "fsockopen", host, port, error_num, error_str);
Z_UNSET_ISREF_P(error_num);
Z_UNSET_ISREF_P(error_str);
if (Z_TYPE_P(connection) != IS_RESOURCE) {
PHALCON_THROW_EXCEPTION_STR(phalcon_exception_ce, "Can't connect to Beanstalk server");
if (!phalcon_array_isset_string_fetch(&host, parameters, SS("host")) || !phalcon_array_isset_string_fetch(&port, parameters, SS("port"))) {
PHALCON_THROW_EXCEPTION_STRW(phalcon_exception_ce, "Unexpected inconsistency in options");
return;
}

PHALCON_INIT_VAR(no_timeout);
ZVAL_LONG(no_timeout, -1);

PHALCON_INIT_VAR(microseconds);
phalcon_call_func_p3_noret("stream_set_timeout", connection, no_timeout, microseconds);
phalcon_update_property_this(this_ptr, SL("_connection"), connection TSRMLS_CC);

RETURN_CCTOR(connection);
convert_to_string(host);
convert_to_long(port);

{
ulong timeout = (ulong)(FG(default_socket_timeout) * 1000000.0);
char *hostname;
long int hostname_len = spprintf(&hostname, 0, "%s:%ld", Z_STRVAL_P(host), Z_LVAL_P(port));
struct timeval tv;
php_stream *stream;
int err;
char *errstr = NULL;

tv.tv_sec = timeout / 1000000;
tv.tv_usec = timeout % 1000000;

stream = php_stream_xport_create(hostname, hostname_len, ENFORCE_SAFE_MODE | REPORT_ERRORS, STREAM_XPORT_CLIENT | STREAM_XPORT_CONNECT, NULL, &tv, NULL, &errstr, &err);
efree(hostname);

if (!stream) {
zend_throw_exception_ex(phalcon_exception_ce, err TSRMLS_CC, "Unable to connect to Beanstalk server at %s:%ld (%s)", Z_STRVAL_P(host), Z_LVAL_P(port), (errstr == NULL ? "Unknown error" : errstr));
}

if (errstr) {
efree(errstr);
}

if (!stream) {
RETURN_NULL();
}

tv.tv_sec = -1;
tv.tv_usec = 0;
php_stream_set_option(stream, PHP_STREAM_OPTION_READ_TIMEOUT, 0, &tv);

MAKE_STD_ZVAL(connection);
php_stream_to_zval(stream, connection);
phalcon_update_property_this(this_ptr, SL("_connection"), connection TSRMLS_CC);
RETVAL_ZVAL(connection, 1, 1);
}
}

/**
Expand All @@ -170,26 +185,18 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, put){
/**
* Priority is 100 by default
*/
if (phalcon_array_isset_string(options, SS("priority"))) {
PHALCON_OBS_VAR(priority);
phalcon_array_fetch_string(&priority, options, SL("priority"), PH_NOISY);
} else {
PHALCON_INIT_NVAR(priority);
if (!phalcon_array_isset_string_fetch(&priority, options, SS("priority"))) {
PHALCON_INIT_VAR(priority);
ZVAL_STRING(priority, "100", 1);
}
if (phalcon_array_isset_string(options, SS("delay"))) {
PHALCON_OBS_VAR(delay);
phalcon_array_fetch_string(&delay, options, SL("delay"), PH_NOISY);
} else {
PHALCON_INIT_NVAR(delay);

if (!phalcon_array_isset_string_fetch(&delay, options, SS("delay"))) {
PHALCON_INIT_VAR(delay);
ZVAL_STRING(delay, "0", 1);
}

if (phalcon_array_isset_string(options, SS("ttr"))) {
PHALCON_OBS_VAR(ttr);
phalcon_array_fetch_string(&ttr, options, SL("ttr"), PH_NOISY);
} else {
PHALCON_INIT_NVAR(ttr);

if (!phalcon_array_isset_string_fetch(&ttr, options, SS("ttr"))) {
PHALCON_INIT_VAR(ttr);
ZVAL_STRING(ttr, "86400", 1);
}

Expand All @@ -199,9 +206,14 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, put){
PHALCON_INIT_VAR(serialized);
phalcon_serialize(serialized, &data TSRMLS_CC);

PHALCON_INIT_VAR(serialized_length);
phalcon_fast_strlen(serialized_length, serialized);

if (Z_TYPE_P(serialized) == IS_STRING) {
PHALCON_INIT_VAR(serialized_length);
ZVAL_LONG(serialized_length, Z_STRLEN_P(serialized));
}
else {
RETURN_MM_FALSE;
}

/**
* Create the command
*/
Expand Down Expand Up @@ -248,12 +260,11 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, reserve){
if (!timeout) {
timeout = PHALCON_GLOBAL(z_null);
}


PHALCON_INIT_VAR(command);
if (zend_is_true(timeout)) {
PHALCON_INIT_VAR(command);
PHALCON_CONCAT_SV(command, "reserve-with-timeout ", timeout);
} else {
PHALCON_INIT_NVAR(command);
ZVAL_STRING(command, "reserve", 1);
}
phalcon_call_method_p1_noret(this_ptr, "write", command);
Expand Down Expand Up @@ -429,73 +440,94 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, readStatus){
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, read){

zval *length = NULL, *connection = NULL, *is_eof, *eof_chars;
zval *total_length = NULL, *data, *meta, *timeout, *mask;
zval *packet = NULL, *end_of_file;
zval **length = NULL, *connection, *meta;
php_stream *stream;

PHALCON_MM_GROW();

phalcon_fetch_params(1, 0, 1, &length);
phalcon_fetch_params_ex(0, 1, &length);

if (!length) {
length = PHALCON_GLOBAL(z_null);
length = &PHALCON_GLOBAL(z_zero);
}
else {
PHALCON_ENSURE_IS_LONG(length);
}

PHALCON_OBS_VAR(connection);
phalcon_read_property_this(&connection, this_ptr, SL("_connection"), PH_NOISY_CC);
if (Z_TYPE_P(connection) != IS_RESOURCE) {

PHALCON_INIT_NVAR(connection);
phalcon_call_method(connection, this_ptr, "connect");
if (Z_TYPE_P(connection) != IS_RESOURCE) {
RETURN_MM_FALSE;
}
}

if (zend_is_true(length)) {
php_stream_from_zval_no_verify(stream, &connection);
if (!stream) {
RETURN_MM_FALSE;
}

if (zend_is_true(*length)) {
long int total_length;
long int len;
zend_bool timeout = 0;
char *buf;

PHALCON_INIT_VAR(is_eof);
phalcon_call_func_p1(is_eof, "feof", connection);
if (zend_is_true(is_eof)) {
if (php_stream_eof(stream)) {
RETURN_MM_FALSE;
}

PHALCON_INIT_VAR(eof_chars);
ZVAL_LONG(eof_chars, 2);

PHALCON_INIT_VAR(total_length);
phalcon_add_function(total_length, length, eof_chars TSRMLS_CC);

PHALCON_INIT_VAR(data);
phalcon_call_func_p2(data, "fread", connection, total_length);

total_length = Z_LVAL_PP(length) + 2;

buf = ecalloc(1, total_length + 1);
len = php_stream_read(stream, buf, total_length);

ZVAL_STRINGL(return_value, buf, len, 0);

PHALCON_INIT_VAR(meta);
phalcon_call_func_p1(meta, "stream_get_meta_data", connection);

PHALCON_OBS_VAR(timeout);
phalcon_array_fetch_string(&timeout, meta, SL("timed_out"), PH_NOISY);
if (zend_is_true(timeout)) {
array_init_size(meta, 4);
if (php_stream_populate_meta_data(stream, meta)) {
zval *t;
if (phalcon_array_isset_string_fetch(&t, meta, SS("timed_out"))) {
timeout = zend_is_true(t);
}
}

if (timeout) {
PHALCON_THROW_EXCEPTION_STR(phalcon_exception_ce, "Connection timed out");
return;
}

PHALCON_INIT_VAR(mask);
ZVAL_STRING(mask, "\r\n", 1);

PHALCON_INIT_VAR(packet);
phalcon_call_func_p2(packet, "rtrim", data, mask);
} else {
PHALCON_INIT_NVAR(total_length);
ZVAL_LONG(total_length, 16384);

PHALCON_INIT_VAR(end_of_file);
ZVAL_STRING(end_of_file, "\r\n", 1);

PHALCON_INIT_NVAR(packet);
phalcon_call_func_p3(packet, "stream_get_line", connection, total_length, end_of_file);
size_t line_len = 0;
long int len = 16384;
char *buf = ecalloc(1, len+1);

if (php_stream_get_line(stream, buf, len, &line_len) != NULL) {
if (line_len < 512) {
buf = erealloc(buf, line_len + 1);
}

ZVAL_STRINGL(return_value, buf, line_len, 0);
}
else {
efree(buf);
ZVAL_FALSE(return_value);
}
}

RETURN_CCTOR(packet);

if (Z_TYPE_P(return_value) == IS_STRING && Z_STRLEN_P(return_value) >= 2) {
char *s = Z_STRVAL_P(return_value);
long int len = Z_STRLEN_P(return_value);

if (s[len-1] == '\n' && s[len-2] == '\r') {
s[len-2] = '\0';
Z_STRLEN_P(return_value) -= 2;
}
}

PHALCON_MM_RESTORE();
}

/**
Expand All @@ -506,7 +538,8 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, read){
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, write){

zval *data, *connection = NULL, *packet, *data_length;
zval *data, *connection, *packet;
php_stream *stream;

PHALCON_MM_GROW();

Expand All @@ -523,12 +556,15 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, write){
}
}

php_stream_from_zval_no_verify(stream, &connection);
if (!stream) {
RETURN_MM_FALSE;
}

PHALCON_INIT_VAR(packet);
PHALCON_CONCAT_VS(packet, data, "\r\n");

PHALCON_INIT_VAR(data_length);
phalcon_fast_strlen(data_length, packet);
phalcon_call_func_p3(return_value, "fwrite", connection, packet, data_length);
php_stream_write(stream, Z_STRVAL_P(packet), Z_STRLEN_P(packet));
RETURN_MM();
}

Expand All @@ -540,16 +576,27 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, write){
PHP_METHOD(Phalcon_Queue_Beanstalk, disconnect){

zval *connection;
php_stream *stream;

PHALCON_MM_GROW();

PHALCON_OBS_VAR(connection);
phalcon_read_property_this(&connection, this_ptr, SL("_connection"), PH_NOISY_CC);
connection = phalcon_fetch_nproperty_this(this_ptr, SL("_connection"), PH_NOISY_CC);
if (Z_TYPE_P(connection) != IS_RESOURCE) {
RETURN_MM_FALSE;
RETURN_FALSE;
}

phalcon_call_func_p1_noret("fclose", connection);
RETURN_MM_TRUE;
php_stream_from_zval_no_verify(stream, &connection);
if (!stream) {
RETURN_FALSE;
}

if ((stream->flags & PHP_STREAM_FLAG_NO_FCLOSE) == 0) {
if (!stream->is_persistent) {
zend_list_delete(stream->rsrc_id);
}
else {
php_stream_pclose(stream);
}
}

RETURN_TRUE;
}

0 comments on commit d931cf8

Please sign in to comment.