Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support MemoryManagement for graph query framework #2649

Merged
merged 51 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7a652be
feat: framework for memoryManagement
Pengzna Aug 25, 2024
ccc918b
wip: memory pool and manager framework
Pengzna Aug 25, 2024
b266358
wip: memory allocation
Pengzna Oct 8, 2024
ff10c3a
wip: memory reclaim
Pengzna Oct 9, 2024
26b0cfe
rename class
Pengzna Oct 9, 2024
f6aeace
fix review
Pengzna Oct 9, 2024
72e5bf3
remove useless allocator
Pengzna Oct 9, 2024
48f4817
netty allocator
Pengzna Oct 10, 2024
d906d04
revert config.properties
Pengzna Oct 13, 2024
b308be0
fix and improvement for allocation and deallocation
Pengzna Oct 22, 2024
09367a1
Merge remote-tracking branch 'refs/remotes/base/master' into memory/m…
Pengzna Oct 22, 2024
ea9a459
move monitor
Pengzna Oct 22, 2024
f552fd2
Revert "move monitor"
Pengzna Oct 22, 2024
8a2c65c
improve memory arbitration
Pengzna Oct 22, 2024
c37f869
suspend query when arbitration & kill query when OOM
Pengzna Oct 23, 2024
5904909
fix review
Pengzna Oct 23, 2024
0e70e44
fury test
Pengzna Oct 23, 2024
5d71541
offHeap magic
Pengzna Oct 23, 2024
f73f0ab
Revert "fury test"
Pengzna Oct 23, 2024
871015e
offHeap magic util
Pengzna Oct 23, 2024
8344443
complete adoption for all id
Pengzna Oct 25, 2024
d9cf408
complete property adoption
Pengzna Oct 26, 2024
aaeacb5
release ByteBuf off heap memory block
Pengzna Oct 26, 2024
ef0d629
complete allocate memory test and fix bug
Pengzna Oct 27, 2024
54d1fd8
fix some bugs: arbitration & suspend
Pengzna Oct 27, 2024
bfe75c0
complete OOM UT and fix bugs
Pengzna Oct 27, 2024
ab1bcde
complete memory management framework UT and fix all bugs
Pengzna Oct 27, 2024
91df57a
fix ut
Pengzna Oct 27, 2024
52ca7af
keep format consistent with original version
Pengzna Oct 28, 2024
ee8e125
fix review
Pengzna Oct 28, 2024
1a7d461
Merge branch 'master' into memory/management
imbajin Oct 28, 2024
de9d7a1
wip: adoption to query chain & introduce factory
Pengzna Oct 28, 2024
46066eb
fix concurrent bug when local arbitrate
Pengzna Oct 29, 2024
4f1e966
add comments
Pengzna Oct 29, 2024
7be5069
Merge remote-tracking branch 'origin/memory/management' into memory/m…
Pengzna Oct 29, 2024
231b647
feat: off-heap object factory
Pengzna Oct 29, 2024
865f1fb
fix gc child bugs and add consumer test
Pengzna Oct 29, 2024
f34e233
fix deallocate netty memory block bug & add complexId test
Pengzna Oct 29, 2024
879390b
complete all ut
Pengzna Oct 29, 2024
a96e9ee
fix all bugs
Pengzna Oct 29, 2024
7c86e84
dependency
Pengzna Oct 29, 2024
5af2cb9
add comments
Pengzna Oct 29, 2024
5e47bb0
add private constructor for singleton
Pengzna Oct 29, 2024
b77346b
add memory management config
Pengzna Oct 29, 2024
d00a8df
improve robustness
Pengzna Oct 29, 2024
fecc909
remove duplicate
Pengzna Oct 29, 2024
31f1feb
improve condition usage
Pengzna Oct 29, 2024
d4035bd
improve log
Pengzna Oct 29, 2024
d87388b
fix memory conservation bug
Pengzna Oct 29, 2024
d25396d
Revert "dependency"
Pengzna Nov 4, 2024
b334c61
revert duplicate known-dependencies.txt under huge-common
Pengzna Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hugegraph.memory.arbitrator.MemoryArbitrator;
import org.apache.hugegraph.memory.arbitrator.MemoryArbitratorImpl;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.pool.impl.QueryMemoryPool;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class);
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
private static final String QUERY_MEMORY_POOL_NAME_PREFIX = "QueryMemoryPool";
private static final String ARBITRATE_MEMORY_POOL_NAME = "ArbitrateMemoryPool";
private static final String DELIMINATOR = "_";
private static final int ARBITRATE_MEMORY_THREAD_NUM = 12;
// TODO: read it from conf, current 1G
public static final long MAX_MEMORY_CAPACITY_IN_BYTES = Bytes.GB;
private final AtomicLong currentMemoryCapacityInBytes =
new AtomicLong(MAX_MEMORY_CAPACITY_IN_BYTES);
private final Set<MemoryPool> queryMemoryPools = new CopyOnWriteArraySet<>();
private final MemoryArbitrator memoryArbitrator;
private final ExecutorService arbitrateExecutor;
// TODO: integrated with mingzhen's monitor thread
// private final Runnable queryGCThread;

