From 7ca71fe131eb230952ed5de5692647235fc85efc Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Thu, 14 Jan 2021 18:09:10 +0100 Subject: [PATCH] Add pausePartitions(), resumePartitions() on RdKafka\Kafka, RdKafka\KafkaConsumer (#438) --- kafka_consumer.c | 82 ++++++++++++++++++++++++++++ package.xml | 1 + rdkafka.c | 82 ++++++++++++++++++++++++++++ tests/pause_resume.phpt | 108 +++++++++++++++++++++++++++++++++++++ tests/topic_partition.phpt | 8 ++- topic_partition.c | 32 +++++++++-- topic_partition.h | 3 +- 7 files changed, 310 insertions(+), 6 deletions(-) create mode 100644 tests/pause_resume.phpt diff --git a/kafka_consumer.c b/kafka_consumer.c index fe1f1b30..539ea9ca 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -798,6 +798,86 @@ PHP_METHOD(RdKafka__KafkaConsumer, queryWatermarkOffsets) } /* }}} */ +/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::pausePartitions(RdKafka\TopicPartition[] $topicPartitions) + Pause consumption for the provided list of partitions. 
*/ +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_pause_partitions, 0, 0, 1) + ZEND_ARG_INFO(0, topic_partitions) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__KafkaConsumer, pausePartitions) +{ + HashTable *htopars; + rd_kafka_topic_partition_list_t *topars; + rd_kafka_resp_err_t err; + object_intern *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + topars = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topars) { + return; + } + + err = rd_kafka_pause_partitions(intern->rk, topars); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_topic_partition_list_destroy(topars); + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + kafka_topic_partition_list_to_array(return_value, topars); + rd_kafka_topic_partition_list_destroy(topars); +} +/* }}} */ + +/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::resumePartitions(RdKafka\TopicPartition[] $topicPartitions) + Resume consumption for the provided list of partitions. 
*/ +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_resume_partitions, 0, 0, 1) + ZEND_ARG_INFO(0, topic_partitions) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__KafkaConsumer, resumePartitions) +{ + HashTable *htopars; + rd_kafka_topic_partition_list_t *topars; + rd_kafka_resp_err_t err; + object_intern *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + topars = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topars) { + return; + } + + err = rd_kafka_resume_partitions(intern->rk, topars); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_topic_partition_list_destroy(topars); + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + kafka_topic_partition_list_to_array(return_value, topars); + rd_kafka_topic_partition_list_destroy(topars); +} +/* }}} */ + static const zend_function_entry fe[] = { /* {{{ */ PHP_ME(RdKafka__KafkaConsumer, __construct, arginfo_kafka_kafka_consumer___construct, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__KafkaConsumer, assign, arginfo_kafka_kafka_consumer_assign, ZEND_ACC_PUBLIC) @@ -815,6 +895,8 @@ static const zend_function_entry fe[] = { /* {{{ */ PHP_ME(RdKafka__KafkaConsumer, getOffsetPositions, arginfo_kafka_kafka_consumer_get_offset_positions, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__KafkaConsumer, queryWatermarkOffsets, arginfo_kafka_kafka_consumer_query_watermark_offsets, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__KafkaConsumer, offsetsForTimes, arginfo_kafka_kafka_consumer_offsets_for_times, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__KafkaConsumer, pausePartitions, arginfo_kafka_kafka_consumer_pause_partitions, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__KafkaConsumer, resumePartitions, arginfo_kafka_kafka_consumer_resume_partitions, ZEND_ACC_PUBLIC) PHP_FE_END }; /* }}} */ diff --git a/package.xml b/package.xml index 33d223c0..1237c48c 100644 --- a/package.xml +++ b/package.xml @@ -84,6 +84,7 @@ + 
diff --git a/rdkafka.c b/rdkafka.c index dd9ce0e5..bd16e09a 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -733,6 +733,86 @@ PHP_METHOD(RdKafka__Kafka, setLogger) } /* }}} */ +/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::pausePatitions(RdKafka\TopicPartition[] $topicPartitions) + Pause producing or consumption for the provided list of partitions. */ +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_pause_partitions, 0, 0, 1) + ZEND_ARG_INFO(0, topic_partitions) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__Kafka, pausePartitions) +{ + HashTable *htopars; + rd_kafka_topic_partition_list_t *topars; + rd_kafka_resp_err_t err; + kafka_object *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + topars = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topars) { + return; + } + + err = rd_kafka_pause_partitions(intern->rk, topars); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_topic_partition_list_destroy(topars); + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + kafka_topic_partition_list_to_array(return_value, topars); + rd_kafka_topic_partition_list_destroy(topars); +} +/* }}} */ + +/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::resumePatitions(RdKafka\TopicPartition[] $topicPartitions) + Resume producing consumption for the provided list of partitions. 
*/ +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_resume_partitions, 0, 0, 1) + ZEND_ARG_INFO(0, topic_partitions) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__Kafka, resumePartitions) +{ + HashTable *htopars; + rd_kafka_topic_partition_list_t *topars; + rd_kafka_resp_err_t err; + kafka_object *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + topars = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topars) { + return; + } + + err = rd_kafka_resume_partitions(intern->rk, topars); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_topic_partition_list_destroy(topars); + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + kafka_topic_partition_list_to_array(return_value, topars); + rd_kafka_topic_partition_list_destroy(topars); +} +/* }}} */ + static const zend_function_entry kafka_fe[] = { PHP_ME(RdKafka__Kafka, addBrokers, arginfo_kafka_add_brokers, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__Kafka, getMetadata, arginfo_kafka_get_metadata, ZEND_ACC_PUBLIC) @@ -749,6 +829,8 @@ static const zend_function_entry kafka_fe[] = { PHP_ME(RdKafka__Kafka, setLogger, arginfo_kafka_set_logger, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED) PHP_ME(RdKafka__Kafka, queryWatermarkOffsets, arginfo_kafka_query_watermark_offsets, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__Kafka, offsetsForTimes, arginfo_kafka_offsets_for_times, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__Kafka, pausePartitions, arginfo_kafka_kafka_pause_partitions, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__Kafka, resumePartitions, arginfo_kafka_kafka_resume_partitions, ZEND_ACC_PUBLIC) PHP_FE_END }; diff --git a/tests/pause_resume.phpt b/tests/pause_resume.phpt new file mode 100644 index 00000000..3cd49067 --- /dev/null +++ b/tests/pause_resume.phpt @@ -0,0 +1,108 @@ +--TEST-- +Pause and resume partitions +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$producer = 
new RdKafka\Producer($conf); + +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); + +var_dump($producer->pausePartitions([ + new RdKafka\TopicPartition($topicName, 0), +])); +var_dump($producer->resumePartitions([ + new RdKafka\TopicPartition($topicName, 0), +])); + +$conf = new RdKafka\Conf(); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid())); + +$consumer = new RdKafka\KafkaConsumer($conf); +$consumer->assign([ + new RdKafka\TopicPartition($topicName, 0), +]); + +var_dump($consumer->pausePartitions([ + new RdKafka\TopicPartition($topicName, 0), +])); +var_dump($consumer->resumePartitions([ + new RdKafka\TopicPartition($topicName, 0), +])); +var_dump($consumer->resumePartitions([ + new RdKafka\TopicPartition("", -1), +])); +--EXPECTF-- +array(1) { + [0]=> + object(RdKafka\TopicPartition)#5 (4) { + ["topic"]=> + string(26) "test_rdkafka_%s" + ["partition"]=> + int(0) + ["offset"]=> + int(0) + ["err"]=> + int(0) + } +} +array(1) { + [0]=> + object(RdKafka\TopicPartition)#4 (4) { + ["topic"]=> + string(26) "test_rdkafka_%s" + ["partition"]=> + int(0) + ["offset"]=> + int(0) + ["err"]=> + int(0) + } +} +array(1) { + [0]=> + object(RdKafka\TopicPartition)#6 (4) { + ["topic"]=> + string(26) "test_rdkafka_%s" + ["partition"]=> + int(0) + ["offset"]=> + int(0) + ["err"]=> + int(0) + } +} +array(1) { + [0]=> + object(RdKafka\TopicPartition)#5 (4) { + ["topic"]=> + string(26) "test_rdkafka_%s" + ["partition"]=> + int(0) + ["offset"]=> + int(0) + ["err"]=> + int(0) + } +} +array(1) { + [0]=> + object(RdKafka\TopicPartition)#6 (4) { + ["topic"]=> + string(0) "" + ["partition"]=> + int(-1) + ["offset"]=> + int(0) + ["err"]=> + int(-190) + } +} diff --git a/tests/topic_partition.phpt b/tests/topic_partition.phpt index 956c0709..9e8b5507 100644 --- a/tests/topic_partition.phpt +++ b/tests/topic_partition.phpt @@ -26,13 +26,15 @@ $topar 
var_dump($topar); --EXPECT-- -object(RdKafka\TopicPartition)#1 (3) { +object(RdKafka\TopicPartition)#1 (4) { ["topic"]=> string(4) "test" ["partition"]=> int(-1) ["offset"]=> int(42) + ["err"]=> + int(0) } array(3) { ["topic"]=> @@ -42,11 +44,13 @@ array(3) { ["offset"]=> int(42) } -object(RdKafka\TopicPartition)#1 (3) { +object(RdKafka\TopicPartition)#1 (4) { ["topic"]=> string(3) "foo" ["partition"]=> int(123) ["offset"]=> int(43) + ["err"]=> + int(0) } diff --git a/topic_partition.c b/topic_partition.c index 9d1b1f73..0f176721 100644 --- a/topic_partition.c +++ b/topic_partition.c @@ -106,12 +106,13 @@ static HashTable *get_debug_info(Z_RDKAFKA_OBJ *object, int *is_temp) /* {{{ */ add_assoc_long(&ary, "partition", intern->partition); add_assoc_long(&ary, "offset", intern->offset); + add_assoc_long(&ary, "err", (zend_long) intern->err); return Z_ARRVAL(ary); } /* }}} */ -void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int64_t offset) /* {{{ */ +void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int64_t offset, rd_kafka_resp_err_t err) /* {{{ */ { object_intern *intern; @@ -127,6 +128,7 @@ void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int intern->partition = partition; intern->offset = offset; + intern->err = err; } /* }}} */ void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_partition_list_t *list) /* {{{ */ @@ -141,7 +143,7 @@ void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_part topar = &list->elems[i]; ZVAL_NULL(&ztopar); object_init_ex(&ztopar, ce_kafka_topic_partition); - kafka_topic_partition_init(&ztopar, topar->topic, topar->partition, topar->offset); + kafka_topic_partition_init(&ztopar, topar->topic, topar->partition, topar->offset, topar->err); add_next_index_zval(return_value, &ztopar); } } /* }}} */ @@ -211,7 +213,7 @@ PHP_METHOD(RdKafka__TopicPartition, __construct) return; } - 
kafka_topic_partition_init(getThis(), topic, partition, offset); + kafka_topic_partition_init(getThis(), topic, partition, offset, RD_KAFKA_RESP_ERR_NO_ERROR); zend_restore_error_handling(&error_handling); } @@ -376,6 +378,29 @@ PHP_METHOD(RdKafka__TopicPartition, setOffset) } /* }}} */ +/* {{{ proto int RdKafka\TopicPartition::getErr() + Returns err */ + +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_topic_partition_get_err, 0, 0, 0) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__TopicPartition, getErr) +{ + object_intern *intern; + + if (zend_parse_parameters_none() == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG((zend_long) intern->err); +} +/* }}} */ + static const zend_function_entry fe[] = { /* {{{ */ PHP_ME(RdKafka__TopicPartition, __construct, arginfo_kafka_topic_partition___construct, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__TopicPartition, getTopic, arginfo_kafka_topic_partition_get_topic, ZEND_ACC_PUBLIC) @@ -384,6 +409,7 @@ static const zend_function_entry fe[] = { /* {{{ */ PHP_ME(RdKafka__TopicPartition, setPartition, arginfo_kafka_topic_partition_set_partition, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__TopicPartition, getOffset, arginfo_kafka_topic_partition_get_offset, ZEND_ACC_PUBLIC) PHP_ME(RdKafka__TopicPartition, setOffset, arginfo_kafka_topic_partition_set_offset, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__TopicPartition, getErr, arginfo_kafka_topic_partition_get_err, ZEND_ACC_PUBLIC) PHP_FE_END }; /* }}} */ diff --git a/topic_partition.h b/topic_partition.h index f2d405dc..7c03690b 100644 --- a/topic_partition.h +++ b/topic_partition.h @@ -20,13 +20,14 @@ typedef struct _kafka_topic_partition_intern { char *topic; int32_t partition; int64_t offset; + rd_kafka_resp_err_t err; zend_object std; } kafka_topic_partition_intern; void kafka_metadata_topic_partition_minit(INIT_FUNC_ARGS); kafka_topic_partition_intern * get_topic_partition_object(zval *z); -void kafka_topic_partition_init(zval *z, char *topic, int32_t 
partition, int64_t offset); +void kafka_topic_partition_init(zval *z, char *topic, int32_t partition, int64_t offset, rd_kafka_resp_err_t err); void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_partition_list_t *list); rd_kafka_topic_partition_list_t * array_arg_to_kafka_topic_partition_list(int argnum, HashTable *ary);