T deserialize(final byte[] serializeData) {
+ return HESSIAN2.deserialize(serializeData);
+ }
+}
diff --git a/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/MetaDataDTO.java b/klein-serializer/klein-serializer-hessian2/src/main/java/com/ofcoder/klein/serializer/hessian2/KleinHessian2Output.java
similarity index 64%
rename from klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/MetaDataDTO.java
rename to klein-serializer/klein-serializer-hessian2/src/main/java/com/ofcoder/klein/serializer/hessian2/KleinHessian2Output.java
index 02855f2d..72cbd458 100644
--- a/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/MetaDataDTO.java
+++ b/klein-serializer/klein-serializer-hessian2/src/main/java/com/ofcoder/klein/serializer/hessian2/KleinHessian2Output.java
@@ -14,30 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.ofcoder.klein.storage.file;
+package com.ofcoder.klein.serializer.hessian2;
-import com.ofcoder.klein.storage.facade.LogManager;
+import com.caucho.hessian.io.Hessian2Output;
+import java.io.OutputStream;
/**
- * @author 释慧利
+ * implements AutoCloseable for Hessian2Output.
*/
-public class MetaDataDTO implements LogManager.MetaData {
- private int id;
- private String name;
-
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
+public class KleinHessian2Output extends Hessian2Output implements AutoCloseable {
+ public KleinHessian2Output(final OutputStream os) {
+ super(os);
}
}
diff --git a/klein-serializer/klein-serializer-hessian2/src/main/resources/META-INF/services/com.ofcoder.klein.serializer.Serializer b/klein-serializer/klein-serializer-hessian2/src/main/resources/META-INF/services/com.ofcoder.klein.serializer.Serializer
new file mode 100644
index 00000000..e45af563
--- /dev/null
+++ b/klein-serializer/klein-serializer-hessian2/src/main/resources/META-INF/services/com.ofcoder.klein.serializer.Serializer
@@ -0,0 +1 @@
+hessian2=com.ofcoder.klein.serializer.hessian2.Hessian2Serializer
\ No newline at end of file
diff --git a/klein-serializer/klein-serializer-hessian2/src/test/java/com/ofcoder/klein/serializer/hessian2/Hessian2UtilTest.java b/klein-serializer/klein-serializer-hessian2/src/test/java/com/ofcoder/klein/serializer/hessian2/Hessian2UtilTest.java
new file mode 100644
index 00000000..71dae5e8
--- /dev/null
+++ b/klein-serializer/klein-serializer-hessian2/src/test/java/com/ofcoder/klein/serializer/hessian2/Hessian2UtilTest.java
@@ -0,0 +1,20 @@
+package com.ofcoder.klein.serializer.hessian2;
+
+import com.ofcoder.klein.serializer.Serializer;
+import com.ofcoder.klein.spi.ExtensionLoader;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+public class Hessian2UtilTest extends TestCase {
+
+ public void testSerialize() {
+ Serializer hessian2 = ExtensionLoader.getExtensionLoader(Serializer.class).register("hessian2");
+
+ String resource = "Hello Klein";
+ byte[] serialize = hessian2.serialize(resource);
+ Assert.assertNotNull(serialize);
+ String deserialize = (String) hessian2.deserialize(serialize);
+ Assert.assertEquals(deserialize, resource);
+ }
+}
\ No newline at end of file
diff --git a/klein-serializer/klein-serializer-protobuf/pom.xml b/klein-serializer/klein-serializer-protobuf/pom.xml
new file mode 100644
index 00000000..76bf6c62
--- /dev/null
+++ b/klein-serializer/klein-serializer-protobuf/pom.xml
@@ -0,0 +1,23 @@
+
+
+ 4.0.0
+
+ com.ofcoder.klein.serializer
+ klein-serializer
+ 0.0.8
+
+
+ klein-serializer-protobuf
+ com.ofcoder.klein.serializer.protobuf
+ 0.0.8
+ jar
+
+
+ 8
+ 8
+ UTF-8
+
+
+
\ No newline at end of file
diff --git a/klein-serializer/pom.xml b/klein-serializer/pom.xml
new file mode 100644
index 00000000..951bbc0b
--- /dev/null
+++ b/klein-serializer/pom.xml
@@ -0,0 +1,39 @@
+
+
+ 4.0.0
+
+ com.ofcoder.klein
+ klein
+ 0.0.8
+
+
+ klein-serializer
+ 0.0.8
+ com.ofcoder.klein.serializer
+
+ pom
+
+ klein-serializer-protobuf
+ klein-serializer-hessian2
+ klein-serializer-facade
+
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+
+ com.ofcoder.klein.common
+ klein-common
+
+
+ com.ofcoder.klein.spi
+ klein-spi
+
+
+
\ No newline at end of file
diff --git a/klein-storage/klein-storage-facade/pom.xml b/klein-storage/klein-storage-facade/pom.xml
index 06eeda5a..67c03b1a 100644
--- a/klein-storage/klein-storage-facade/pom.xml
+++ b/klein-storage/klein-storage-facade/pom.xml
@@ -20,4 +20,12 @@
UTF-8
+
+
+
+ com.ofcoder.klein.serializer.facade
+ klein-serializer-facade
+
+
+
\ No newline at end of file
diff --git a/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/LogManager.java b/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/LogManager.java
index 045d4681..92055445 100644
--- a/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/LogManager.java
+++ b/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/LogManager.java
@@ -16,12 +16,11 @@
*/
package com.ofcoder.klein.storage.facade;
+import com.ofcoder.klein.spi.SPI;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.ofcoder.klein.spi.SPI;
-
/**
* Manage logs generated by the consensus system.
*
diff --git a/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/Snap.java b/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/Snap.java
index aa6db3a2..9e0ec899 100644
--- a/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/Snap.java
+++ b/klein-storage/klein-storage-facade/src/main/java/com/ofcoder/klein/storage/facade/Snap.java
@@ -25,12 +25,12 @@
*/
public class Snap implements Serializable {
private long checkpoint;
- private Object snap;
+ private byte[] snap;
public Snap() {
}
- public Snap(final long checkpoint, final Object snap) {
+ public Snap(final long checkpoint, final byte[] snap) {
this.checkpoint = checkpoint;
this.snap = snap;
}
@@ -59,8 +59,8 @@ public void setCheckpoint(final long checkpoint) {
*
* @return snapshot
*/
- public Object getSnap() {
- return snap;
+ public byte[] getSnap() {
+ return snap.clone();
}
/**
@@ -68,7 +68,7 @@ public Object getSnap() {
*
* @param snap snapshot
*/
- public void setSnap(final Object snap) {
- this.snap = snap;
+ public void setSnap(final byte[] snap) {
+ this.snap = snap.clone();
}
}
diff --git a/klein-storage/klein-storage-file/src/main/java/com/ofcoder/klein/storage/file/FileLogManager.java b/klein-storage/klein-storage-file/src/main/java/com/ofcoder/klein/storage/file/FileLogManager.java
index b4e54cee..9c655c88 100644
--- a/klein-storage/klein-storage-file/src/main/java/com/ofcoder/klein/storage/file/FileLogManager.java
+++ b/klein-storage/klein-storage-file/src/main/java/com/ofcoder/klein/storage/file/FileLogManager.java
@@ -16,6 +16,20 @@
*/
package com.ofcoder.klein.storage.file;
+import com.ofcoder.klein.common.util.StreamUtil;
+import com.ofcoder.klein.serializer.Serializer;
+import com.ofcoder.klein.spi.ExtensionLoader;
+import com.ofcoder.klein.spi.Join;
+import com.ofcoder.klein.storage.facade.Instance;
+import com.ofcoder.klein.storage.facade.LogManager;
+import com.ofcoder.klein.storage.facade.Snap;
+import com.ofcoder.klein.storage.facade.config.StorageProp;
+import com.ofcoder.klein.storage.facade.exception.LockException;
+import com.ofcoder.klein.storage.facade.exception.StorageException;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -29,20 +43,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ofcoder.klein.common.serialization.Hessian2Util;
-import com.ofcoder.klein.common.util.StreamUtil;
-import com.ofcoder.klein.spi.Join;
-import com.ofcoder.klein.storage.facade.Instance;
-import com.ofcoder.klein.storage.facade.LogManager;
-import com.ofcoder.klein.storage.facade.Snap;
-import com.ofcoder.klein.storage.facade.config.StorageProp;
-import com.ofcoder.klein.storage.facade.exception.LockException;
-import com.ofcoder.klein.storage.facade.exception.StorageException;
-
/**
* Jvm LogManager.
*
@@ -60,8 +60,10 @@ public class FileLogManager implements LogManager
{
private ConcurrentMap locks = new ConcurrentHashMap<>();
private MetaData metadata;
+ private final Serializer serializer;
public FileLogManager(final StorageProp op) {
+ this.serializer = ExtensionLoader.getExtensionLoader(Serializer.class).register("hessian2");
runningInstances = new ConcurrentHashMap<>();
confirmedInstances = new ConcurrentHashMap<>();
@@ -69,7 +71,7 @@ public FileLogManager(final StorageProp op) {
selfPath = op.getDataPath();
File selfFile = new File(selfPath);
if (!selfFile.exists()) {
- boolean mkdir = selfFile.mkdirs();
+ boolean ignored = selfFile.mkdirs();
// do nothing for mkdir result
}
@@ -132,15 +134,12 @@ public MetaData loadMetaData(final MetaData defaultValue) {
this.metadata = defaultValue;
return this.metadata;
}
- FileInputStream lastIn = null;
- try {
- lastIn = new FileInputStream(file);
- this.metadata = Hessian2Util.deserialize(IOUtils.toByteArray(lastIn));
+
+ try (FileInputStream lastIn = new FileInputStream(file);) {
+ this.metadata = serializer.deserialize(IOUtils.toByteArray(lastIn));
return this.metadata;
} catch (IOException e) {
throw new StorageException("loadMetaData, " + e.getMessage(), e);
- } finally {
- StreamUtil.close(lastIn);
}
}
@@ -148,7 +147,7 @@ private void saveMetaData() {
FileOutputStream mateOut = null;
try {
mateOut = new FileOutputStream(metaPath);
- IOUtils.write(Hessian2Util.serialize(this.metadata), mateOut);
+ IOUtils.write(serializer.serialize(this.metadata), mateOut);
} catch (IOException e) {
throw new StorageException("save snap, " + e.getMessage(), e);
} finally {
@@ -158,6 +157,7 @@ private void saveMetaData() {
@Override
public void saveSnap(final String group, final Snap snap) {
+ LOG.debug("save snap, group: {}, checkpoint: {}", group, snap.getCheckpoint());
String bastPath = selfPath + File.separator + group + File.separator;
File snapFile = new File(bastPath + snap.getCheckpoint());
if (snapFile.exists()) {
@@ -165,7 +165,7 @@ public void saveSnap(final String group, final Snap snap) {
}
File baseDir = new File(bastPath);
if (!baseDir.exists()) {
- boolean mkdir = baseDir.mkdirs();
+ boolean ignored = baseDir.mkdirs();
}
File lastFile = new File(bastPath + "last");
@@ -175,8 +175,8 @@ public void saveSnap(final String group, final Snap snap) {
try {
lastOut = new FileOutputStream(lastFile);
snapOut = new FileOutputStream(snapFile);
- IOUtils.write(Hessian2Util.serialize(snap), snapOut);
- IOUtils.write(Hessian2Util.serialize(snapFile.getPath()), lastOut);
+ IOUtils.write(serializer.serialize(snap), snapOut);
+ IOUtils.write(serializer.serialize(snapFile.getPath()), lastOut);
} catch (IOException e) {
throw new StorageException("save snap, " + e.getMessage(), e);
} finally {
@@ -206,9 +206,9 @@ public Snap getLastSnap(final String group) {
FileInputStream snapIn = null;
try {
lastIn = new FileInputStream(file);
- String deserialize = Hessian2Util.deserialize(IOUtils.toByteArray(lastIn));
+ String deserialize = serializer.deserialize(IOUtils.toByteArray(lastIn));
snapIn = new FileInputStream(deserialize);
- lastSnap = Hessian2Util.deserialize(IOUtils.toByteArray(snapIn));
+ lastSnap = serializer.deserialize(IOUtils.toByteArray(snapIn));
return lastSnap;
} catch (IOException e) {
throw new StorageException("get last snap, " + e.getMessage(), e);
diff --git a/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/FileLogManagerTest.java b/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/FileLogManagerTest.java
index a6871362..0ce79b7a 100644
--- a/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/FileLogManagerTest.java
+++ b/klein-storage/klein-storage-file/src/test/java/com/ofcoder/klein/storage/file/FileLogManagerTest.java
@@ -1,31 +1,30 @@
package com.ofcoder.klein.storage.file;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
import com.google.common.collect.Lists;
import com.ofcoder.klein.spi.ExtensionLoader;
import com.ofcoder.klein.storage.facade.Instance;
import com.ofcoder.klein.storage.facade.LogManager;
+import com.ofcoder.klein.storage.facade.Snap;
import com.ofcoder.klein.storage.facade.config.StorageProp;
import com.ofcoder.klein.storage.facade.exception.LockException;
+import java.util.List;
+import java.util.Random;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
public class FileLogManagerTest {
- LogManager join;
+ LogManager logManager;
@Before
public void setUp() {
- join = ExtensionLoader.getExtensionLoader(LogManager.class).register("file", new StorageProp());
+ logManager = ExtensionLoader.getExtensionLoader(LogManager.class).register("file", new StorageProp());
}
@After
public void shutdown() {
- join.shutdown();
+ logManager.shutdown();
}
@Test(expected = LockException.class)
@@ -35,12 +34,12 @@ public void testUpdateInstance_noLock() {
instance.setInstanceId(1);
instance.setState(Instance.State.PREPARED);
instance.setGrantedValue(Lists.newArrayList("Zzz"));
- join.updateInstance(instance);
+ logManager.updateInstance(instance);
}
@Test()
public void testGetInstance() {
- Instance nil = join.getInstance(1);
+ Instance nil = logManager.getInstance(1);
Assert.assertNull(nil);
Instance instance = new Instance<>();
@@ -49,11 +48,11 @@ public void testGetInstance() {
instance.setState(Instance.State.PREPARED);
instance.setGrantedValue(Lists.newArrayList("Zzz"));
- join.getLock(instance.getInstanceId()).writeLock().lock();
- join.updateInstance(instance);
- join.getLock(instance.getInstanceId()).writeLock().unlock();
+ logManager.getLock(instance.getInstanceId()).writeLock().lock();
+ logManager.updateInstance(instance);
+ logManager.getLock(instance.getInstanceId()).writeLock().unlock();
- Instance actual = join.getInstance(1);
+ Instance actual = logManager.getInstance(1);
Assert.assertNotNull(actual);
@@ -78,14 +77,14 @@ public void testGetInstanceNoConfirm() {
instance2.setState(Instance.State.CONFIRMED);
instance2.setGrantedValue(Lists.newArrayList("Zzz"));
- join.getLock(instance1.getInstanceId()).writeLock().lock();
- join.updateInstance(instance1);
- join.getLock(instance1.getInstanceId()).writeLock().unlock();
- join.getLock(instance2.getInstanceId()).writeLock().lock();
- join.updateInstance(instance2);
- join.getLock(instance2.getInstanceId()).writeLock().unlock();
+ logManager.getLock(instance1.getInstanceId()).writeLock().lock();
+ logManager.updateInstance(instance1);
+ logManager.getLock(instance1.getInstanceId()).writeLock().unlock();
+ logManager.getLock(instance2.getInstanceId()).writeLock().lock();
+ logManager.updateInstance(instance2);
+ logManager.getLock(instance2.getInstanceId()).writeLock().unlock();
- List instanceNoConfirm = join.getInstanceNoConfirm();
+ List instanceNoConfirm = logManager.getInstanceNoConfirm();
Assert.assertNotNull(instanceNoConfirm);
Assert.assertEquals(1, instanceNoConfirm.size());
Instance actual = (Instance) instanceNoConfirm.get(0);
@@ -97,7 +96,20 @@ public void testGetInstanceNoConfirm() {
}
@Test
- public void testLoadMetaData(){
+ public void testSaveSnapAndGetLastSnap() {
+ Snap snap = new Snap(new Random().nextLong(), new byte[] {0x1, 0x13, 0x12, 0x3, 0x3});
+ Snap snap2 = new Snap(new Random().nextLong(), new byte[] {0x1, 0x13, 0x12, 0x3, 0x2});
+ String group = "Test";
+ logManager.saveSnap(group, snap);
+ Snap lastSnap = logManager.getLastSnap(group);
+ Assert.assertEquals(snap, lastSnap);
+
+ logManager.saveSnap(group, snap2);
+ lastSnap = logManager.getLastSnap(group);
+
+ Assert.assertEquals(snap2, lastSnap);
+ Assert.assertNotEquals(snap, lastSnap);
}
+
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 96ed394b..d6833458 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,6 +22,7 @@
klein-common
klein-example
klein-jepsen
+ klein-serializer
@@ -99,6 +100,16 @@
klein-storage-facade
${project.version}