private MemoryManager() {
this.memoryArbitrator = new MemoryArbitratorImpl();
this.arbitrateExecutor = ExecutorUtil.newFixedThreadPool(ARBITRATE_MEMORY_THREAD_NUM,
ARBITRATE_MEMORY_POOL_NAME);
}

public MemoryPool addQueryMemoryPool() {
int count = queryMemoryPools.size();
String poolName =
QUERY_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
System.currentTimeMillis();
MemoryPool queryPool = new QueryMemoryPool(poolName, this);
queryMemoryPools.add(queryPool);
return queryPool;
}

public void gcQueryMemoryPool(MemoryPool pool) {
queryMemoryPools.remove(pool);
long reclaimedMemory = pool.getAllocatedBytes();
pool.releaseSelf();
currentMemoryCapacityInBytes.addAndGet(reclaimedMemory);
}

public long triggerLocalArbitration(MemoryPool targetPool, long neededBytes) {
Future<Long> future =
arbitrateExecutor.submit(
() -> memoryArbitrator.reclaimLocally(targetPool, neededBytes));
try {
return future.get(MemoryArbitrator.MAX_WAIT_TIME_FOR_LOCAL_RECLAIM,
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.warn("MemoryManager: arbitration locally for {} timed out", targetPool, e);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("MemoryManager: arbitration locally for {} interrupted or failed",
targetPool,
e);
}
return 0;
}

public long triggerGlobalArbitration(MemoryPool requestPool, long neededBytes) {
Future<Long> future =
arbitrateExecutor.submit(
() -> memoryArbitrator.reclaimGlobally(requestPool, neededBytes));
try {
return future.get(MemoryArbitrator.MAX_WAIT_TIME_FOR_GLOBAL_RECLAIM,
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.warn("MemoryManager: arbitration globally for {} timed out", requestPool, e);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("MemoryManager: arbitration globally for {} interrupted or failed",
requestPool, e);
}
return 0;
}

private static class MemoryManagerHolder {

private static final MemoryManager INSTANCE = new MemoryManager();

private MemoryManagerHolder() {
// empty constructor
}
}

public static MemoryManager getInstance() {
return MemoryManagerHolder.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.allocator;

// TODO(pjz): implement different memory allocate strategy.
public interface MemoryAllocator {

Object tryToAllocateOffHeap(long size);

Object forceAllocateOffHeap(long size);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.allocator;

import org.apache.hugegraph.memory.MemoryManager;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

/**
* This class makes fully use of Netty's efficient memory management strategy.
*/
public class NettyMemoryAllocator implements MemoryAllocator {

private final PooledByteBufAllocator offHeapAllocator = PooledByteBufAllocator.DEFAULT;

@Override
public ByteBuf forceAllocateOffHeap(long size) {
return offHeapAllocator.directBuffer((int) size);
}

@Override
public ByteBuf tryToAllocateOffHeap(long size) {
if (offHeapAllocator.metric().usedDirectMemory() + size <
MemoryManager.MAX_MEMORY_CAPACITY_IN_BYTES) {
return offHeapAllocator.directBuffer((int) size);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.arbitrator;

import org.apache.hugegraph.memory.pool.MemoryPool;

public interface MemoryArbitrator {

long MAX_WAIT_TIME_FOR_LOCAL_RECLAIM = 1000;

long MAX_WAIT_TIME_FOR_GLOBAL_RECLAIM = 5000;

long reclaimLocally(MemoryPool queryPool, long neededBytes);

long reclaimGlobally(MemoryPool queryPool, long neededBytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.arbitrator;

import org.apache.hugegraph.memory.pool.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryArbitratorImpl implements MemoryArbitrator {

private static final Logger LOGGER = LoggerFactory.getLogger(MemoryArbitratorImpl.class);

@Override
public long reclaimLocally(MemoryPool queryPool, long neededBytes) {
long startTime = System.currentTimeMillis();
long res = queryPool.tryToReclaimLocalMemory(neededBytes);
LOGGER.info("[{}] reclaim local memory: {} bytes, took {} ms",
Thread.currentThread().getName(),
res,
System.currentTimeMillis() - startTime);
return res;
}

@Override
public long reclaimGlobally(MemoryPool queryPool, long neededBytes) {
long startTime = System.currentTimeMillis();
// TODO
// 1. select the query task that uses the most memory
// 2. suspend that task
// 3. apply disk spill to that task
LOGGER.info("[{}] reclaim global memory: {} bytes, took {} ms",
Thread.currentThread().getName(),
0,
System.currentTimeMillis() - startTime);
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.consumer;

// TODO(pjz): integrated it with HG objects such as edges and vertex.
public interface MemoryConsumer {
Pengzna marked this conversation as resolved.
Show resolved Hide resolved

}
Loading
Loading