Skip to content

Commit

Permalink
Add pausePartitions(), resumePartitions() on RdKafka\Kafka, RdKafka\K…
Browse files Browse the repository at this point in the history
…afkaConsumer (#438)
  • Loading branch information
arnaud-lb authored Jan 14, 2021
1 parent ec302d2 commit 7ca71fe
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 6 deletions.
82 changes: 82 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,86 @@ PHP_METHOD(RdKafka__KafkaConsumer, queryWatermarkOffsets)
}
/* }}} */

/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::pausePartitions(RdKafka\TopicPartition[] $topicPartitions)
Pause consumption for the provided list of partitions.
Returns the same list with a per-partition err field filled in by librdkafka;
throws RdKafka\Exception only on a global (list-level) error. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_pause_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__KafkaConsumer, pausePartitions)
{
HashTable *htopars;
rd_kafka_topic_partition_list_t *topars;
rd_kafka_resp_err_t err;
object_intern *intern;

/* "h" = raw HashTable of the PHP array argument */
if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

/* Convert PHP TopicPartition[] to a librdkafka list; NOTE(review):
 * presumably the helper reports its own error on failure — confirm. */
topars = array_arg_to_kafka_topic_partition_list(1, htopars);
if (!topars) {
return;
}

err = rd_kafka_pause_partitions(intern->rk, topars);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
/* Free the list before throwing to avoid a leak */
rd_kafka_topic_partition_list_destroy(topars);
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
return;
}

/* Success: expose per-partition results (including per-partition errs) */
kafka_topic_partition_list_to_array(return_value, topars);
rd_kafka_topic_partition_list_destroy(topars);
}
/* }}} */
/* }}} */

/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::resumePartitions(RdKafka\TopicPartition[] $topicPartitions)
Resume consumption for the provided list of partitions.
Returns the same list with a per-partition err field filled in by librdkafka;
throws RdKafka\Exception only on a global (list-level) error. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_resume_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__KafkaConsumer, resumePartitions)
{
HashTable *htopars;
rd_kafka_topic_partition_list_t *topars;
rd_kafka_resp_err_t err;
object_intern *intern;

/* "h" = raw HashTable of the PHP array argument */
if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

/* Convert PHP TopicPartition[] to a librdkafka list; NOTE(review):
 * presumably the helper reports its own error on failure — confirm. */
topars = array_arg_to_kafka_topic_partition_list(1, htopars);
if (!topars) {
return;
}

err = rd_kafka_resume_partitions(intern->rk, topars);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
/* Free the list before throwing to avoid a leak */
rd_kafka_topic_partition_list_destroy(topars);
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
return;
}

/* Success: expose per-partition results (including per-partition errs) */
kafka_topic_partition_list_to_array(return_value, topars);
rd_kafka_topic_partition_list_destroy(topars);
}
/* }}} */
/* }}} */

static const zend_function_entry fe[] = { /* {{{ */
PHP_ME(RdKafka__KafkaConsumer, __construct, arginfo_kafka_kafka_consumer___construct, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__KafkaConsumer, assign, arginfo_kafka_kafka_consumer_assign, ZEND_ACC_PUBLIC)
Expand All @@ -815,6 +895,8 @@ static const zend_function_entry fe[] = { /* {{{ */
PHP_ME(RdKafka__KafkaConsumer, getOffsetPositions, arginfo_kafka_kafka_consumer_get_offset_positions, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__KafkaConsumer, queryWatermarkOffsets, arginfo_kafka_kafka_consumer_query_watermark_offsets, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__KafkaConsumer, offsetsForTimes, arginfo_kafka_kafka_consumer_offsets_for_times, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__KafkaConsumer, pausePartitions, arginfo_kafka_kafka_consumer_pause_partitions, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__KafkaConsumer, resumePartitions, arginfo_kafka_kafka_consumer_resume_partitions, ZEND_ACC_PUBLIC)
PHP_FE_END
}; /* }}} */

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
<file role="test" name="integration-tests-check.php"/>
<file role="test" name="kafka_error_exception.phpt"/>
<file role="test" name="message_headers.phpt"/>
<file role="test" name="pause_resume.phpt"/>
<file role="test" name="produce_consume.phpt"/>
<file role="test" name="produce_consume_queue.phpt"/>
<file role="test" name="produce_consume_transactional.phpt"/>
Expand Down
82 changes: 82 additions & 0 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,86 @@ PHP_METHOD(RdKafka__Kafka, setLogger)
}
/* }}} */

/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::pausePartitions(RdKafka\TopicPartition[] $topicPartitions)
Pause producing or consumption for the provided list of partitions.
Returns the same list with a per-partition err field filled in by librdkafka;
throws RdKafka\Exception only on a global (list-level) error. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_pause_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, pausePartitions)
{
HashTable *htopars;
rd_kafka_topic_partition_list_t *topars;
rd_kafka_resp_err_t err;
kafka_object *intern;

/* "h" = raw HashTable of the PHP array argument */
if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

/* Convert PHP TopicPartition[] to a librdkafka list; NOTE(review):
 * presumably the helper reports its own error on failure — confirm. */
topars = array_arg_to_kafka_topic_partition_list(1, htopars);
if (!topars) {
return;
}

err = rd_kafka_pause_partitions(intern->rk, topars);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
/* Free the list before throwing to avoid a leak */
rd_kafka_topic_partition_list_destroy(topars);
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
return;
}

/* Success: expose per-partition results (including per-partition errs) */
kafka_topic_partition_list_to_array(return_value, topars);
rd_kafka_topic_partition_list_destroy(topars);
}
/* }}} */
/* }}} */

