-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C++] fix cpp client do AcknowledgeCumulative not clean up previous message #8606
Conversation
Could you explain why the last message id should not be removed? See pulsar/pulsar-client-cpp/lib/ConsumerImpl.cc Lines 851 to 854 in 281163b
After messages whose id is <= And see Java client's implementation: pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java Lines 784 to 786 in 281163b
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java Lines 225 to 239 in 281163b
The behaviors of Java and C++ client are the same. |
@BewareMyPower Sorry,I did not describe clearly. see https://github.com/apache/pulsar/pull/8606/files, the origin implementation of cpp is different from that of Java client. cpp client just earse |
@saosir Can you add a unit test that reproduces the issue? That will also help people to understand the problem. |
@saosir OK, I see. |
a8336f7
to
e98394a
Compare
@BewareMyPower Can you review this pull request again? |
OK |
@BewareMyPower I have two questions here, can you help me answer them?
|
From my perspective, both you concerned are right. But the Java client has the same behavior, like |
@BewareMyPower |
Yeah, |
@BewareMyPower |
@saosir It's pleasure to have your contribution. By the way, I just looked at the When you created a consumer with unacked messages tracker enabled like: Consumer consumer;
ConsumerConfiguration consumerConf;
consumerConf.setUnAckedMessagesTimeoutMs(11000); // must >= 10000
consumerConf.setTickDurationInMs(11000);
Result result = client.subscribe("my-topic", "consumer-1", consumerConf, consumer); If pulsar/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc Lines 231 to 232 in 102fa9d
And Therefore, the trackers of sub-consumers only do the redelivery but not add any message id, see pulsar/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc Lines 497 to 499 in 102fa9d
|
@BewareMyPower pulsar/pulsar-client-cpp/lib/ClientConnection.cc Lines 666 to 669 in 102fa9d
and ConsumerImpl#internalListener will be called and notify PartitionedConsumerImpl receive messge by listener of ConsumerConfiguration (here is PartitionedConsumerImpl::messageReceived )
|
f846daa
to
ed3704c
Compare
I reset branch at first commit |
Thanks. Please update the associated PR description too. Also I think the tests could still be added without the refactor, like what |
8ed9502
to
d3f76e5
Compare
All is done |
/pulsarbot run-failure-checks |
long size() { return UnAckedMessageTrackerEnabled::size(); } | ||
}; // class UnAckedMessageTrackerEnabledMock | ||
|
||
TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A typo: testtUnAcked
-> testUnAcked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I need to resubmit the code to fix this problem?How about someone fix it when merge pull request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a committer could fix it when merge PR. So you needn't any changes. @jiazhai could you help take a look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@saosir Can you create a follow-up PR to fix the typo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just leave a typo comment.
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@BewareMyPower |
@saosir Yeah, your branch is far behind the master, it's better to rebase to latest master. |
/pulsarbot run-failure-checks |
LGTM. @jiazhai Could you help take a look? |
long size() { return UnAckedMessageTrackerEnabled::size(); } | ||
}; // class UnAckedMessageTrackerEnabledMock | ||
|
||
TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@saosir Can you create a follow-up PR to fix the typo?
…essage (#8606) ### Motivation pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`, not <= `msgId` in `UnAckedMessageTrackerEnabled::removeMessagesTill` ### Modifications - When do AcknowledgeCumulative from application, earse <= `msgId` in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to Broker - add unit test for `UnAckedMessageTrackerEnabled` (cherry picked from commit e75de48)
pulsar-client-cpp Consumer UnAckedMessageTrackerEnabled::removeMessagesTill should erase message whose id <=
msgId
in messageIdPartitionMapMotivation
pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up
msgId
, not <=msgId
inUnAckedMessageTrackerEnabled::removeMessagesTill
Modifications
msgId
in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to BrokerUnAckedMessageTrackerEnabled
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation