Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #236] Do not enable clean space service after registering the state machine #237

Merged
merged 2 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ public class DLedgerConfig {

private long leadershipTransferWaitTimeout = 1000;

// Whether to force log cleanup when the disk reaches a certain space
private boolean enableCleanSpaceService = true;

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand Down Expand Up @@ -466,11 +463,4 @@ public Map<String, String> getPeerAddressMap() {
return this.peerAddressMap;
}

public boolean isEnableCleanSpaceService() {
return enableCleanSpaceService;
}

public void setEnableCleanSpaceService(boolean enableCleanSpaceService) {
this.enableCleanSpaceService = enableCleanSpaceService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class DLedgerServer extends AbstractDLedgerServer {
private ScheduledExecutorService executorService;
private Optional<StateMachineCaller> fsmCaller;

private volatile boolean isStarted = false;

public DLedgerServer(DLedgerConfig dLedgerConfig) {
this(dLedgerConfig, null, null, null);
}
Expand Down Expand Up @@ -110,6 +112,7 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC

/**
* Start in proxy mode, use shared DLedgerRpcService
*
* @param dLedgerConfig DLedgerConfig
* @param dLedgerRpcService Shared DLedgerRpcService
*/
Expand All @@ -133,28 +136,34 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe
/**
* Start up, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also start up it.
*/
public void startup() {
this.dLedgerStore.startup();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
public synchronized void startup() {
if (!isStarted) {
this.dLedgerStore.startup();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
}
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
isStarted = true;
}
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
}

/**
* Shutdown, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also shut down it.
*/
public void shutdown() {
this.dLedgerLeaderElector.shutdown();
this.dLedgerEntryPusher.shutdown();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.shutdown();
public synchronized void shutdown() {
if (isStarted) {
this.dLedgerLeaderElector.shutdown();
this.dLedgerEntryPusher.shutdown();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.shutdown();
}
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
isStarted = false;
}
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
}

private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
Expand All @@ -169,12 +178,18 @@ public MemberState getMemberState() {
return memberState;
}

public void registerStateMachine(final StateMachine fsm) {
public synchronized void registerStateMachine(final StateMachine fsm) {
if (isStarted) {
throw new IllegalStateException("can not register statemachine after dledger server starts");
}
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.start();
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
if (dLedgerStore instanceof DLedgerMmapFileStore) {
((DLedgerMmapFileStore) dLedgerStore).setEnableCleanSpaceService(false);
}
}

public StateMachine getStateMachine() {
Expand Down Expand Up @@ -215,10 +230,8 @@ public CompletableFuture<VoteResponse> handleVote(VoteRequest request) throws Ex
}

/**
* Handle the append requests:
* 1.append the entry to local store
* 2.submit the future to entry pusher and wait the quorum ack
* 3.if the pending requests are full, then reject it immediately
* Handle the append requests: 1.append the entry to local store 2.submit the future to entry pusher and wait the
* quorum ack 3.if the pending requests are full, then reject it immediately
*
* @param request
* @return
Expand Down Expand Up @@ -524,6 +537,7 @@ public NettyRemotingServer getRemotingServer() {
}
return null;
}

@Override
public NettyRemotingClient getRemotingClient() {
if (this.dLedgerRpcService instanceof DLedgerRpcNettyService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class DLedgerMmapFileStore extends DLedgerStore {

private volatile Set<String> fullStorePaths = Collections.emptySet();

private boolean enableCleanSpaceService = true;

public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = memberState;
Expand All @@ -85,17 +87,15 @@ public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState
localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));
localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2));
flushDataService = new FlushDataService("DLedgerFlushDataService", logger);
if (dLedgerConfig.isEnableCleanSpaceService()) {
cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger);
}
cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger);
}

@Override
public void startup() {
load();
recover();
flushDataService.start();
if (cleanSpaceService != null) {
if (enableCleanSpaceService) {
cleanSpaceService.start();
}
}
Expand All @@ -105,7 +105,7 @@ public void shutdown() {
this.dataFileList.flush(0);
this.indexFileList.flush(0);
persistCheckPoint();
if (cleanSpaceService != null) {
if (enableCleanSpaceService) {
cleanSpaceService.shutdown();
}
flushDataService.shutdown();
Expand Down Expand Up @@ -654,6 +654,10 @@ public void shutdownFlushService() {
this.flushDataService.shutdown();
}

public void setEnableCleanSpaceService(boolean enableCleanSpaceService) {
this.enableCleanSpaceService = enableCleanSpaceService;
}

class FlushDataService extends ShutdownAbleThread {

public FlushDataService(String name, Logger logger) {
Expand Down