Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4090]Fail faster to keep consistent state #4094

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
Expand Down Expand Up @@ -223,13 +224,14 @@ public DefaultLitePullConsumer(final String namespace, final String consumerGrou
}

@Override
public void start() throws MQClientException {
public synchronized void start() throws MQClientException {
checkServiceState();
setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) {
if (null != this.traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
this.traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
Expand Down Expand Up @@ -532,6 +534,15 @@ public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}

private void checkServiceState() throws MQClientException {
if (!this.defaultLitePullConsumerImpl.isCreateJust()) {
throw new MQClientException("The PullConsumer service state is not CREATE_JUST, maybe started once, "
+ this.defaultLitePullConsumerImpl.getServiceState()
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}

private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {

private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private ServiceState serviceState = ServiceState.CREATE_JUST;

protected MQClientInstance mQClientFactory;

Expand Down Expand Up @@ -271,6 +271,14 @@ public synchronized boolean isRunning() {
return this.serviceState == ServiceState.RUNNING;
}

public synchronized boolean isCreateJust() {
return this.serviceState == ServiceState.CREATE_JUST;
}

public synchronized ServiceState getServiceState() {
return this.serviceState;
}

Comment on lines +274 to +281
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add synchronized here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serviceState is in a wrong/incompletable state before synchronized #start() return, since the value of serviceState is modified several times in synchronized #start(). synchronized in isCreateJust and getServiceState avoid read wrong/incompletable value.

synchronized #start()中对serviceState进行了多次更新、并且方法返回之前的状态是 不正确/不完整 的,synchronized 是为了避免读取到这些 不正确/不完整 的状态。而且我注意到在该pr之前,serviceState的修改和读取也都使用了synchronized加锁、这已经可以保证serviceState的可见性了,因此serviceState上的volatile修饰符似乎是多余的。

public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
Expand Down