Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support MemoryManagement for graph query framework #2649

Merged
merged 51 commits into from
Nov 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7a652be
feat: framework for memoryManagement
Pengzna Aug 25, 2024
ccc918b
wip: memory pool and manager framework
Pengzna Aug 25, 2024
b266358
wip: memory allocation
Pengzna Oct 8, 2024
ff10c3a
wip: memory reclaim
Pengzna Oct 9, 2024
26b0cfe
rename class
Pengzna Oct 9, 2024
f6aeace
fix review
Pengzna Oct 9, 2024
72e5bf3
remove useless allocator
Pengzna Oct 9, 2024
48f4817
netty allocator
Pengzna Oct 10, 2024
d906d04
revert config.properties
Pengzna Oct 13, 2024
b308be0
fix and improvement for allocation and deallocation
Pengzna Oct 22, 2024
09367a1
Merge remote-tracking branch 'refs/remotes/base/master' into memory/m…
Pengzna Oct 22, 2024
ea9a459
move monitor
Pengzna Oct 22, 2024
f552fd2
Revert "move monitor"
Pengzna Oct 22, 2024
8a2c65c
improve memory arbitration
Pengzna Oct 22, 2024
c37f869
suspend query when arbitration & kill query when OOM
Pengzna Oct 23, 2024
5904909
fix review
Pengzna Oct 23, 2024
0e70e44
fury test
Pengzna Oct 23, 2024
5d71541
offHeap magic
Pengzna Oct 23, 2024
f73f0ab
Revert "fury test"
Pengzna Oct 23, 2024
871015e
offHeap magic util
Pengzna Oct 23, 2024
8344443
complete adoption for all id
Pengzna Oct 25, 2024
d9cf408
complete property adoption
Pengzna Oct 26, 2024
aaeacb5
release ByteBuf off heap memory block
Pengzna Oct 26, 2024
ef0d629
complete allocate memory test and fix bug
Pengzna Oct 27, 2024
54d1fd8
fix some bugs: arbitration & suspend
Pengzna Oct 27, 2024
bfe75c0
complete OOM UT and fix bugs
Pengzna Oct 27, 2024
ab1bcde
complete memory management framework UT and fix all bugs
Pengzna Oct 27, 2024
91df57a
fix ut
Pengzna Oct 27, 2024
52ca7af
keep format consistent with original version
Pengzna Oct 28, 2024
ee8e125
fix review
Pengzna Oct 28, 2024
1a7d461
Merge branch 'master' into memory/management
imbajin Oct 28, 2024
de9d7a1
wip: adoption to query chain & introduce factory
Pengzna Oct 28, 2024
46066eb
fix concurrent bug when local arbitrate
Pengzna Oct 29, 2024
4f1e966
add comments
Pengzna Oct 29, 2024
7be5069
Merge remote-tracking branch 'origin/memory/management' into memory/m…
Pengzna Oct 29, 2024
231b647
feat: off-heap object factory
Pengzna Oct 29, 2024
865f1fb
fix gc child bugs and add consumer test
Pengzna Oct 29, 2024
f34e233
fix deallocate netty memory block bug & add complexId test
Pengzna Oct 29, 2024
879390b
complete all ut
Pengzna Oct 29, 2024
a96e9ee
fix all bugs
Pengzna Oct 29, 2024
7c86e84
dependency
Pengzna Oct 29, 2024
5af2cb9
add comments
Pengzna Oct 29, 2024
5e47bb0
add private constructor for singleton
Pengzna Oct 29, 2024
b77346b
add memory management config
Pengzna Oct 29, 2024
d00a8df
improve robustness
Pengzna Oct 29, 2024
fecc909
remove duplicate
Pengzna Oct 29, 2024
31f1feb
improve condition usage
Pengzna Oct 29, 2024
d4035bd
improve log
Pengzna Oct 29, 2024
d87388b
fix memory conservation bug
Pengzna Oct 29, 2024
d25396d
Revert "dependency"
Pengzna Nov 4, 2024
b334c61
revert duplicate known-dependencies.txt under huge-common
Pengzna Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
complete all ut
Pengzna committed Oct 29, 2024

