diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index 74ac1236..246b435b 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -91,9 +91,6 @@ public class DLedgerConfig { private long leadershipTransferWaitTimeout = 1000; - // Whether to force log cleanup when the disk reaches a certain space - private boolean enableCleanSpaceService = true; - public String getDefaultPath() { return storeBaseDir + File.separator + "dledger-" + selfId; } @@ -466,11 +463,4 @@ public Map getPeerAddressMap() { return this.peerAddressMap; } - public boolean isEnableCleanSpaceService() { - return enableCleanSpaceService; - } - - public void setEnableCleanSpaceService(boolean enableCleanSpaceService) { - this.enableCleanSpaceService = enableCleanSpaceService; - } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index a15ac8b6..a4a60471 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -81,6 +81,8 @@ public class DLedgerServer extends AbstractDLedgerServer { private ScheduledExecutorService executorService; private Optional fsmCaller; + private volatile boolean isStarted = false; + public DLedgerServer(DLedgerConfig dLedgerConfig) { this(dLedgerConfig, null, null, null); } @@ -110,6 +112,7 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC /** * Start in proxy mode, use shared DLedgerRpcService + * * @param dLedgerConfig DLedgerConfig * @param dLedgerRpcService Shared DLedgerRpcService */ @@ -133,28 +136,34 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe /** * Start up, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also start up it. */ - public void startup() { - this.dLedgerStore.startup(); - if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { - this.dLedgerRpcService.startup(); + public synchronized void startup() { + if (!isStarted) { + this.dLedgerStore.startup(); + if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { + this.dLedgerRpcService.startup(); + } + this.dLedgerEntryPusher.startup(); + this.dLedgerLeaderElector.startup(); + executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS); + isStarted = true; } - this.dLedgerEntryPusher.startup(); - this.dLedgerLeaderElector.startup(); - executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS); } /** * Shutdown, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also shut down it. */ - public void shutdown() { - this.dLedgerLeaderElector.shutdown(); - this.dLedgerEntryPusher.shutdown(); - if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { - this.dLedgerRpcService.shutdown(); + public synchronized void shutdown() { + if (isStarted) { + this.dLedgerLeaderElector.shutdown(); + this.dLedgerEntryPusher.shutdown(); + if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { + this.dLedgerRpcService.shutdown(); + } + this.dLedgerStore.shutdown(); + executorService.shutdown(); + this.fsmCaller.ifPresent(StateMachineCaller::shutdown); + isStarted = false; } - this.dLedgerStore.shutdown(); - executorService.shutdown(); - this.fsmCaller.ifPresent(StateMachineCaller::shutdown); } private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) { @@ -169,12 +178,18 @@ public MemberState getMemberState() { return memberState; } - public void registerStateMachine(final StateMachine fsm) { + public synchronized void registerStateMachine(final StateMachine fsm) { + if (isStarted) { + throw new IllegalStateException("can not register statemachine after dledger server starts"); + } final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher); fsmCaller.start(); this.fsmCaller = Optional.of(fsmCaller); // Register state machine caller to entry pusher this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller); + if (dLedgerStore instanceof DLedgerMmapFileStore) { + ((DLedgerMmapFileStore) dLedgerStore).setEnableCleanSpaceService(false); + } } public StateMachine getStateMachine() { @@ -215,10 +230,8 @@ public CompletableFuture handleVote(VoteRequest request) throws Ex } /** - * Handle the append requests: - * 1.append the entry to local store - * 2.submit the future to entry pusher and wait the quorum ack - * 3.if the pending requests are full, then reject it immediately + * Handle the append requests: 1.append the entry to local store 2.submit the future to entry pusher and wait the + * quorum ack 3.if the pending requests are full, then reject it immediately * * @param request * @return @@ -524,6 +537,7 @@ public NettyRemotingServer getRemotingServer() { } return null; } + @Override public NettyRemotingClient getRemotingClient() { if (this.dLedgerRpcService instanceof DLedgerRpcNettyService) { diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index b804c419..76a30077 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -72,6 +72,8 @@ public class DLedgerMmapFileStore extends DLedgerStore { private volatile Set fullStorePaths = Collections.emptySet(); + private boolean enableCleanSpaceService = true; + public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) { this.dLedgerConfig = dLedgerConfig; this.memberState = memberState; @@ -85,9 +87,7 @@ public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024)); localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2)); flushDataService = new FlushDataService("DLedgerFlushDataService", logger); - if (dLedgerConfig.isEnableCleanSpaceService()) { - cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger); - } + cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger); } @Override @@ -95,7 +95,7 @@ public void startup() { load(); recover(); flushDataService.start(); - if (cleanSpaceService != null) { + if (enableCleanSpaceService) { cleanSpaceService.start(); } } @@ -105,7 +105,7 @@ public void shutdown() { this.dataFileList.flush(0); this.indexFileList.flush(0); persistCheckPoint(); - if (cleanSpaceService != null) { + if (enableCleanSpaceService) { cleanSpaceService.shutdown(); } flushDataService.shutdown(); @@ -654,6 +654,10 @@ public void shutdownFlushService() { this.flushDataService.shutdown(); } + public void setEnableCleanSpaceService(boolean enableCleanSpaceService) { + this.enableCleanSpaceService = enableCleanSpaceService; + } + class FlushDataService extends ShutdownAbleThread { public FlushDataService(String name, Logger logger) {