Skip to content

Commit

Permalink
Support DLedger for Snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Sep 1, 2022
1 parent 584c2de commit ab9e2ad
Show file tree
Hide file tree
Showing 32 changed files with 1,535 additions and 64 deletions.
23 changes: 23 additions & 0 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class DLedgerConfig {

private long leadershipTransferWaitTimeout = 1000;

private int snapshotThreshold = 1000;
private int maxSnapshotReservedNum = 3;

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand All @@ -116,6 +119,10 @@ public void setDataStorePath(String dataStorePath) {
this.dataStorePath = dataStorePath;
}

public String getSnapshotStoreBaseDir() {
return getDefaultPath() + File.separator + "snapshot";
}

public String getIndexStorePath() {
return getDefaultPath() + File.separator + "index";
}
Expand Down Expand Up @@ -472,4 +479,20 @@ public String getSelfAddress() {
public Map<String, String> getPeerAddressMap() {
return this.peerAddressMap;
}

public int getSnapshotThreshold() {
return snapshotThreshold;
}

public void setSnapshotThreshold(int snapshotThreshold) {
this.snapshotThreshold = snapshotThreshold;
}

public int getMaxSnapshotReservedNum() {
return maxSnapshotReservedNum;
}

public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) {
this.maxSnapshotReservedNum = maxSnapshotReservedNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotManager;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
Expand Down Expand Up @@ -100,6 +101,11 @@ public void registerDLedgerRpcService(DLedgerRpcService dLedgerRpcService) {

public void startup() {
this.dLedgerStore.startup();
this.fsmCaller.ifPresent(x -> {
// Start state machine caller and load existing snapshots for data recovery
x.start();
x.getSnapshotManager().loadSnapshot();
});
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -127,10 +133,13 @@ public MemberState getMemberState() {

public void registerStateMachine(final StateMachine fsm) {
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.start();
fsmCaller.registerSnapshotManager(new FileSnapshotManager(fsmCaller, this.dLedgerConfig));
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
if (this.dLedgerConfig.getStoreType().equals(DLedgerConfig.FILE)) {
dLedgerStore.setEnableStateMachine(true);
}
}

public StateMachine getStateMachine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public enum DLedgerResponseCode {
LEADER_PENDING_FULL(503, ""),
ILLEGAL_MEMBER_STATE(504, ""),
LEADER_NOT_READY(505, ""),
LEADER_TRANSFERRING(506, "");
LEADER_TRANSFERRING(506, ""),
LOAD_SNAPSHOT_ERROR(507, "");

private static Map<Integer, DLedgerResponseCode> codeMap = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

public class SnapshotFile {

public static final String SNAPSHOT_META_FILE = "snapshot_meta";
public static final String SNAPSHOT_DATA_FILE = "data";
public static final String SNAPSHOT_DIR_PREFIX = "snapshot_";
public static final String SNAPSHOT_TEMP_DIR = "tmp";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.store.DLedgerStore;

public interface SnapshotManager {

void saveSnapshot(DLedgerStore dLedgerStore, DLedgerEntry dLedgerEntry);

void loadSnapshot();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

public class SnapshotMeta {

private long lastIncludedIndex;
private long lastIncludedTerm;

public SnapshotMeta(long lastIncludedIndex, long lastIncludedTerm) {
this.lastIncludedIndex = lastIncludedIndex;
this.lastIncludedTerm = lastIncludedTerm;
}

public long getLastIncludedIndex() {
return lastIncludedIndex;
}

public void setLastIncludedIndex(int lastIncludedIndex) {
this.lastIncludedIndex = lastIncludedIndex;
}

public long getLastIncludedTerm() {
return lastIncludedTerm;
}

public void setLastIncludedTerm(int lastIncludedTerm) {
this.lastIncludedTerm = lastIncludedTerm;
}

@Override
public String toString() {
return "SnapshotMeta{" +
"lastIncludedIndex=" + lastIncludedIndex +
", lastIncludedTerm=" + lastIncludedTerm +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@

package io.openmessaging.storage.dledger.snapshot;

import java.io.IOException;

/**
* Reader for snapshot
*/
public interface SnapshotReader {
public abstract class SnapshotReader {

public abstract boolean open();

public abstract SnapshotMeta load(String snapshotMetaStorePath) throws IOException;

public abstract SnapshotMeta getSnapshotMeta();

public abstract String getSnapshotStorePath();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

import java.util.HashMap;
import java.util.Map;

public enum SnapshotStatus {

UNKNOWN(-1),
SUCCESS(0),
FAIL(10001);

private static Map<Integer, SnapshotStatus> codeMap = new HashMap<>();

static {
for (SnapshotStatus status : SnapshotStatus.values()) {
codeMap.put(status.code, status);
}
}

private int code;

SnapshotStatus(int code) {
this.code = code;
}

public static SnapshotStatus valueOf(int code) {
SnapshotStatus tmp = codeMap.get(code);
if (tmp != null) {
return tmp;
} else {
return UNKNOWN;
}
}

public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.snapshot;

public interface SnapshotStore {

SnapshotWriter openSnapshotWriter();

SnapshotReader openSnapshotReader();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,34 @@

package io.openmessaging.storage.dledger.snapshot;

import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotMetaProcessor;

import java.io.IOException;

/**
* Writer for snapshot
*/
public interface SnapshotWriter {
public abstract class SnapshotWriter {

private SnapshotMeta snapshotMeta;

public SnapshotMeta getSnapshotMeta() {
return snapshotMeta;
}

public void setSnapshotMeta(SnapshotMeta snapshotMeta) {
this.snapshotMeta = snapshotMeta;
}

public abstract String getSnapshotStorePath();

public abstract FileSnapshotMetaProcessor getMetaProcessor();

public abstract boolean open();

public abstract void save(SnapshotStatus status) throws IOException;

public abstract void sync() throws IOException;

public abstract void updateMeta(SnapshotMeta snapshotMeta);
}
Loading

0 comments on commit ab9e2ad

Please sign in to comment.