Verified

This commit was signed with the committer’s verified signature.
auguste-probabl Auguste Baum
commit 879390b1b5252fe74514ad269677f12a1f2b4f19
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ public interface OffHeapObject {

/**
* This method will read from off-heap ByteBuf storing binary data of self.
* Note: need class cast by hand when using.
*
* @return self value
*/
Original file line number Diff line number Diff line change
@@ -118,12 +118,12 @@ public EdgeIdOffHeap(Id ownerVertexId,
@Override
public Object zeroCopyReadFromByteBuf() {
try {
return new EdgeId((HugeVertex) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf(),
return new EdgeId((Id) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf(),
this.direction,
(Id) this.edgeLabelIdOffHeap.zeroCopyReadFromByteBuf(),
(Id) this.subLabelIdOffHeap.zeroCopyReadFromByteBuf(),
this.sortValuesOffHeap.toString(StandardCharsets.UTF_8),
(HugeVertex) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf());
(Id) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf());
} finally {
this.sortValuesOffHeap.resetReaderIndex();
}
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ public long tryToReclaimLocalMemory(long neededBytes, MemoryPool requestingPool)
}

private long reclaimChildren(long neededBytes, MemoryPool requestingPool) {
LOG.info("[{}] tryToReclaimLocalMemory: neededBytes={}", this, neededBytes);
LOG.debug("[{}] tryToReclaimLocalMemory: neededBytes={}", this, neededBytes);
this.isBeingArbitrated.set(true);
long totalReclaimedBytes = 0;
long currentNeededBytes = neededBytes;
@@ -124,7 +124,7 @@ public void releaseSelf(String reason, boolean isTriggeredInternal) {
this.condition.await();
}
}
LOG.info("[{}] starts to releaseSelf because of {}", this, reason);
LOG.debug("[{}] starts to releaseSelf", this);
this.isClosed = true;
// gc self from father
Optional.ofNullable(this.parent).ifPresent(parent -> parent.gcChildPool(this, false,
@@ -137,7 +137,7 @@ public void releaseSelf(String reason, boolean isTriggeredInternal) {
gcChildPool(child, true, isTriggeredInternal);
}
copiedChildren.clear();
LOG.info("[{}] finishes to releaseSelf", this);
LOG.info("[{}] finishes to releaseSelf because of {}", this, reason);
} catch (InterruptedException e) {
LOG.error("Failed to release self because ", e);
Thread.currentThread().interrupt();
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ public long tryToReclaimLocalMemory(long neededBytes, MemoryPool requestingPool)
if (!this.equals(requestingPool)) {
this.memoryActionLock.lock();
}
LOG.info("[{}] tryToReclaimLocalMemory: neededBytes={}", this, neededBytes);
LOG.debug("[{}] tryToReclaimLocalMemory: neededBytes={}", this, neededBytes);
this.isBeingArbitrated.set(true);
// 1. try to reclaim self free memory
reclaimableBytes = getFreeBytes();
@@ -125,8 +125,12 @@ public Object requireMemory(long bytes, MemoryPool requestingPool) {
// use lock to ensure the atomicity of the two-step operation
this.memoryActionLock.lock();
// if free memory is enough, use free memory directly.
if (getFreeBytes() >= bytes) {
this.stats.setAllocatedBytes(this.stats.getAllocatedBytes() - bytes);
if (getFreeBytes() < bytes) {
LOG.debug("[{}] require {} bytes, there is enough free memory {} bytes, will " +
"require memory directly from self's free memory.",
this,
getFreeBytes(),
bytes);
} else {
// if free memory is not enough, try to request delta
long delta = bytes - getFreeBytes();
@@ -154,7 +158,7 @@ public Object tryToAcquireMemoryInternal(long size) {
LOG.warn("[{}] is already closed, will abort this allocate", this);
return null;
}
LOG.info("[{}] tryToAcquireMemory: size={}", this, size);
LOG.debug("[{}] tryToAcquireMemory: size={}", this, size);
// 1. update statistic
super.tryToAcquireMemoryInternal(size);
// 2. call parent to update statistic
@@ -178,7 +182,7 @@ public long requestMemoryInternal(long size, MemoryPool requestingPool) throws
if (this.isBeingArbitrated.get()) {
this.condition.await();
}
LOG.info("[{}] requestMemory: request size={}", this, size);
LOG.debug("[{}] requestMemory: request size={}", this, size);
// 1. align size
long alignedSize = RoundUtil.sizeAlign(size);
// 2. reserve(round)
@@ -197,7 +201,7 @@ public long requestMemoryInternal(long size, MemoryPool requestingPool) throws
// 4. update stats
this.stats.setAllocatedBytes(this.stats.getAllocatedBytes() + neededMemorySize);
this.stats.setNumExpands(this.stats.getNumExpands() + 1);
LOG.info("[{}] requestMemory success: requestedMemorySize={}", this, fatherRes);
LOG.debug("[{}] requestMemory success: requestedMemorySize={}", this, fatherRes);
return fatherRes;
} catch (InterruptedException e) {
LOG.error("Failed to release self because ", e);
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ public long requestMemoryInternal(long bytes, MemoryPool requestingPool) {
}

private long tryToExpandSelfCapacity(long size) {
LOG.info("[{}] try to expand its capacity: size={}", this, size);
LOG.debug("[{}] try to expand its capacity: size={}", this, size);
long alignedSize = RoundUtil.sizeAlign(size);
long realNeededSize =
RoundUtil.roundDelta(getAllocatedBytes(), alignedSize);
Original file line number Diff line number Diff line change
@@ -18,24 +18,35 @@
package org.apache.hugegraph.core.memory;

import java.nio.file.Paths;
import java.util.UUID;

import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.configuration2.io.FileHandler;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.StandardHugeGraph;
import org.apache.hugegraph.backend.cache.CachedBackendStore;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.dist.RegisterUtil;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.consumer.factory.IdFactory;
import org.apache.hugegraph.memory.consumer.factory.PropertyFactory;
import org.apache.hugegraph.memory.consumer.impl.id.StringIdOffHeap;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.SchemaManager;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeEdgeProperty;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -95,9 +106,9 @@ public static void clear() throws Exception {
}

@Test
public void testId() {
public void testStringId() {
Id stringIdOffHeap = IdFactory.getInstance().newStringId("java");
Id stringId = new IdGenerator.StringId("java");
Id stringId = IdGenerator.of("java");
Assert.assertNotNull(stringIdOffHeap);
Assert.assertEquals("java", stringIdOffHeap.asString());
Assert.assertEquals(stringId, ((OffHeapObject) stringIdOffHeap).zeroCopyReadFromByteBuf());
@@ -109,11 +120,65 @@ public void testId() {
}

@Test
public void testComplexId() {
public void testLongId() {
Id idOffHeap = IdFactory.getInstance().newLongId(1);
Id id = IdGenerator.of(1);
Assert.assertNotNull(idOffHeap);
Assert.assertEquals(1, idOffHeap.asLong());
Assert.assertEquals(id, ((OffHeapObject) idOffHeap).zeroCopyReadFromByteBuf());
}

@Test
public void testUuidId() {
UUID uuidEncoding = UUID.randomUUID();
Id idOffHeap = IdFactory.getInstance().newUuidId(uuidEncoding);
Id id = IdGenerator.of(uuidEncoding);
Assert.assertNotNull(idOffHeap);
Assert.assertArrayEquals(id.asBytes(), idOffHeap.asBytes());
Assert.assertEquals(id, ((OffHeapObject) idOffHeap).zeroCopyReadFromByteBuf());
}

@Test
public void testBinaryId() {
Id stringIdOffHeap = IdFactory.getInstance().newStringId("java");
Id idOffHeap =
IdFactory.getInstance().newBinaryId(stringIdOffHeap.asBytes(), stringIdOffHeap);
Id id = new BinaryBackendEntry.BinaryId(stringIdOffHeap.asBytes(), stringIdOffHeap);
Assert.assertNotNull(idOffHeap);
Assert.assertArrayEquals(stringIdOffHeap.asBytes(), idOffHeap.asBytes());
Assert.assertEquals(id, ((OffHeapObject) idOffHeap).zeroCopyReadFromByteBuf());
}

@Test
public void testObjectId() {
TestObject object = new TestObject();
object.x = 1;
object.y = "test";
Id idOffHeap =
IdFactory.getInstance().newObjectId(object);
Id id = new IdGenerator.ObjectId(object);
Assert.assertNotNull(idOffHeap);
Assert.assertEquals(id.asObject(), idOffHeap.asObject());
Assert.assertEquals(id, ((OffHeapObject) idOffHeap).zeroCopyReadFromByteBuf());
}

@Test
public void testQueryId() {
Query q = new Query(HugeType.VERTEX);
Id idOffHeap =
IdFactory.getInstance().newQueryId(q);
Id id = new CachedBackendStore.QueryId(q);
Assert.assertNotNull(idOffHeap);
Assert.assertEquals(id.toString(), idOffHeap.toString());
Assert.assertEquals(id, ((OffHeapObject) idOffHeap).zeroCopyReadFromByteBuf());
}

@Test
public void testEdgeId() {
Id stringIdOffHeap = IdFactory.getInstance().newStringId("java");
HugeVertex java = new HugeVertex(graph, stringIdOffHeap, graph.vertexLabel("book"));
Id edgeLabelIdOffHeap = IdFactory.getInstance().newStringId("testEdgeLabel");
Id subLabelIdOffHeap = IdFactory.getInstance().newStringId("testSubLabel");
Id edgeLabelIdOffHeap = IdFactory.getInstance().newLongId(1);
Id subLabelIdOffHeap = IdFactory.getInstance().newLongId(2);
Id edgeIdOffHeap =
IdFactory.getInstance().newEdgeId(java, Directions.OUT, edgeLabelIdOffHeap,
subLabelIdOffHeap,
@@ -125,11 +190,52 @@ public void testComplexId() {
"test",
java);
Assert.assertNotNull(edgeIdOffHeap);
Assert.assertEquals(edgeId, edgeIdOffHeap);
// TODO: adopt equals method
Assert.assertEquals(edgeId.asString(), edgeIdOffHeap.asString());
}

@Test
public void testProperty() {
Id stringIdOffHeap = IdFactory.getInstance().newStringId("java");
HugeVertex java = new HugeVertex(graph, stringIdOffHeap, graph.vertexLabel("book"));
Id edgeLabelIdOffHeap = IdFactory.getInstance().newLongId(1);
Id subLabelIdOffHeap = IdFactory.getInstance().newLongId(2);

Id edgeId = new EdgeId(java,
Directions.OUT,
(Id) ((OffHeapObject) edgeLabelIdOffHeap).zeroCopyReadFromByteBuf(),
(Id) ((OffHeapObject) subLabelIdOffHeap).zeroCopyReadFromByteBuf(),
"test",
java);
HugeEdge testEdge = new HugeEdge(graph, edgeId, EdgeLabel.NONE);
PropertyKey propertyKey = new PropertyKey(null, IdFactory.getInstance().newLongId(3),
"fake");

String propertyValue = "test";
HugeProperty<String> propertyOffHeap =
PropertyFactory.getInstance(String.class).newHugeEdgeProperty(testEdge,
propertyKey,
propertyValue);
HugeEdgeProperty<String> property = new HugeEdgeProperty<>(testEdge,
propertyKey,
propertyValue);
Assert.assertNotNull(propertyOffHeap);
Assert.assertEquals(property.value(), propertyOffHeap.value());
Assert.assertEquals(property, propertyOffHeap);
}

static class TestObject {

long x;
String y;

@Override
public boolean equals(Object obj) {
if (!(obj instanceof TestObject)) {
return false;
}
TestObject other = (TestObject) obj;
return this.x == other.x && this.y.equals(other.y);
}
}
}