Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage Engine: put DeviceCache into the total memory of the storage engine #12016

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,14 @@ public class IoTDBConfig {
private double rejectProportion = 0.8;

/** The proportion of write memory for memtable */
private double writeProportionForMemtable = 0.76;
private double writeProportionForMemtable = 0.72;

/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;

/** The proportion of write memory for device path cache */
private double devicePathCacheProportion = 0.04;

/**
* If memory cost of data region increased more than proportion of {@linkplain
* IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
Expand Down Expand Up @@ -930,8 +933,6 @@ public class IoTDBConfig {
*/
private int partitionCacheSize = 1000;

private int devicePathCacheSize = 500_000;

/** Cache size of user and role */
private int authorCacheSize = 100;

Expand Down Expand Up @@ -3038,14 +3039,6 @@ public void setPartitionCacheSize(int partitionCacheSize) {
this.partitionCacheSize = partitionCacheSize;
}

public int getDevicePathCacheSize() {
return devicePathCacheSize;
}

public void setDevicePathCacheSize(int devicePathCacheSize) {
this.devicePathCacheSize = devicePathCacheSize;
}

public int getAuthorCacheSize() {
return authorCacheSize;
}
Expand Down Expand Up @@ -3206,6 +3199,14 @@ public double getCompactionProportion() {
return compactionProportion;
}

public double getDevicePathCacheProportion() {
return devicePathCacheProportion;
}

public void setDevicePathCacheProportion(double devicePathCacheProportion) {
this.devicePathCacheProportion = devicePathCacheProportion;
}

public static String getEnvironmentVariables() {
return "\n\t"
+ IoTDBConstant.IOTDB_HOME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
conf.setKerberosPrincipal(
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));

// the size of device path cache
conf.setDevicePathCacheSize(
Integer.parseInt(
properties.getProperty(
"device_path_cache_size", String.valueOf(conf.getDevicePathCacheSize()))));

// the default fill interval in LinearFill and PreviousFill
conf.setDefaultFillInterval(
Integer.parseInt(
Expand Down Expand Up @@ -1751,57 +1745,61 @@ private void initMemoryAllocate(Properties properties) {

private void initStorageEngineAllocate(Properties properties) {
long storageMemoryTotal = conf.getAllocateMemoryForStorageEngine();

int proportionSum = 10;
int writeProportion = 8;
int compactionProportion = 2;
int writeProportionSum = 20;
int memTableProportion = 19;
int timePartitionInfo = 1;

String storageMemoryAllocatePortion =
String valueOfStorageEngineMemoryProportion =
properties.getProperty("storage_engine_memory_proportion");
if (storageMemoryAllocatePortion != null) {
String[] proportions = storageMemoryAllocatePortion.split(":");
int loadedProportionSum = 0;
for (String proportion : proportions) {
loadedProportionSum += Integer.parseInt(proportion.trim());
}

if (loadedProportionSum != 0) {
proportionSum = loadedProportionSum;
writeProportion = Integer.parseInt(proportions[0].trim());
compactionProportion = Integer.parseInt(proportions[1].trim());
}
conf.setCompactionProportion((double) compactionProportion / (double) proportionSum);
}

String allocationRatioForWrite = properties.getProperty("write_memory_proportion");
if (allocationRatioForWrite != null) {
String[] proportions = allocationRatioForWrite.split(":");
int loadedProportionSum = 0;
for (String proportion : proportions) {
loadedProportionSum += Integer.parseInt(proportion.trim());
if (valueOfStorageEngineMemoryProportion != null) {
String[] storageProportionArray = valueOfStorageEngineMemoryProportion.split(":");
int storageEngineMemoryProportion = 0;
for (String proportion : storageProportionArray) {
int proportionValue = Integer.parseInt(proportion.trim());
if (proportionValue <= 0) {
LOGGER.warn(
"The value of storage_engine_memory_proportion is illegal, use default value 8:2 .");
return;
}
storageEngineMemoryProportion += proportionValue;
}
conf.setCompactionProportion(
(double) Integer.parseInt(storageProportionArray[1].trim())
/ (double) storageEngineMemoryProportion);

String valueOfWriteMemoryProportion = properties.getProperty("write_memory_proportion");
if (valueOfWriteMemoryProportion != null) {
String[] writeProportionArray = valueOfWriteMemoryProportion.split(":");
int writeMemoryProportion = 0;
for (String proportion : writeProportionArray) {
int proportionValue = Integer.parseInt(proportion.trim());
writeMemoryProportion += proportionValue;
if (proportionValue <= 0) {
LOGGER.warn(
"The value of write_memory_proportion is illegal, use default value 18:1:1 .");
return;
}
}

if (loadedProportionSum != 0) {
writeProportionSum = loadedProportionSum;
memTableProportion = Integer.parseInt(proportions[0].trim());
timePartitionInfo = Integer.parseInt(proportions[1].trim());
double writeAllProportionOfStorageEngineMemory =
(double) Integer.parseInt(storageProportionArray[0].trim())
/ storageEngineMemoryProportion;
double memTableProportion =
(double) Integer.parseInt(writeProportionArray[0].trim()) / writeMemoryProportion;
double timePartitionInfoProportion =
(double) Integer.parseInt(writeProportionArray[1].trim()) / writeMemoryProportion;
double devicePathCacheProportion =
(double) Integer.parseInt(writeProportionArray[2].trim()) / writeMemoryProportion;
// writeProportionForMemtable = 8/10 * 18/20 = 0.72 default
conf.setWriteProportionForMemtable(
writeAllProportionOfStorageEngineMemory * memTableProportion);

// allocateMemoryForTimePartitionInfo = storageMemoryTotal * 8/10 * 1/20 default
conf.setAllocateMemoryForTimePartitionInfo(
(long)
((writeAllProportionOfStorageEngineMemory * timePartitionInfoProportion)
* storageMemoryTotal));

// device path cache default memory is value is 8/10 * 1/20 = 0.04 for StorageEngine
conf.setDevicePathCacheProportion(
writeAllProportionOfStorageEngineMemory * devicePathCacheProportion);
}
// memtableProportionForWrite = 19/20 default
double memtableProportionForWrite =
((double) memTableProportion / (double) writeProportionSum);

// timePartitionInfoForWrite = 1/20 default
double timePartitionInfoForWrite = ((double) timePartitionInfo / (double) writeProportionSum);
// proportionForWrite = 8/10 default
double proportionForWrite = ((double) (writeProportion) / (double) proportionSum);
// writeProportionForMemtable = 8/10 * 19/20 = 0.76 default
conf.setWriteProportionForMemtable(proportionForWrite * memtableProportionForWrite);
// allocateMemoryForTimePartitionInfo = storageMemoryTotal * 8/10 * 1/20 default
conf.setAllocateMemoryForTimePartitionInfo(
(long) ((proportionForWrite * timePartitionInfoForWrite) * storageMemoryTotal));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

/** This cache is for reducing duplicated DeviceId PartialPath initialization in write process. */
public class DataNodeDevicePathCache {
Expand All @@ -35,7 +36,15 @@ public class DataNodeDevicePathCache {
private final Cache<String, PartialPath> devicePathCache;

private DataNodeDevicePathCache() {
devicePathCache = Caffeine.newBuilder().maximumSize(config.getDevicePathCacheSize()).build();
devicePathCache =
Caffeine.newBuilder()
.maximumWeight(
(long)
(config.getAllocateMemoryForStorageEngine()
* config.getDevicePathCacheProportion()))
.weigher(
(Weigher<String, PartialPath>) (key, val) -> (PartialPath.estimateSize(val) + 32))
.build();
}

public static DataNodeDevicePathCache getInstance() {
Expand Down Expand Up @@ -63,6 +72,14 @@ public PartialPath getPartialPath(String deviceId) throws IllegalPathException {
}
}

public String getDeviceId(String deviceId) {
try {
return getPartialPath(deviceId).getFullPath();
} catch (IllegalPathException e) {
return deviceId;
}
}

public void cleanUp() {
devicePathCache.cleanUp();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
Expand Down Expand Up @@ -159,7 +160,9 @@ public static DeleteDataNode deserializeFromWAL(DataInputStream stream) throws I
List<PartialPath> pathList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
try {
pathList.add(new PartialPath(ReadWriteIOUtils.readString(stream)));
pathList.add(
DataNodeDevicePathCache.getInstance()
.getPartialPath(ReadWriteIOUtils.readString(stream)));
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
Expand Down Expand Up @@ -622,7 +623,9 @@ public static InsertRowNode deserializeFromWAL(DataInputStream stream) throws IO
insertNode.setSearchIndex(stream.readLong());
insertNode.setTime(stream.readLong());
try {
insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
insertNode.setDevicePath(
DataNodeDevicePathCache.getInstance()
.getPartialPath(ReadWriteIOUtils.readString(stream)));
} catch (IllegalPathException e) {
throw new IllegalArgumentException(DESERIALIZE_ERROR, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
Expand Down Expand Up @@ -866,7 +867,8 @@ public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws
private void subDeserializeFromWAL(DataInputStream stream) throws IOException {
searchIndex = stream.readLong();
try {
devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
devicePath =
DataNodeDevicePathCache.getInstance().getPartialPath(ReadWriteIOUtils.readString(stream));
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ private void updateLastFlushTime(TsFileResource resource, boolean isSeq) {
Map<String, Long> endTimeMap = new HashMap<>();
for (String deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
endTimeMap.put(deviceId.intern(), endTime);
endTimeMap.put(deviceId, endTime);
}
if (config.isEnableSeparateData()) {
lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, endTimeMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

Expand Down Expand Up @@ -115,7 +116,10 @@ public static Deletion deserializeWithoutFileOffset(DataInputStream stream)
long startTime = stream.readLong();
long endTime = stream.readLong();
return new Deletion(
new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime, endTime);
DataNodeDevicePathCache.getInstance().getPartialPath(ReadWriteIOUtils.readString(stream)),
0,
startTime,
endTime);
}

public long getSerializedSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
Expand Down Expand Up @@ -117,7 +116,9 @@ public DeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
}

for (int i = 0; i < deviceNum; i++) {
String path = ReadWriteIOUtils.readString(inputStream).intern();
String path =
DataNodeDevicePathCache.getInstance()
.getDeviceId(ReadWriteIOUtils.readString(inputStream));
int index = ReadWriteIOUtils.readInt(inputStream);
deviceToIndex.put(path, index);
}
Expand All @@ -138,7 +139,8 @@ public DeviceTimeIndex deserialize(ByteBuffer buffer) {
}

for (int i = 0; i < deviceNum; i++) {
String path = SerializeUtils.deserializeString(buffer).intern();
String path =
DataNodeDevicePathCache.getInstance().getDeviceId(ReadWriteIOUtils.readString(buffer));
int index = buffer.getInt();
deviceToIndex.put(path, index);
}
Expand Down Expand Up @@ -171,7 +173,9 @@ public static Set<String> getDevices(InputStream inputStream) throws IOException
ReadWriteIOUtils.skip(inputStream, 2L * deviceNum * ReadWriteIOUtils.LONG_LEN);
Set<String> devices = new HashSet<>();
for (int i = 0; i < deviceNum; i++) {
String path = ReadWriteIOUtils.readString(inputStream).intern();
String path =
DataNodeDevicePathCache.getInstance()
.getDeviceId(ReadWriteIOUtils.readString(inputStream));
ReadWriteIOUtils.skip(inputStream, ReadWriteIOUtils.INT_LEN);
devices.add(path);
}
Expand Down Expand Up @@ -215,7 +219,7 @@ private int getDeviceIndex(String deviceId) {
index = deviceToIndex.get(deviceId);
} else {
index = deviceToIndex.size();
deviceToIndex.put(deviceId.intern(), index);
deviceToIndex.put(DataNodeDevicePathCache.getInstance().getDeviceId(deviceId), index);
if (startTimes.length <= index) {
startTimes = enLargeArray(startTimes, Long.MAX_VALUE);
endTimes = enLargeArray(endTimes, Long.MIN_VALUE);
Expand Down
Loading
Loading