Skip to content

Commit

Permalink
[ISSUE #236] Do not enable clean space service after registering the …
Browse files Browse the repository at this point in the history
…state machine (#237)

* Do not enable clean space service  after registering the state machine

* Add synchronized to shutdown function
  • Loading branch information
RongtongJin authored Sep 15, 2022
1 parent 505f438 commit 5fa83c0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ public class DLedgerConfig {

private long leadershipTransferWaitTimeout = 1000;

// Whether to force log cleanup when the disk reaches a certain space
private boolean enableCleanSpaceService = true;

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand Down Expand Up @@ -466,11 +463,4 @@ public Map<String, String> getPeerAddressMap() {
return this.peerAddressMap;
}

public boolean isEnableCleanSpaceService() {
return enableCleanSpaceService;
}

public void setEnableCleanSpaceService(boolean enableCleanSpaceService) {
this.enableCleanSpaceService = enableCleanSpaceService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class DLedgerServer extends AbstractDLedgerServer {
private ScheduledExecutorService executorService;
private Optional<StateMachineCaller> fsmCaller;

private volatile boolean isStarted = false;

public DLedgerServer(DLedgerConfig dLedgerConfig) {
this(dLedgerConfig, null, null, null);
}
Expand Down Expand Up @@ -110,6 +112,7 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC

/**
* Start in proxy mode, use shared DLedgerRpcService
*
* @param dLedgerConfig DLedgerConfig
* @param dLedgerRpcService Shared DLedgerRpcService
*/
Expand All @@ -133,28 +136,34 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe
/**
* Start up, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also start up it.
*/
public void startup() {
this.dLedgerStore.startup();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
public synchronized void startup() {
if (!isStarted) {
this.dLedgerStore.startup();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
}
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
isStarted = true;
}
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
}

/**
* Shutdown, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also shut down it.
*/
public void shutdown() {
this.dLedgerLeaderElector.shutdown();
this.dLedgerEntryPusher.shutdown();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.shutdown();
public synchronized void shutdown() {
if (isStarted) {
this.dLedgerLeaderElector.shutdown();
this.dLedgerEntryPusher.shutdown();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.shutdown();
}
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
isStarted = false;
}
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
}

private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
Expand All @@ -169,12 +178,18 @@ public MemberState getMemberState() {
return memberState;
}

public void registerStateMachine(final StateMachine fsm) {
public synchronized void registerStateMachine(final StateMachine fsm) {
if (isStarted) {
throw new IllegalStateException("can not register statemachine after dledger server starts");
}
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.start();
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
if (dLedgerStore instanceof DLedgerMmapFileStore) {
((DLedgerMmapFileStore) dLedgerStore).setEnableCleanSpaceService(false);
}
}

public StateMachine getStateMachine() {
Expand Down Expand Up @@ -215,10 +230,8 @@ public CompletableFuture<VoteResponse> handleVote(VoteRequest request) throws Ex
}

/**
* Handle the append requests:
* 1.append the entry to local store
* 2.submit the future to entry pusher and wait the quorum ack
* 3.if the pending requests are full, then reject it immediately
* Handle the append requests: 1.append the entry to local store 2.submit the future to entry pusher and wait the
* quorum ack 3.if the pending requests are full, then reject it immediately
*
* @param request
* @return
Expand Down Expand Up @@ -524,6 +537,7 @@ public NettyRemotingServer getRemotingServer() {
}
return null;
}

@Override
public NettyRemotingClient getRemotingClient() {
if (this.dLedgerRpcService instanceof DLedgerRpcNettyService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class DLedgerMmapFileStore extends DLedgerStore {

private volatile Set<String> fullStorePaths = Collections.emptySet();

private boolean enableCleanSpaceService = true;

public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = memberState;
Expand All @@ -85,17 +87,15 @@ public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState
localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));
localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2));
flushDataService = new FlushDataService("DLedgerFlushDataService", logger);
if (dLedgerConfig.isEnableCleanSpaceService()) {
cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger);
}
cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger);
}

@Override
public void startup() {
load();
recover();
flushDataService.start();
if (cleanSpaceService != null) {
if (enableCleanSpaceService) {
cleanSpaceService.start();
}
}
Expand All @@ -105,7 +105,7 @@ public void shutdown() {
this.dataFileList.flush(0);
this.indexFileList.flush(0);
persistCheckPoint();
if (cleanSpaceService != null) {
if (enableCleanSpaceService) {
cleanSpaceService.shutdown();
}
flushDataService.shutdown();
Expand Down Expand Up @@ -654,6 +654,10 @@ public void shutdownFlushService() {
this.flushDataService.shutdown();
}

public void setEnableCleanSpaceService(boolean enableCleanSpaceService) {
this.enableCleanSpaceService = enableCleanSpaceService;
}

class FlushDataService extends ShutdownAbleThread {

public FlushDataService(String name, Logger logger) {
Expand Down

0 comments on commit 5fa83c0

Please sign in to comment.