Skip to content

Commit

Permalink
Delete write mem control parameters (#12007)
Browse files Browse the repository at this point in the history
  • Loading branch information
l2280212 authored Feb 1, 2024
1 parent 035be50 commit 9337d45
Show file tree
Hide file tree
Showing 32 changed files with 247 additions and 533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,6 @@ public class IoTDBConfig {
*/
private int maxPendingWindowEvaluationTasks = 64;

/** Is the write mem control for writing enable. */
private boolean enableMemControl = true;

/** Is the write ahead log enable. */
private boolean enableIndex = false;

Expand All @@ -383,9 +380,6 @@ public class IoTDBConfig {
/** When a sequence TsFile's file size (in byte) exceed this, the TsFile is forced closed. */
private long seqTsFileSize = 0L;

/** When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. Unit: byte */
private long memtableSizeThreshold = 1024 * 1024 * 1024L;

/** Whether to timed flush sequence tsfiles' memtables. */
private boolean enableTimedFlushSeqMemtable = true;

Expand Down Expand Up @@ -460,13 +454,6 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;

/**
* Enable compaction memory control or not. If true and estimated memory size of one compaction
* task exceeds the threshold, system will block the compaction. It only works for cross space
* compaction currently.
*/
private boolean enableCompactionMemControl = true;

private double chunkMetadataSizeProportion = 0.1;

/** The target tsfile size in compaction, 2 GB by default */
Expand Down Expand Up @@ -758,9 +745,6 @@ public class IoTDBConfig {
/** kerberos principal */
private String kerberosPrincipal = "your principal";

/** the num of memtable in each database */
private int concurrentWritingTimePartition = 1;

/** the default fill interval in LinearFill and PreviousFill, -1 means infinite past time */
private int defaultFillInterval = -1;

Expand Down Expand Up @@ -1207,14 +1191,6 @@ public void setUdfInitialByteArrayLengthForMemoryControl(
this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl;
}

public int getConcurrentWritingTimePartition() {
return concurrentWritingTimePartition;
}

public void setConcurrentWritingTimePartition(int concurrentWritingTimePartition) {
this.concurrentWritingTimePartition = concurrentWritingTimePartition;
}

public int getDefaultFillInterval() {
return defaultFillInterval;
}
Expand Down Expand Up @@ -2028,22 +2004,6 @@ public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMb
this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
}

public boolean isEnableMemControl() {
return enableMemControl;
}

public void setEnableMemControl(boolean enableMemControl) {
this.enableMemControl = enableMemControl;
}

public long getMemtableSizeThreshold() {
return memtableSizeThreshold;
}

public void setMemtableSizeThreshold(long memtableSizeThreshold) {
this.memtableSizeThreshold = memtableSizeThreshold;
}

public boolean isEnableTimedFlushSeqMemtable() {
return enableTimedFlushSeqMemtable;
}
Expand Down Expand Up @@ -2752,14 +2712,6 @@ public void setCompactionPriority(CompactionPriority compactionPriority) {
this.compactionPriority = compactionPriority;
}

public boolean isEnableCompactionMemControl() {
return enableCompactionMemControl;
}

public void setEnableCompactionMemControl(boolean enableCompactionMemControl) {
this.enableCompactionMemControl = enableCompactionMemControl;
}

public long getTargetCompactionFileSize() {
return targetCompactionFileSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,22 +365,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
Integer.parseInt(
properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));

conf.setEnableMemControl(
(Boolean.parseBoolean(
properties.getProperty(
"enable_mem_control", Boolean.toString(conf.isEnableMemControl())))));
LOGGER.info("IoTDB enable memory control: {}", conf.isEnableMemControl());

long memTableSizeThreshold =
Long.parseLong(
properties
.getProperty(
"memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
.trim());
if (memTableSizeThreshold > 0) {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}

conf.setTvListSortAlgorithm(
TVListSortAlgorithm.valueOf(
properties.getProperty(
Expand Down Expand Up @@ -475,12 +459,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
properties.getProperty(
"compaction_priority", conf.getCompactionPriority().toString())));

conf.setEnableCompactionMemControl(
Boolean.parseBoolean(
properties.getProperty(
"enable_compaction_mem_control",
Boolean.toString(conf.isEnableCompactionMemControl()))));

int subtaskNum =
Integer.parseInt(
properties.getProperty(
Expand Down Expand Up @@ -748,13 +726,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
properties.getProperty(
"device_path_cache_size", String.valueOf(conf.getDevicePathCacheSize()))));

// the num of memtables in each database
conf.setConcurrentWritingTimePartition(
Integer.parseInt(
properties.getProperty(
"concurrent_writing_time_partition",
String.valueOf(conf.getConcurrentWritingTimePartition()))));

// the default fill interval in LinearFill and PreviousFill
conf.setDefaultFillInterval(
Integer.parseInt(
Expand Down Expand Up @@ -1556,17 +1527,6 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
// update timed flush & close conf
loadTimedService(properties);
StorageEngine.getInstance().rebootTimedService();

long memTableSizeThreshold =
Long.parseLong(
properties
.getProperty(
"memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
.trim());
if (memTableSizeThreshold > 0) {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}

// update params of creating schemaengine automatically
loadAutoCreateSchemaProps(properties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public class DataRegion implements IDataRegionForQuery {

private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);

private final boolean enableMemControl = config.isEnableMemControl();
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
Expand Down Expand Up @@ -773,21 +772,19 @@ private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recove
tsFileResource.removeResourceFile();
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
// get chunkMetadata size
long chunkMetadataSize = 0;
for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
for (List<ChunkMetadata> metadatas : metaMap.values()) {
for (ChunkMetadata chunkMetadata : metadatas) {
chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes();
}
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
// get chunkMetadata size
long chunkMetadataSize = 0;
for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
for (List<ChunkMetadata> metadatas : metaMap.values()) {
for (ChunkMetadata chunkMetadata : metadatas) {
chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes();
}
}
tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
}
tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
}
tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
} catch (Throwable e) {
Expand Down Expand Up @@ -872,9 +869,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
throw new OutOfTTLException(
insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - dataTTL));
}
if (enableMemControl) {
StorageEngine.blockInsertionIfReject(null);
}
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRow");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
Expand Down Expand Up @@ -928,9 +923,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
if (enableMemControl) {
StorageEngine.blockInsertionIfReject(null);
}
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("insertTablet");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
Expand Down Expand Up @@ -1421,11 +1414,9 @@ private TsFileProcessor getTsFileProcessor(
this::flushCallback,
sequence);

if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
}
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);

tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
tsFileProcessor.addFlushListeners(customFlushListeners);
Expand Down Expand Up @@ -3055,9 +3046,7 @@ public TsFileManager getTsFileResourceManager() {
*/
public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
throws WriteProcessException, BatchProcessException {
if (enableMemControl) {
StorageEngine.blockInsertionIfReject(null);
}
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRowsOfOneDevice");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
Expand Down Expand Up @@ -3131,9 +3120,7 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)

public void insert(InsertRowsNode insertRowsNode)
throws BatchProcessException, WriteProcessRejectException {
if (enableMemControl) {
StorageEngine.blockInsertionIfReject(null);
}
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
writeLock("InsertRows");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public void perform()
CompactionTsFileWriter writer =
new CompactionTsFileWriter(
targetResource.getTsFile(),
true,
sizeForFileWriter,
CompactionType.INNER_SEQ_COMPACTION)) {
while (deviceIterator.hasNextDevice()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
Expand Down Expand Up @@ -132,12 +131,10 @@ public InnerSpaceCompactionTask(
this.selectedTsFileResourceList = selectedTsFileResourceList;
this.sequence = sequence;
this.performer = performer;
if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
if (this.performer instanceof ReadChunkCompactionPerformer) {
innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
} else if (!sequence && this.performer instanceof FastCompactionPerformer) {
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
if (this.performer instanceof ReadChunkCompactionPerformer) {
innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
} else if (!sequence && this.performer instanceof FastCompactionPerformer) {
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,10 @@ private boolean canRecover() {

private boolean shouldRollback() {
// if target file or its responding file does not exist, then return true
if (targetFile == null
return targetFile == null
|| !targetFile.tsFileExists()
|| !targetFile.resourceFileExists()
|| (unseqFileToInsert.modFileExists() && !targetFile.modFileExists())) {
return true;
}
return false;
|| (unseqFileToInsert.modFileExists() && !targetFile.modFileExists());
}

private void rollback() throws IOException {
Expand All @@ -303,7 +300,7 @@ private void rollback() throws IOException {
FileMetrics.getInstance().deleteTsFile(true, Collections.singletonList(targetFile));
}
// delete target file
if (targetFile != null && !deleteTsFileOnDisk(targetFile)) {
if (!deleteTsFileOnDisk(targetFile)) {
throw new CompactionRecoverException(
String.format("failed to delete target file %s", targetFile));
}
Expand All @@ -322,9 +319,6 @@ private void finishTask() throws IOException {

@Override
public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
if (!(otherTask instanceof InsertionCrossSpaceCompactionTask)) {
return false;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ protected AbstractCrossCompactionWriter(
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()
/ targetResources.size());
boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
for (int i = 0; i < targetResources.size(); i++) {
this.targetFileWriters.add(
new CompactionTsFileWriter(
targetResources.get(i).getTsFile(),
enableMemoryControl,
memorySizeForEachWriter,
CompactionType.CROSS_COMPACTION));
isEmptyFile[i] = true;
Expand All @@ -94,8 +92,8 @@ public void startChunkGroup(String deviceId, boolean isAlign) throws IOException
this.isAlign = isAlign;
this.seqFileIndexArray = new int[subTaskNum];
checkIsDeviceExistAndGetDeviceEndTime();
for (int i = 0; i < targetFileWriters.size(); i++) {
chunkGroupHeaderSize = targetFileWriters.get(i).startChunkGroup(deviceId);
for (CompactionTsFileWriter targetFileWriter : targetFileWriters) {
chunkGroupHeaderSize = targetFileWriter.startChunkGroup(deviceId);
}
}

Expand Down Expand Up @@ -166,8 +164,7 @@ public void close() throws IOException {

@Override
public void checkAndMayFlushChunkMetadata() throws IOException {
for (int i = 0; i < targetFileWriters.size(); i++) {
CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i);
for (CompactionTsFileWriter fileIoWriter : targetFileWriters) {
fileIoWriter.checkMetadataSizeAndMayFlush();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;

Expand All @@ -38,23 +37,16 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr

protected TsFileResource targetResource;

protected long targetPageSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();

protected long targetPagePointNum =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();

protected AbstractInnerCompactionWriter(TsFileResource targetFileResource) throws IOException {
long sizeForFileWriter =
(long)
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
this.targetResource = targetFileResource;
this.fileWriter =
new CompactionTsFileWriter(
targetFileResource.getTsFile(),
enableMemoryControl,
sizeForFileWriter,
targetResource.isSeq()
? CompactionType.INNER_SEQ_COMPACTION
Expand Down
Loading

0 comments on commit 9337d45

Please sign in to comment.