/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::resumePartitions(RdKafka\TopicPartition[] $topicPartitions)
Resume producing or consumption for the provided list of partitions.
Returns the same list with a per-partition err field filled in by librdkafka;
throws RdKafka\Exception only on a global (list-level) error. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_resume_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, resumePartitions)
{
    HashTable *htopars;
    rd_kafka_topic_partition_list_t *topars;
    rd_kafka_resp_err_t err;
    kafka_object *intern;

    /* "h" = raw HashTable of the PHP array argument */
    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
        return;
    }

    intern = get_kafka_object(getThis());
    if (!intern) {
        return;
    }

    /* Convert PHP TopicPartition[] to a librdkafka list */
    topars = array_arg_to_kafka_topic_partition_list(1, htopars);
    if (!topars) {
        return;
    }

    /* BUG FIX: the original called rd_kafka_pause_partitions() here, so
     * RdKafka\Kafka::resumePartitions() paused partitions instead of
     * resuming them. Call the resume API, mirroring the KafkaConsumer
     * implementation. */
    err = rd_kafka_resume_partitions(intern->rk, topars);

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        /* Free the list before throwing to avoid a leak */
        rd_kafka_topic_partition_list_destroy(topars);
        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
        return;
    }

    /* Success: expose per-partition results (including per-partition errs) */
    kafka_topic_partition_list_to_array(return_value, topars);
    rd_kafka_topic_partition_list_destroy(topars);
}
/* }}} */
/* }}} */

static const zend_function_entry kafka_fe[] = {
PHP_ME(RdKafka__Kafka, addBrokers, arginfo_kafka_add_brokers, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, getMetadata, arginfo_kafka_get_metadata, ZEND_ACC_PUBLIC)
Expand All @@ -749,6 +829,8 @@ static const zend_function_entry kafka_fe[] = {
PHP_ME(RdKafka__Kafka, setLogger, arginfo_kafka_set_logger, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
PHP_ME(RdKafka__Kafka, queryWatermarkOffsets, arginfo_kafka_query_watermark_offsets, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, offsetsForTimes, arginfo_kafka_offsets_for_times, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, pausePartitions, arginfo_kafka_kafka_pause_partitions, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, resumePartitions, arginfo_kafka_kafka_resume_partitions, ZEND_ACC_PUBLIC)
PHP_FE_END
};

Expand Down
108 changes: 108 additions & 0 deletions tests/pause_resume.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
--TEST--
Pause and resume partitions
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));

$producer = new RdKafka\Producer($conf);

$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);

// Exercise pause/resume on the producer (inherited from RdKafka\Kafka);
// each call returns the TopicPartition list with per-partition err filled in.
var_dump($producer->pausePartitions([
new RdKafka\TopicPartition($topicName, 0),
]));
var_dump($producer->resumePartitions([
new RdKafka\TopicPartition($topicName, 0),
]));

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));

// Exercise pause/resume on RdKafka\KafkaConsumer with an assigned partition.
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->assign([
new RdKafka\TopicPartition($topicName, 0),
]);

var_dump($consumer->pausePartitions([
new RdKafka\TopicPartition($topicName, 0),
]));
var_dump($consumer->resumePartitions([
new RdKafka\TopicPartition($topicName, 0),
]));
// Unknown topic/partition: expect a per-partition err of -190
// (presumably RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION — confirm in rdkafka.h)
// rather than a thrown exception.
var_dump($consumer->resumePartitions([
new RdKafka\TopicPartition("", -1),
]));
--EXPECTF--
array(1) {
[0]=>
object(RdKafka\TopicPartition)#5 (4) {
["topic"]=>
string(26) "test_rdkafka_%s"
["partition"]=>
int(0)
["offset"]=>
int(0)
["err"]=>
int(0)
}
}
array(1) {
[0]=>
object(RdKafka\TopicPartition)#4 (4) {
["topic"]=>
string(26) "test_rdkafka_%s"
["partition"]=>
int(0)
["offset"]=>
int(0)
["err"]=>
int(0)
}
}
array(1) {
[0]=>
object(RdKafka\TopicPartition)#6 (4) {
["topic"]=>
string(26) "test_rdkafka_%s"
["partition"]=>
int(0)
["offset"]=>
int(0)
["err"]=>
int(0)
}
}
array(1) {
[0]=>
object(RdKafka\TopicPartition)#5 (4) {
["topic"]=>
string(26) "test_rdkafka_%s"
["partition"]=>
int(0)
["offset"]=>
int(0)
["err"]=>
int(0)
}
}
array(1) {
[0]=>
object(RdKafka\TopicPartition)#6 (4) {
["topic"]=>
string(0) ""
["partition"]=>
int(-1)
["offset"]=>
int(0)
["err"]=>
int(-190)
}
}
8 changes: 6 additions & 2 deletions tests/topic_partition.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ $topar

var_dump($topar);
--EXPECT--
object(RdKafka\TopicPartition)#1 (3) {
object(RdKafka\TopicPartition)#1 (4) {
["topic"]=>
string(4) "test"
["partition"]=>
int(-1)
["offset"]=>
int(42)
["err"]=>
int(0)
}
array(3) {
["topic"]=>
Expand All @@ -42,11 +44,13 @@ array(3) {
["offset"]=>
int(42)
}
object(RdKafka\TopicPartition)#1 (3) {
object(RdKafka\TopicPartition)#1 (4) {
["topic"]=>
string(3) "foo"
["partition"]=>
int(123)
["offset"]=>
int(43)
["err"]=>
int(0)
}
Loading

0 comments on commit 7ca71fe

Please sign in to comment.