bugfix, issue141,179: lost message when consumer restart #188
base: master
}

- $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset;
+ $consumerOffset = $commitOffset + 1;
The + 1 shouldn't be applied here.
Because the returned offset is the one stored by the offsetManager, i.e. the offset committed under this groupId.
@noname007 the consumer offset is equal to the commit offset + 1, so I add 1 to the commit offset and the consumer fetches the next message. If you don't add 1 to the commit offset, you will fetch the same message that was committed last time.
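To make the point above concrete, here is a minimal sketch. It assumes the committed offset is the offset of the last processed message, and it uses a plain PHP array as a stand-in for a partition log. `firstFetched` is a hypothetical helper for illustration, not part of kafka-php.

```php
<?php
// Toy log: the array index stands in for the Kafka offset (illustration only).
$log = ['m0', 'm1', 'm2'];

// Assumption: the committed offset is the offset of the last processed message.
$commitOffset = 1;

// Hypothetical helper: returns the first message a fetch at $fetchOffset would see.
function firstFetched(array $log, int $fetchOffset): ?string
{
    return $log[$fetchOffset] ?? null;
}

// Fetching from the commit offset itself returns the already processed message.
$withoutPlusOne = firstFetched($log, $commitOffset);

// Fetching from commit offset + 1 returns the next unprocessed message.
$withPlusOne = firstFetched($log, $commitOffset + 1);
```

In this toy setup `$withoutPlusOne` is the message that was already committed, while `$withPlusOne` is the one after it.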
What you said is right, but the position is wrong; see #189.
I used your code but I still lost a message. #186 shows the condition I ran into, but I could not get the test case to hit the same situation...
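[Originally in Chinese] @chongchaoyu https://github.com/weiboad/kafka-php/pull/186/files#diff-abf40442ee37f59d8941040a616cf543R121 This is the script that reproduces the problem, but I have not worked out how to write the assertions for it as a test.
How I run the script locally:
Throw an exception, then consume again. The log shows that one message is lost, and it also shows the offsets getting out of order.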
@noname007 I understand that English is not your native language (it's not mine either), but I'd really love it if I didn't have to translate comments in order to help people...
I didn't say much... I only described how I run the script on my machine to trigger the problem:
throw an exception after it has run, then run it again, and you will find the wrong offset in the log file.
But the test case I wrote did not behave as I expected, because I could not write the right assert statement.
I found out in the QQ group that he is Chinese, so I chose our native language to talk... sorry for that.
By the way, do you use QQ International? I can pull you into the group, haha 😆
"I found out in the QQ group that he is Chinese, so I chose our native language to talk... sorry for that."
No worries, it happens 😄
@lcobucci here are the conditions my scripts run under: I use crontab to check whether the job process is still alive, and if it is not, crontab starts it again.
@chongchaoyu I think that @noname007's solution is simpler and seems to address the issue; we just need tests to confirm that the problem has been solved. With that said, should we close this PR in favour of #189?
The cause of this issue:
When no new messages are available, the fetch request still succeeds and the message array in the response is empty, but the consumer commits the offset anyway. In this situation the consumer offset is equal to the commit offset. If you don't stop the consumer, the commit offset is corrected by the next successful fetch request; if you restart the consumer, the offset request returns the offset you committed last time.
Based on the Kafka protocol, highwaterMarkOffset represents "The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are.", so it is of no use for deciding the consumer offset.
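The scenario described above can be sketched as a toy simulation. This is not kafka-php code. It assumes the commit offset is the offset of the last processed message, that a restarted consumer resumes at commit offset + 1, and it models the log as a plain array. `pollOnce` and `resumeOffsetAfterRestart` are hypothetical names used only for illustration.

```php
<?php
/**
 * One poll cycle over a toy log (array index = offset).
 * Returns [next consumer offset, commit offset].
 */
function pollOnce(array $log, int $consumerOffset, int $commitOffset, bool $commitOnEmptyFetch): array
{
    $messages = array_slice($log, $consumerOffset);

    if ($messages === []) {
        // Successful fetch with no messages. If the consumer commits anyway,
        // the commit offset becomes equal to the consumer offset.
        return [$consumerOffset, $commitOnEmptyFetch ? $consumerOffset : $commitOffset];
    }

    // Messages were processed: commit the last one, fetch from the one after it.
    $lastOffset = $consumerOffset + count($messages) - 1;
    return [$lastOffset + 1, $lastOffset];
}

/** Offset a restarted consumer would resume from in this toy scenario. */
function resumeOffsetAfterRestart(bool $commitOnEmptyFetch): int
{
    $log = ['m0', 'm1', 'm2'];

    // First poll consumes offsets 0..2.
    [$consumerOffset, $commitOffset] = pollOnce($log, 0, -1, $commitOnEmptyFetch);
    // Second poll finds nothing new (empty but successful fetch).
    [$consumerOffset, $commitOffset] = pollOnce($log, $consumerOffset, $commitOffset, $commitOnEmptyFetch);

    // Restart: resume at the committed offset + 1.
    return $commitOffset + 1;
}

$resumeWhenCommittingOnEmpty = resumeOffsetAfterRestart(true);
$resumeWhenSkippingCommit    = resumeOffsetAfterRestart(false);
```

In this model the next message to be produced lands at offset 3. When the empty fetch is committed, the restarted consumer resumes at offset 4 and never sees it; when the commit is skipped, it resumes at offset 3.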