diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 26659d87f804..850786efcca0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -359,9 +359,6 @@ public class IoTDBConfig { */ private int maxPendingWindowEvaluationTasks = 64; - /** Is the write mem control for writing enable. */ - private boolean enableMemControl = true; - /** Is the write ahead log enable. */ private boolean enableIndex = false; @@ -383,9 +380,6 @@ public class IoTDBConfig { /** When a sequence TsFile's file size (in byte) exceed this, the TsFile is forced closed. */ private long seqTsFileSize = 0L; - /** When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. Unit: byte */ - private long memtableSizeThreshold = 1024 * 1024 * 1024L; - /** Whether to timed flush sequence tsfiles' memtables. */ private boolean enableTimedFlushSeqMemtable = true; @@ -460,13 +454,6 @@ public class IoTDBConfig { */ private CompactionPriority compactionPriority = CompactionPriority.BALANCE; - /** - * Enable compaction memory control or not. If true and estimated memory size of one compaction - * task exceeds the threshold, system will block the compaction. It only works for cross space - * compaction currently. - */ - private boolean enableCompactionMemControl = true; - private double chunkMetadataSizeProportion = 0.1; /** The target tsfile size in compaction, 2 GB by default */ @@ -758,9 +745,6 @@ public class IoTDBConfig { /** kerberos principal */ private String kerberosPrincipal = "your principal"; - /** the num of memtable in each database */ - private int concurrentWritingTimePartition = 1; - /** the default fill interval in LinearFill and PreviousFill, -1 means infinite past time */ private int defaultFillInterval = -1; @@ -1207,14 +1191,6 @@ public void setUdfInitialByteArrayLengthForMemoryControl( this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl; } - public int getConcurrentWritingTimePartition() { - return concurrentWritingTimePartition; - } - - public void setConcurrentWritingTimePartition(int concurrentWritingTimePartition) { - this.concurrentWritingTimePartition = concurrentWritingTimePartition; - } - public int getDefaultFillInterval() { return defaultFillInterval; } @@ -2028,22 +2004,6 @@ public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMb this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec; } - public boolean isEnableMemControl() { - return enableMemControl; - } - - public void setEnableMemControl(boolean enableMemControl) { - this.enableMemControl = enableMemControl; - } - - public long getMemtableSizeThreshold() { - return memtableSizeThreshold; - } - - public void setMemtableSizeThreshold(long memtableSizeThreshold) { - this.memtableSizeThreshold = memtableSizeThreshold; - } - public boolean isEnableTimedFlushSeqMemtable() { return enableTimedFlushSeqMemtable; } @@ -2752,14 +2712,6 @@ public void setCompactionPriority(CompactionPriority compactionPriority) { this.compactionPriority = compactionPriority; } - public boolean isEnableCompactionMemControl() { - return enableCompactionMemControl; - } - - public void setEnableCompactionMemControl(boolean enableCompactionMemControl) { - this.enableCompactionMemControl = enableCompactionMemControl; - } - public long getTargetCompactionFileSize() { return targetCompactionFileSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 39fb8f51a292..0188501b1cb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -365,22 +365,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO Integer.parseInt( properties.getProperty("batch_size", Integer.toString(conf.getBatchSize())))); - conf.setEnableMemControl( - (Boolean.parseBoolean( - properties.getProperty( - "enable_mem_control", Boolean.toString(conf.isEnableMemControl()))))); - LOGGER.info("IoTDB enable memory control: {}", conf.isEnableMemControl()); - - long memTableSizeThreshold = - Long.parseLong( - properties - .getProperty( - "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold())) - .trim()); - if (memTableSizeThreshold > 0) { - conf.setMemtableSizeThreshold(memTableSizeThreshold); - } - conf.setTvListSortAlgorithm( TVListSortAlgorithm.valueOf( properties.getProperty( @@ -475,12 +459,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO properties.getProperty( "compaction_priority", conf.getCompactionPriority().toString()))); - conf.setEnableCompactionMemControl( - Boolean.parseBoolean( - properties.getProperty( - "enable_compaction_mem_control", - Boolean.toString(conf.isEnableCompactionMemControl())))); - int subtaskNum = Integer.parseInt( properties.getProperty( @@ -748,13 +726,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO properties.getProperty( "device_path_cache_size", String.valueOf(conf.getDevicePathCacheSize())))); - // the num of memtables in each database - conf.setConcurrentWritingTimePartition( - Integer.parseInt( - properties.getProperty( - "concurrent_writing_time_partition", - String.valueOf(conf.getConcurrentWritingTimePartition())))); - // the default fill interval in LinearFill and PreviousFill conf.setDefaultFillInterval( Integer.parseInt( @@ -1556,17 +1527,6 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep // update timed flush & close conf loadTimedService(properties); StorageEngine.getInstance().rebootTimedService(); - - long memTableSizeThreshold = - Long.parseLong( - properties - .getProperty( - "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold())) - .trim()); - if (memTableSizeThreshold > 0) { - conf.setMemtableSizeThreshold(memTableSizeThreshold); - } - // update params of creating schemaengine automatically loadAutoCreateSchemaProps(properties); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index e51fdba5a4c6..632c99ea44a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -194,7 +194,6 @@ public class DataRegion implements IDataRegionForQuery { private static final Logger logger = LoggerFactory.getLogger(DataRegion.class); - private final boolean enableMemControl = config.isEnableMemControl(); /** * a read write lock for guaranteeing concurrent safety when accessing all fields in this class * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor, @@ -773,21 +772,19 @@ private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recove tsFileResource.removeResourceFile(); tsFileProcessor.setTimeRangeId(timePartitionId); writer.makeMetadataVisible(); - if (enableMemControl) { - TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); - tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); - this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); - // get chunkMetadata size - long chunkMetadataSize = 0; - for (Map> metaMap : writer.getMetadatasForQuery().values()) { - for (List metadatas : metaMap.values()) { - for (ChunkMetadata chunkMetadata : metadatas) { - chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes(); - } + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); + tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); + // get chunkMetadata size + long chunkMetadataSize = 0; + for (Map> metaMap : writer.getMetadatasForQuery().values()) { + for (List metadatas : metaMap.values()) { + for (ChunkMetadata chunkMetadata : metadatas) { + chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes(); } } - tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize); } + tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize); } tsFileManager.add(tsFileResource, recoverPerformer.isSequence()); } catch (Throwable e) { @@ -872,9 +869,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException { throw new OutOfTTLException( insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - dataTTL)); } - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(null); - } + StorageEngine.blockInsertionIfReject(null); long startTime = System.nanoTime(); writeLock("InsertRow"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -928,9 +923,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException { @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning public void insertTablet(InsertTabletNode insertTabletNode) throws BatchProcessException, WriteProcessException { - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(null); - } + StorageEngine.blockInsertionIfReject(null); long startTime = System.nanoTime(); writeLock("insertTablet"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -1421,11 +1414,9 @@ private TsFileProcessor getTsFileProcessor( this::flushCallback, sequence); - if (enableMemControl) { - TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); - tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); - this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); - } + TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); + tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); + this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); tsFileProcessor.addCloseFileListeners(customCloseFileListeners); tsFileProcessor.addFlushListeners(customFlushListeners); @@ -3055,9 +3046,7 @@ public TsFileManager getTsFileResourceManager() { */ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) throws WriteProcessException, BatchProcessException { - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(null); - } + StorageEngine.blockInsertionIfReject(null); long startTime = System.nanoTime(); writeLock("InsertRowsOfOneDevice"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -3131,9 +3120,7 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) public void insert(InsertRowsNode insertRowsNode) throws BatchProcessException, WriteProcessRejectException { - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(null); - } + StorageEngine.blockInsertionIfReject(null); long startTime = System.nanoTime(); writeLock("InsertRows"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index daed9052bbf4..cb3ebb47885a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -75,7 +75,6 @@ public void perform() CompactionTsFileWriter writer = new CompactionTsFileWriter( targetResource.getTsFile(), - true, sizeForFileWriter, CompactionType.INNER_SEQ_COMPACTION)) { while (deviceIterator.hasNextDevice()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 5a5538bd6bcc..5219ab6bbf19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; @@ -132,12 +131,10 @@ public InnerSpaceCompactionTask( this.selectedTsFileResourceList = selectedTsFileResourceList; this.sequence = sequence; this.performer = performer; - if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) { - if (this.performer instanceof ReadChunkCompactionPerformer) { - innerSpaceEstimator = new ReadChunkInnerCompactionEstimator(); - } else if (!sequence && this.performer instanceof FastCompactionPerformer) { - innerSpaceEstimator = new FastCompactionInnerCompactionEstimator(); - } + if (this.performer instanceof ReadChunkCompactionPerformer) { + innerSpaceEstimator = new ReadChunkInnerCompactionEstimator(); + } else if (!sequence && this.performer instanceof FastCompactionPerformer) { + innerSpaceEstimator = new FastCompactionInnerCompactionEstimator(); } isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()]; for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java index e4c5ac2deacb..d5049899bfcb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java @@ -283,13 +283,10 @@ private boolean canRecover() { private boolean shouldRollback() { // if target file or its responding file does not exist, then return true - if (targetFile == null + return targetFile == null || !targetFile.tsFileExists() || !targetFile.resourceFileExists() - || (unseqFileToInsert.modFileExists() && !targetFile.modFileExists())) { - return true; - } - return false; + || (unseqFileToInsert.modFileExists() && !targetFile.modFileExists()); } private void rollback() throws IOException { @@ -303,7 +300,7 @@ private void rollback() throws IOException { FileMetrics.getInstance().deleteTsFile(true, Collections.singletonList(targetFile)); } // delete target file - if (targetFile != null && !deleteTsFileOnDisk(targetFile)) { + if (!deleteTsFileOnDisk(targetFile)) { throw new CompactionRecoverException( String.format("failed to delete target file %s", targetFile)); } @@ -322,9 +319,6 @@ private void finishTask() throws IOException { @Override public boolean equalsOtherTask(AbstractCompactionTask otherTask) { - if (!(otherTask instanceof InsertionCrossSpaceCompactionTask)) { - return false; - } return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 6bc3a1af4067..11306464044e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -74,12 +74,10 @@ protected AbstractCrossCompactionWriter( / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion() / targetResources.size()); - boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); for (int i = 0; i < targetResources.size(); i++) { this.targetFileWriters.add( new CompactionTsFileWriter( targetResources.get(i).getTsFile(), - enableMemoryControl, memorySizeForEachWriter, CompactionType.CROSS_COMPACTION)); isEmptyFile[i] = true; @@ -94,8 +92,8 @@ public void startChunkGroup(String deviceId, boolean isAlign) throws IOException this.isAlign = isAlign; this.seqFileIndexArray = new int[subTaskNum]; checkIsDeviceExistAndGetDeviceEndTime(); - for (int i = 0; i < targetFileWriters.size(); i++) { - chunkGroupHeaderSize = targetFileWriters.get(i).startChunkGroup(deviceId); + for (CompactionTsFileWriter targetFileWriter : targetFileWriters) { + chunkGroupHeaderSize = targetFileWriter.startChunkGroup(deviceId); } } @@ -166,8 +164,7 @@ public void close() throws IOException { @Override public void checkAndMayFlushChunkMetadata() throws IOException { - for (int i = 0; i < targetFileWriters.size(); i++) { - CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i); + for (CompactionTsFileWriter fileIoWriter : targetFileWriters) { fileIoWriter.checkMetadataSizeAndMayFlush(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index d3d8cf00b1be..de191b394c9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -38,23 +37,16 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr protected TsFileResource targetResource; - protected long targetPageSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); - - protected long targetPagePointNum = - TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); - protected AbstractInnerCompactionWriter(TsFileResource targetFileResource) throws IOException { long sizeForFileWriter = (long) ((double) SystemInfo.getInstance().getMemorySizeForCompaction() / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); - boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); this.targetResource = targetFileResource; this.fileWriter = new CompactionTsFileWriter( targetFileResource.getTsFile(), - enableMemoryControl, sizeForFileWriter, targetResource.isSeq() ? CompactionType.INNER_SEQ_COMPACTION diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index 834814746f86..6783d84bfe57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -43,10 +43,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter { private volatile boolean isWritingAligned = false; private boolean isEmptyTargetFile = true; - public CompactionTsFileWriter( - File file, boolean enableMemoryControl, long maxMetadataSize, CompactionType type) + public CompactionTsFileWriter(File file, long maxMetadataSize, CompactionType type) throws IOException { - super(file, enableMemoryControl, maxMetadataSize); + super(file, maxMetadataSize); this.type = type; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java index 677ba75b6470..4193e6697fa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; @@ -85,19 +84,16 @@ public boolean processOneCompactionTask(AbstractCompactionTask task) { return false; } task.transitSourceFilesToMerging(); - if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) { - estimatedMemoryCost = task.getEstimatedMemoryCost(); - if (estimatedMemoryCost < 0) { - return false; - } - CompactionTaskType taskType = task.getCompactionTaskType(); - memoryAcquired = - SystemInfo.getInstance().addCompactionMemoryCost(taskType, estimatedMemoryCost, 60); - CompactionMetrics.getInstance() - .updateCompactionMemoryMetrics(taskType, estimatedMemoryCost); - CompactionMetrics.getInstance() - .updateCompactionTaskSelectedFileNum(taskType, task.getAllSourceTsFiles().size()); + estimatedMemoryCost = task.getEstimatedMemoryCost(); + if (estimatedMemoryCost < 0) { + return false; } + CompactionTaskType taskType = task.getCompactionTaskType(); + memoryAcquired = + SystemInfo.getInstance().addCompactionMemoryCost(taskType, estimatedMemoryCost, 60); + CompactionMetrics.getInstance().updateCompactionMemoryMetrics(taskType, estimatedMemoryCost); + CompactionMetrics.getInstance() + .updateCompactionTaskSelectedFileNum(taskType, task.getAllSourceTsFiles().size()); fileHandleAcquired = SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60); CompactionTaskSummary summary = task.getSummary(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java index 2c5a6acd2b57..2207fd7f51a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java @@ -33,9 +33,6 @@ public abstract class AbstractCrossSpaceEstimator extends AbstractCompactionEsti public long estimateCrossCompactionMemory( List seqResources, List unseqResources) throws IOException { - if (!config.isEnableCompactionMemControl()) { - return 0; - } List resources = new ArrayList<>(seqResources.size() + unseqResources.size()); resources.addAll(seqResources); resources.addAll(unseqResources); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java index ae2c6f2e016b..50cf624a5c1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java @@ -31,14 +31,10 @@ public abstract class AbstractInnerSpaceEstimator extends AbstractCompactionEstimator { public long estimateInnerCompactionMemory(List resources) throws IOException { - if (!config.isEnableCompactionMemControl()) { - return 0; - } - if (!CompactionEstimateUtils.addReadLock(resources)) { return -1L; } - long cost = 0; + long cost; try { if (!isAllSourceFileExist(resources)) { return -1L; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 68ccba4bcec5..ab5440d53583 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -60,7 +60,7 @@ public class MemTableFlushTask { private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager.getInstance(); private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); - private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); /* storage group name -> last time */ private static final Map flushPointsCache = new ConcurrentHashMap<>(); private final Future encodingTaskFuture; @@ -69,7 +69,7 @@ public class MemTableFlushTask { private final LinkedBlockingQueue encodingTaskQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue ioTaskQueue = - (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) + (SystemInfo.getInstance().isEncodingFasterThanIo()) ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) : new LinkedBlockingQueue<>(); @@ -118,7 +118,7 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException avgSeriesPointsNum); long estimatedTemporaryMemSize = 0L; - if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) { + if (SystemInfo.getInstance().isEncodingFasterThanIo()) { estimatedTemporaryMemSize = memTable.getSeriesNumber() == 0 ? 0 @@ -192,12 +192,10 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException throw new ExecutionException(e); } - if (config.isEnableMemControl()) { - if (estimatedTemporaryMemSize != 0) { - SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize); - } - SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime); + if (estimatedTemporaryMemSize != 0) { + SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize); } + SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime); MetricService.getInstance() .timer( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 0b2ff4a04cca..e54e7ef2c360 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -66,12 +66,6 @@ public abstract class AbstractMemTable implements IMemTable { /** DeviceId -> chunkGroup(MeasurementId -> chunk). */ private final Map memTableMap; - /** - * The initial value is true because we want to calculate the text data size when recover - * memTable. - */ - protected boolean disableMemControl = true; - private boolean shouldFlush = false; private volatile FlushStatus flushStatus = FlushStatus.WORKING; private final int avgSeriesPointNumThreshold = @@ -205,7 +199,7 @@ public void insert(InsertRowNode insertRowNode) { dataTypes.add(schema.getType()); } } - memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl); + memSize += MemUtils.getRowRecordSize(dataTypes, values); write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = @@ -252,7 +246,7 @@ public void insertAlignedRow(InsertRowNode insertRowNode) { if (schemaList.isEmpty()) { return; } - memSize += MemUtils.getAlignedRecordsSize(dataTypes, values, disableMemControl); + memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values); writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber(); @@ -276,7 +270,7 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException { try { writeTabletNode(insertTabletNode, start, end); - memSize += MemUtils.getTabletSize(insertTabletNode, start, end, disableMemControl); + memSize += MemUtils.getTabletSize(insertTabletNode, start, end); int pointsInserted = (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); @@ -302,7 +296,7 @@ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, in throws WriteProcessException { try { writeAlignedTablet(insertTabletNode, start, end); - memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, disableMemControl); + memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end); int pointsInserted = (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index efacc8afc154..64a56ee3ac56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; @@ -60,8 +58,6 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; - private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - public AlignedWritableMemChunk(List schemaList) { this.measurementIndexMap = new LinkedHashMap<>(); List dataTypeList = new ArrayList<>(); @@ -399,7 +395,7 @@ public void encode(IChunkWriter chunkWriter) { tsDataType == TSDataType.TEXT ? list.getBinaryByValueIndex(sortedRowIndex, columnIndex) : null, - CONFIG.isEnableMemControl()); + true); CompressionRatio.decreaseDuplicatedMemorySize(recordSize); } continue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java index f8e625a38416..70b0f9093817 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java @@ -32,11 +32,6 @@ public PrimitiveMemTable(String database, String dataRegionId) { super(database, dataRegionId); } - public PrimitiveMemTable(String database, String dataRegionId, boolean enableMemControl) { - super(database, dataRegionId); - this.disableMemControl = !enableMemControl; - } - public PrimitiveMemTable( String database, String dataRegionId, Map memTableMap) { super(database, dataRegionId, memTableMap); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 5bbf9ac7fe38..953606ed1366 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -106,9 +106,6 @@ public class TsFileProcessor { /** IoTDB config. */ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - /** whether it's enable mem control. */ - private final boolean enableMemControl = config.isEnableMemControl(); - /** database info for mem control. */ private final DataRegionInfo dataRegionInfo; /** tsfile processor info for mem control. */ @@ -251,22 +248,21 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) } long[] memIncrements = null; - if (enableMemControl) { - long startTime = System.nanoTime(); - if (insertRowNode.isAligned()) { - memIncrements = - checkAlignedMemCostAndAddToTspInfo( - insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), - insertRowNode.getDataTypes(), insertRowNode.getValues()); - } else { - memIncrements = - checkMemCostAndAddToTspInfo( - insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), - insertRowNode.getDataTypes(), insertRowNode.getValues()); - } - // recordScheduleMemoryBlockCost - costsForMetrics[1] += System.nanoTime() - startTime; + + long memControlStartTime = System.nanoTime(); + if (insertRowNode.isAligned()) { + memIncrements = + checkAlignedMemCostAndAddToTspInfo( + insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), + insertRowNode.getDataTypes(), insertRowNode.getValues()); + } else { + memIncrements = + checkMemCostAndAddToTspInfo( + insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), + insertRowNode.getDataTypes(), insertRowNode.getValues()); } + // recordScheduleMemoryBlockCost + costsForMetrics[1] += System.nanoTime() - memControlStartTime; long startTime = System.nanoTime(); WALFlushListener walFlushListener; @@ -276,9 +272,7 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) throw walFlushListener.getCause(); } } catch (Exception e) { - if (enableMemControl) { - rollbackMemoryInfo(memIncrements); - } + rollbackMemoryInfo(memIncrements); throw new WriteProcessException( String.format( "%s: %s write WAL failed", @@ -323,7 +317,7 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics) costsForMetrics[3] += System.nanoTime() - startTime; } - private void createNewWorkingMemTable() throws WriteProcessException { + private void createNewWorkingMemTable() { workMemTable = MemTableManager.getInstance() .getAvailableMemTable( @@ -356,29 +350,27 @@ public void insertTablet( long[] memIncrements = null; try { - if (enableMemControl) { - long startTime = System.nanoTime(); - if (insertTabletNode.isAligned()) { - memIncrements = - checkAlignedMemCostAndAddToTsp( - insertTabletNode.getDevicePath().getFullPath(), - insertTabletNode.getMeasurements(), - insertTabletNode.getDataTypes(), - insertTabletNode.getColumns(), - start, - end); - } else { - memIncrements = - checkMemCostAndAddToTspInfo( - insertTabletNode.getDevicePath().getFullPath(), - insertTabletNode.getMeasurements(), - insertTabletNode.getDataTypes(), - insertTabletNode.getColumns(), - start, - end); - } - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime); + long startTime = System.nanoTime(); + if (insertTabletNode.isAligned()) { + memIncrements = + checkAlignedMemCostAndAddToTsp( + insertTabletNode.getDevicePath().getFullPath(), + insertTabletNode.getMeasurements(), + insertTabletNode.getDataTypes(), + insertTabletNode.getColumns(), + start, + end); + } else { + memIncrements = + checkMemCostAndAddToTspInfo( + insertTabletNode.getDevicePath().getFullPath(), + insertTabletNode.getMeasurements(), + insertTabletNode.getDataTypes(), + insertTabletNode.getColumns(), + start, + end); } + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime); } catch (WriteProcessException e) { for (int i = start; i < end; i++) { results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); @@ -397,9 +389,7 @@ public void insertTablet( for (int i = start; i < end; i++) { results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } - if (enableMemControl) { - rollbackMemoryInfo(memIncrements); - } + rollbackMemoryInfo(memIncrements); throw new WriteProcessException(e); } finally { PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(System.nanoTime() - startTime); @@ -800,13 +790,6 @@ public boolean shouldFlush() { if (workMemTable.shouldFlush()) { return true; } - if (!enableMemControl && workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) { - logger.info( - "The memtable size {} of tsfile {} reaches the threshold", - workMemTable.memSize(), - tsFileResource.getTsFile().getAbsolutePath()); - return true; - } if (workMemTable.reachTotalPointNumThreshold()) { logger.info( "The avg series points num {} of tsfile {} reaches the threshold", @@ -819,10 +802,6 @@ public boolean shouldFlush() { return false; } - private long getMemtableSizeThresholdBasedOnSeriesNum() { - return config.getMemtableSizeThreshold(); - } - public boolean shouldClose() { long fileSize = tsFileResource.getTsFileSize(); long fileSizeThreshold = sequence ? config.getSeqTsFileSize() : config.getUnSeqTsFileSize(); @@ -1047,9 +1026,7 @@ private Future addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOE lastWorkMemtableFlushTime = System.currentTimeMillis(); updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice, lastWorkMemtableFlushTime); - if (enableMemControl) { - SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); - } + SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { logger.debug( @@ -1096,21 +1073,19 @@ private void releaseFlushedMemTable(IMemTable memTable) { } memTable.release(); MemTableManager.getInstance().decreaseMemtableNumber(); - if (enableMemControl) { - // reset the mem cost in StorageGroupProcessorInfo - dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost()); - if (logger.isDebugEnabled()) { - logger.debug( - "[mem control] {}: {} flush finished, try to reset system memcost, " - + "flushing memtable list size: {}", - storageGroupName, - tsFileResource.getTsFile().getName(), - flushingMemTables.size()); - } - // report to System - SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo); - SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); + // reset the mem cost in StorageGroupProcessorInfo + dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost()); + if (logger.isDebugEnabled()) { + logger.debug( + "[mem control] {}: {} flush finished, try to reset system memcost, " + + "flushing memtable list size: {}", + storageGroupName, + tsFileResource.getTsFile().getName(), + flushingMemTables.size()); } + // report to System + SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo); + SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); if (logger.isDebugEnabled()) { logger.debug( "{}: {} flush finished, remove a memtable from flushing list, " @@ -1381,10 +1356,8 @@ private void endFile() throws IOException, TsFileProcessorException { closeFileListener.onClosed(this); } - if (enableMemControl) { - tsFileProcessorInfo.clear(); - dataRegionInfo.closeTsFileProcessorAndReportToSystem(this); - } + tsFileProcessorInfo.clear(); + dataRegionInfo.closeTsFileProcessorAndReportToSystem(this); writer = null; } @@ -1400,10 +1373,8 @@ private void endEmptyFile() throws TsFileProcessorException, IOException { for (CloseFileListener closeFileListener : closeFileListeners) { closeFileListener.onClosed(this); } - if (enableMemControl) { - tsFileProcessorInfo.clear(); - dataRegionInfo.closeTsFileProcessorAndReportToSystem(this); - } + tsFileProcessorInfo.clear(); + dataRegionInfo.closeTsFileProcessorAndReportToSystem(this); logger.info( "Storage group {} close and remove empty file {}", storageGroupName, @@ -1425,16 +1396,6 @@ public void setManagedByFlushManager(boolean managedByFlushManager) { this.managedByFlushManager = managedByFlushManager; } - /** sync method */ - public boolean isMemtableNotNull() { - flushQueryLock.writeLock().lock(); - try { - return workMemTable != null; - } finally { - flushQueryLock.writeLock().unlock(); - } - } - /** close this tsfile */ public void close() throws TsFileProcessorException { try { @@ -1559,10 +1520,6 @@ public void putMemTableBackAndClose() throws TsFileProcessorException { } } - public TsFileProcessorInfo getTsFileProcessorInfo() { - return tsFileProcessorInfo; - } - public void setTsFileProcessorInfo(TsFileProcessorInfo tsFileProcessorInfo) { this.tsFileProcessorInfo = tsFileProcessorInfo; } @@ -1581,10 +1538,6 @@ public long getWorkMemTableUpdateTime() { return workMemTable != null ? workMemTable.getUpdateTime() : Long.MAX_VALUE; } - public long getLastWorkMemtableFlushTime() { - return lastWorkMemtableFlushTime; - } - public boolean isSequence() { return sequence; } @@ -1593,10 +1546,6 @@ public void setWorkMemTableShouldFlush() { workMemTable.setShouldFlush(); } - public void addFlushListener(FlushListener listener) { - flushListeners.add(listener); - } - public void addCloseFileListener(CloseFileListener listener) { closeFileListeners.add(listener); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 8d062396f49d..c1a71d32a533 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -337,7 +337,7 @@ public void encode(IChunkWriter chunkWriter) { MemUtils.getRecordSize( tsDataType, tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex) : null, - CONFIG.isEnableMemControl()); + true); CompressionRatio.decreaseDuplicatedMemorySize(recordSize); continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java index 5f44e11099ed..ef6f4edc3cf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/MemTableManager.java @@ -18,23 +18,10 @@ */ package org.apache.iotdb.db.storageengine.rescon.memory; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class MemTableManager { - - private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - - private static final Logger logger = LoggerFactory.getLogger(MemTableManager.class); - - private static final int WAIT_TIME = 100; - public static final int MEMTABLE_NUM_FOR_EACH_PARTITION = 4; private int currentMemtableNumber = 0; private MemTableManager() {} @@ -43,36 +30,9 @@ public static MemTableManager getInstance() { return InstanceHolder.INSTANCE; } - public synchronized IMemTable getAvailableMemTable(String storageGroup, String dataRegionId) - throws WriteProcessException { - if (CONFIG.isEnableMemControl()) { - currentMemtableNumber++; - return new PrimitiveMemTable(storageGroup, dataRegionId, CONFIG.isEnableMemControl()); - } - - if (!reachMaxMemtableNumber()) { - currentMemtableNumber++; - return new PrimitiveMemTable(storageGroup, dataRegionId); - } - - // wait until the total number of memtable is less than the system capacity - int waitCount = 1; - while (true) { - if (!reachMaxMemtableNumber()) { - currentMemtableNumber++; - return new PrimitiveMemTable(storageGroup, dataRegionId); - } - try { - wait(WAIT_TIME); - } catch (InterruptedException e) { - logger.error("{} fails to wait for memtables {}, continue to wait", storageGroup, e); - Thread.currentThread().interrupt(); - throw new WriteProcessException(e); - } - if (waitCount++ % 10 == 0) { - logger.info("{} has waited for a memtable for {}ms", storageGroup, waitCount * WAIT_TIME); - } - } + public synchronized IMemTable getAvailableMemTable(String storageGroup, String dataRegionId) { + currentMemtableNumber++; + return new PrimitiveMemTable(storageGroup, dataRegionId); } public int getCurrentMemtableNumber() { @@ -84,20 +44,6 @@ public synchronized void decreaseMemtableNumber() { notifyAll(); } - /** Called when memory control is disabled */ - private boolean reachMaxMemtableNumber() { - return currentMemtableNumber >= CONFIG.getMaxMemtableNumber(); - } - - /** Called when memory control is disabled */ - public synchronized void addOrDeleteStorageGroup(int diff) { - int maxMemTableNum = CONFIG.getMaxMemtableNumber(); - maxMemTableNum += - MEMTABLE_NUM_FOR_EACH_PARTITION * CONFIG.getConcurrentWritingTimePartition() * diff; - CONFIG.setMaxMemtableNumber(maxMemTableNum); - notifyAll(); - } - public synchronized void close() { currentMemtableNumber = 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 010bd9eb6ad5..f17dba51030e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; -import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.slf4j.Logger; @@ -65,7 +64,7 @@ public class SystemInfo { private int totalFileLimitForCrossTask = config.getTotalFileLimitForCrossTask(); - private ExecutorService flushTaskSubmitThreadPool = + private final ExecutorService flushTaskSubmitThreadPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName()); private double FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); private double REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); @@ -100,8 +99,7 @@ public synchronized boolean reportStorageGroupStatus( dataRegionInfo.setLastReportedSize(currentDataRegionMemCost); if (totalStorageGroupMemCost < FLUSH_THRESHOLD) { return true; - } else if (totalStorageGroupMemCost >= FLUSH_THRESHOLD - && totalStorageGroupMemCost < REJECT_THRESHOLD) { + } else if (totalStorageGroupMemCost < REJECT_THRESHOLD) { logger.debug( "The total database mem costs are too large, call for flushing. " + "Current sg cost is {}", @@ -260,9 +258,6 @@ public boolean addCompactionMemoryCost( public synchronized void resetCompactionMemoryCost( CompactionTaskType taskType, long compactionMemoryCost) { - if (!config.isEnableCompactionMemControl()) { - return; - } this.compactionMemoryCost.addAndGet(-compactionMemoryCost); switch (taskType) { case INNER_SEQ: @@ -284,11 +279,7 @@ public synchronized void decreaseCompactionFileNumCost(int fileNum) { } public long getMemorySizeForCompaction() { - if (config.isEnableMemControl()) { - return memorySizeForCompaction; - } else { - return Long.MAX_VALUE; - } + return memorySizeForCompaction; } public void allocateWriteMemory() { @@ -350,7 +341,7 @@ private void logCurrentTotalSGMemory() { */ private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcessor) { // If invoke flush by replaying logs, do not flush now! - if (reportedStorageGroupMemCostMap.size() == 0) { + if (reportedStorageGroupMemCostMap.isEmpty()) { return false; } PriorityQueue allTsFileProcessors = @@ -370,10 +361,7 @@ private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcesso TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); selectedTsFileProcessor.setWorkMemTableShouldFlush(); - flushTaskSubmitThreadPool.submit( - () -> { - selectedTsFileProcessor.submitAFlushTask(); - }); + flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); if (selectedTsFileProcessor == currentTsFileProcessor) { isCurrentTsFileProcessorSelected = true; } @@ -408,7 +396,7 @@ private static class InstanceHolder { private InstanceHolder() {} - private static SystemInfo instance = new SystemInfo(); + private static final SystemInfo instance = new SystemInfo(); } public synchronized void applyTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) { @@ -438,8 +426,4 @@ public double getFlushThershold() { public double getRejectThershold() { return REJECT_THRESHOLD; } - - public int flushingMemTableNum() { - return FlushManager.getInstance().getNumberOfWorkingTasks(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index d82451eaa0e6..a033539cdcb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -46,8 +46,9 @@ public class MemUtils { private MemUtils() {} /** - * function for getting the value size. If mem control enabled, do not add text data size here, - * the size will be added to memtable before inserting. + * Function for obtaining the value size. For text values, there are two conditions: 1. During + * insertion, their size has already been added to memory. 2. During flushing, their size needs to + * be calculated. */ public static long getRecordSize(TSDataType dataType, Object value, boolean addingTextDataSize) { if (dataType == TSDataType.TEXT) { @@ -57,11 +58,10 @@ public static long getRecordSize(TSDataType dataType, Object value, boolean addi } /** - * function for getting the value size. If mem control enabled, do not add text data size here, - * the size will be added to memtable before inserting. + * Function for obtaining the value size. For text values, their size has already been added to + * memory before insertion */ - public static long getRecordsSize( - List dataTypes, Object[] value, boolean addingTextDataSize) { + public static long getRowRecordSize(List dataTypes, Object[] value) { int emptyRecordCount = 0; long memSize = 0L; for (int i = 0; i < value.length; i++) { @@ -69,28 +69,23 @@ public static long getRecordsSize( emptyRecordCount++; continue; } - memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], addingTextDataSize); + memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], false); } return memSize; } /** - * function for getting the vector value size. If mem control enabled, do not add text data size - * here, the size will be added to memtable before inserting. + * Function for obtaining the value size. For text values, their size has already been added to + * memory before insertion */ - public static long getAlignedRecordsSize( - List dataTypes, Object[] value, boolean addingTextDataSize) { + public static long getAlignedRowRecordSize(List dataTypes, Object[] value) { // time and index size long memSize = 8L + 4L; for (int i = 0; i < dataTypes.size(); i++) { - if (value[i] == null) { + if (value[i] == null || dataTypes.get(i) == TSDataType.TEXT) { continue; } - if (dataTypes.get(i) == TSDataType.TEXT) { - memSize += (addingTextDataSize ? getBinarySize((Binary) value[i]) : 0); - } else { - memSize += dataTypes.get(i).getDataTypeSize(); - } + memSize += dataTypes.get(i).getDataTypeSize(); } return memSize; } @@ -101,7 +96,7 @@ public static long getBinarySize(Binary value) { public static long getBinaryColumnSize(Binary[] column, int start, int end) { long memSize = 0; - memSize += (end - start) * RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; + memSize += (long) (end - start) * RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; for (int i = start; i < end; i++) { memSize += RamUsageEstimator.sizeOf(column[i].getValues()); } @@ -109,11 +104,10 @@ public static long getBinaryColumnSize(Binary[] column, int start, int end) { } /** - * If mem control enabled, do not add text data size here, the size will be added to memtable - * before inserting. + * Function for obtaining the value size. For text values, their size has already been added to + * memory before insertion */ - public static long getTabletSize( - InsertTabletNode insertTabletNode, int start, int end, boolean addingTextDataSize) { + public static long getTabletSize(InsertTabletNode insertTabletNode, int start, int end) { if (start >= end) { return 0L; } @@ -124,19 +118,12 @@ public static long getTabletSize( } // time column memSize memSize += (end - start) * 8L; - if (insertTabletNode.getDataTypes()[i] == TSDataType.TEXT && addingTextDataSize) { - for (int j = start; j < end; j++) { - memSize += getBinarySize(((Binary[]) insertTabletNode.getColumns()[i])[j]); - } - } else { - memSize += (end - start) * insertTabletNode.getDataTypes()[i].getDataTypeSize(); - } + memSize += (long) (end - start) * insertTabletNode.getDataTypes()[i].getDataTypeSize(); } return memSize; } - public static long getAlignedTabletSize( - InsertTabletNode insertTabletNode, int start, int end, boolean addingTextDataSize) { + public static long getAlignedTabletSize(InsertTabletNode insertTabletNode, int start, int end) { if (start >= end) { return 0L; } @@ -145,16 +132,7 @@ public static long getAlignedTabletSize( if (insertTabletNode.getMeasurements()[i] == null) { continue; } - TSDataType valueType; - // value columns memSize - valueType = insertTabletNode.getDataTypes()[i]; - if (valueType == TSDataType.TEXT && addingTextDataSize) { - for (int j = start; j < end; j++) { - memSize += getBinarySize(((Binary[]) insertTabletNode.getColumns()[i])[j]); - } - } else { - memSize += (long) (end - start) * valueType.getDataTypeSize(); - } + memSize += (long) (end - start) * insertTabletNode.getDataTypes()[i].getDataTypeSize(); } // time and index column memSize for vector memSize += (end - start) * (8L + 4L); @@ -162,11 +140,11 @@ public static long getAlignedTabletSize( } /** Calculate how much memory will be used if the given record is written to sequence file. */ - public static long getTsRecordMem(TSRecord record) { + public static long getTsRecordMem(TSRecord tsRecord) { long memUsed = 8; // time memUsed += 8; // deviceId reference - memUsed += getStringMem(record.deviceId); - for (DataPoint dataPoint : record.dataPointList) { + memUsed += getStringMem(tsRecord.deviceId); + for (DataPoint dataPoint : tsRecord.dataPointList) { memUsed += 8; // dataPoint reference memUsed += getDataPointMem(dataPoint); } @@ -176,7 +154,7 @@ public static long getTsRecordMem(TSRecord record) { /** function for getting the memory size of the given string. */ public static long getStringMem(String str) { // wide char (2 bytes each) and 64B String overhead - return str.length() * 2 + 64L; + return str.length() * 2L + 64L; } /** function for getting the memory size of the given data point. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 1a1e3b336240..4092db7d4426 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -189,7 +189,7 @@ public void putAlignedValue(long timestamp, Object[] value) { columnValue != null ? getBinarySize((Binary) columnValue) : getBinarySize(Binary.EMPTY_VALUE); - if (memoryBinaryChunkSize[i] >= targetChunkSize) { + if (memoryBinaryChunkSize[i] >= TARGET_CHUNK_SIZE) { reachMaxChunkSizeFlag = true; } break; @@ -767,7 +767,7 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex memoryBinaryChunkSize[i] += arrayT[elementIndex + i1] != null ? getBinarySize(arrayT[elementIndex + i1]) : 0; } - if (memoryBinaryChunkSize[i] > targetChunkSize) { + if (memoryBinaryChunkSize[i] > TARGET_CHUNK_SIZE) { reachMaxChunkSizeFlag = true; } break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index d3d21470eb5c..4c5d40615306 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -99,7 +99,7 @@ public void putBinary(long timestamp, Binary value) { @Override public boolean reachMaxChunkSizeThreshold() { - return memoryBinaryChunkSize >= targetChunkSize; + return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 246102bb7eb7..f2b26158a5f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -46,10 +46,8 @@ import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; public abstract class TVList implements WALEntryValue { - - protected static final int SMALL_ARRAY_LENGTH = 32; protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent"; - protected static final long targetChunkSize = + protected static final long TARGET_CHUNK_SIZE = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); // list of timestamp array, add 1 when expanded -> data point timestamp array // index relation: arrayIndex -> elementIndex @@ -93,9 +91,9 @@ public static TVList newList(TSDataType dataType) { public static long tvListArrayMemCost(TSDataType type) { long size = 0; // time array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L; + size += PrimitiveArrayManager.ARRAY_SIZE * 8L; // value array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); + size += PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); // two array headers mem size size += NUM_BYTES_ARRAY_HEADER * 2L; // Object references size in ArrayList diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java index 38c6ed495808..79974cc7f7cf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java @@ -106,7 +106,6 @@ public void setUp() throws MetadataException, IOException { .getConfig() .setInnerUnseqCompactionPerformer(InnerUnseqCompactionPerformer.READ_POINT); IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0); - IoTDBDescriptor.getInstance().getConfig().setEnableCompactionMemControl(false); CompactionTaskManager.getInstance().start(); while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) { try { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java index a280ae220f87..a7858b9dc6ad 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionConfigRestorer.java @@ -58,9 +58,6 @@ public class CompactionConfigRestorer { private int oldMinCrossCompactionUnseqLevel = IoTDBDescriptor.getInstance().getConfig().getMinCrossCompactionUnseqFileLevel(); - private boolean oldEnableCompactionMemControl = - IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl(); - public CompactionConfigRestorer() {} public void restoreCompactionConfig() { @@ -86,6 +83,5 @@ public void restoreCompactionConfig() { config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer); config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer); config.setMinCrossCompactionUnseqFileLevel(oldMinCrossCompactionUnseqLevel); - config.setEnableCompactionMemControl(oldEnableCompactionMemControl); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 9766269e4207..e63162ed263d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -87,9 +87,6 @@ public class EnvironmentUtils { private static final long oldSeqTsFileSize = config.getSeqTsFileSize(); private static final long oldUnSeqTsFileSize = config.getUnSeqTsFileSize(); - - private static final long oldGroupSizeInByte = config.getMemtableSizeThreshold(); - private static TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration; public static boolean examinePorts = @@ -170,7 +167,6 @@ public static void cleanEnv() throws IOException, StorageEngineException { cleanAllDir(); config.setSeqTsFileSize(oldSeqTsFileSize); config.setUnSeqTsFileSize(oldUnSeqTsFileSize); - config.setMemtableSizeThreshold(oldGroupSizeInByte); } private static boolean examinePorts() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java index 7294a1a70fa9..fee451265c53 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java @@ -38,6 +38,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + public class MemUtilsTest { @Test @@ -50,7 +53,44 @@ public void getRecordSizeTest() { } @Test - public void getRecordSizeWithInsertNodeTest() throws IllegalPathException { + public void getRecordSizeWithInsertRowNodeTest() { + Object[] row = {1, 2L, 3.0f, 4.0d, new Binary("5", TSFileConfig.STRING_CHARSET)}; + List dataTypes = new ArrayList<>(); + int sizeSum = 0; + dataTypes.add(TSDataType.INT32); + sizeSum += 8 + TSDataType.INT32.getDataTypeSize(); + dataTypes.add(TSDataType.INT64); + sizeSum += 8 + TSDataType.INT64.getDataTypeSize(); + dataTypes.add(TSDataType.FLOAT); + sizeSum += 8 + TSDataType.FLOAT.getDataTypeSize(); + dataTypes.add(TSDataType.DOUBLE); + sizeSum += 8 + TSDataType.DOUBLE.getDataTypeSize(); + dataTypes.add(TSDataType.TEXT); + sizeSum += 8; + Assert.assertEquals(sizeSum, MemUtils.getRowRecordSize(dataTypes, row)); + } + + @Test + public void getRecordSizeWithInsertAlignedRowNodeTest() { + Object[] row = {1, 2L, 3.0f, 4.0d, new Binary("5", TSFileConfig.STRING_CHARSET)}; + List dataTypes = new ArrayList<>(); + int sizeSum = 0; + dataTypes.add(TSDataType.INT32); + sizeSum += TSDataType.INT32.getDataTypeSize(); + dataTypes.add(TSDataType.INT64); + sizeSum += TSDataType.INT64.getDataTypeSize(); + dataTypes.add(TSDataType.FLOAT); + sizeSum += TSDataType.FLOAT.getDataTypeSize(); + dataTypes.add(TSDataType.DOUBLE); + sizeSum += TSDataType.DOUBLE.getDataTypeSize(); + dataTypes.add(TSDataType.TEXT); + // time and index size + sizeSum += 8 + 4; + Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes, row)); + } + + @Test + public void getRecordSizeWithInsertTableNodeTest() throws IllegalPathException { PartialPath device = new PartialPath("root.sg.d1"); String[] measurements = {"s1", "s2", "s3", "s4", "s5"}; Object[] columns = { @@ -83,13 +123,52 @@ public void getRecordSizeWithInsertNodeTest() throws IllegalPathException { null, columns, 1); - Assert.assertEquals(sizeSum, MemUtils.getTabletSize(insertNode, 0, 1, false)); + Assert.assertEquals(sizeSum, MemUtils.getTabletSize(insertNode, 0, 1)); + } + + @Test + public void getRecordSizeWithInsertAlignedTableNodeTest() throws IllegalPathException { + PartialPath device = new PartialPath("root.sg.d1"); + String[] measurements = {"s1", "s2", "s3", "s4", "s5"}; + Object[] columns = { + new int[] {1}, + new long[] {2}, + new float[] {3}, + new double[] {4}, + new Binary[] {new Binary("5", TSFileConfig.STRING_CHARSET)} + }; + TSDataType[] dataTypes = new TSDataType[6]; + int sizeSum = 0; + dataTypes[0] = TSDataType.INT32; + sizeSum += TSDataType.INT32.getDataTypeSize(); + dataTypes[1] = TSDataType.INT64; + sizeSum += TSDataType.INT64.getDataTypeSize(); + dataTypes[2] = TSDataType.FLOAT; + sizeSum += TSDataType.FLOAT.getDataTypeSize(); + dataTypes[3] = TSDataType.DOUBLE; + sizeSum += TSDataType.DOUBLE.getDataTypeSize(); + dataTypes[4] = TSDataType.TEXT; + sizeSum += TSDataType.TEXT.getDataTypeSize(); + // time and index size + sizeSum += 8 + 4; + InsertTabletNode insertNode = + new InsertTabletNode( + new PlanNodeId(""), + device, + true, + measurements, + dataTypes, + new long[1], + null, + columns, + 1); + Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0, 1)); } /** This method tests MemUtils.getStringMem() and MemUtils.getDataPointMem() */ @Test public void getMemSizeTest() { - int totalSize = 0; + long totalSize = 0; String device = "root.sg.d1"; TSRecord record = new TSRecord(0, device); @@ -123,7 +202,7 @@ public void getMemSizeTest() { totalSize += MemUtils.getDataPointMem(point6); record.addTuple(point6); - totalSize += 8 * record.dataPointList.size() + MemUtils.getStringMem(device) + 16; + totalSize += 8L * record.dataPointList.size() + MemUtils.getStringMem(device) + 16; Assert.assertEquals(totalSize, MemUtils.getTsRecordMem(record)); } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 43d431a7a2d2..6bfdac3b3d02 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -168,10 +168,6 @@ data_replication_factor=1 ### Memory Control Configuration #################### -# Whether to enable memory control -# Datatype: boolean -# enable_mem_control=true - # Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine, Consensus, StreamingEngine and Free Memory. # The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers. for example: 1:1:1:1:1:1 , 6:2:1:1:1:1 # If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:1:1:1 @@ -190,12 +186,6 @@ data_replication_factor=1 # TimePartitionInfo is the total memory size of last flush time of all data regions # write_memory_proportion=19:1 -# Max number of concurrent writing time partitions in one database -# This parameter is used to control total memTable number when memory control is disabled -# The max number of memTable is 4 * concurrent_writing_time_partition * database number -# Datatype: long -# concurrent_writing_time_partition=1 - # primitive array size (length of each array) in array pool # Datatype: int # primitive_array_size=64 @@ -479,11 +469,6 @@ data_replication_factor=1 # 2. SHUTDOWN: the system will be shutdown. # handle_system_error=CHANGE_TO_READ_ONLY -# Only take effects when enable_mem_control is false. -# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB. -# Datatype: long -# memtable_size_threshold=1073741824 - # Whether to timed flush sequence tsfiles' memtables. # Datatype: boolean # enable_timed_flush_seq_memtable=true diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 391426cc34fe..0505205bcb28 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -85,7 +85,6 @@ public RestorableTsFileIOWriter(File file) throws IOException { public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException { this(file, true); this.maxMetadataSize = maxMetadataSize; - this.enableMemoryControl = true; this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); this.checkMetadataSizeAndMayFlush(); } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 658a16deda68..9295077bf588 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -57,7 +57,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -111,7 +110,6 @@ public class TsFileIOWriter implements AutoCloseable { protected volatile boolean hasChunkMetadataInDisk = false; // record the total num of path in order to make bloom filter protected int pathCount = 0; - protected boolean enableMemoryControl = false; private Path lastSerializePath = null; protected LinkedList endPosInCMTForDevice = new LinkedList<>(); private volatile int chunkMetadataCount = 0; @@ -151,10 +149,8 @@ public TsFileIOWriter(TsFileOutput output, boolean test) { } /** for write with memory control */ - public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize) - throws IOException { + public TsFileIOWriter(File file, long maxMetadataSize) throws IOException { this(file); - this.enableMemoryControl = enableMemoryControl; this.maxMetadataSize = maxMetadataSize; chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); } @@ -306,9 +302,7 @@ public void writeChunk(Chunk chunk) throws IOException { /** end chunk and write some log. */ public void endCurrentChunk() { - if (enableMemoryControl) { - this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes(); - } + this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes(); chunkMetadataCount++; chunkMetadataList.add(currentChunkMetadata); currentChunkMetadata = null; @@ -508,40 +502,6 @@ public void setFile(File file) { this.file = file; } - /** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */ - public void filterChunks(Map> chunkStartTimes) { - Map startTimeIdxes = new HashMap<>(); - chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0)); - - Iterator chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator(); - while (chunkGroupMetaDataIterator.hasNext()) { - ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next(); - String deviceId = chunkGroupMetaData.getDevice(); - int chunkNum = chunkGroupMetaData.getChunkMetadataList().size(); - Iterator chunkMetaDataIterator = - chunkGroupMetaData.getChunkMetadataList().iterator(); - while (chunkMetaDataIterator.hasNext()) { - IChunkMetadata chunkMetaData = chunkMetaDataIterator.next(); - Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(), true); - int startTimeIdx = startTimeIdxes.get(path); - - List pathChunkStartTimes = chunkStartTimes.get(path); - boolean chunkValid = - startTimeIdx < pathChunkStartTimes.size() - && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime(); - if (!chunkValid) { - chunkMetaDataIterator.remove(); - chunkNum--; - } else { - startTimeIdxes.put(path, startTimeIdx + 1); - } - } - if (chunkNum == 0) { - chunkGroupMetaDataIterator.remove(); - } - } - } - public void writePlanIndices() throws IOException { ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream()); ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream()); @@ -630,7 +590,7 @@ public void setMaxPlanIndex(long maxPlanIndex) { */ public int checkMetadataSizeAndMayFlush() throws IOException { // This function should be called after all data of an aligned device has been written - if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) { + if (currentChunkMetadataSize > maxMetadataSize) { try { if (logger.isDebugEnabled()) { logger.debug( @@ -700,7 +660,7 @@ private int writeChunkMetadataToTempFile( // for each device, we only serialize it once, in order to save io writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); } - if (isNewPath && iChunkMetadataList.size() > 0) { + if (isNewPath && !iChunkMetadataList.isEmpty()) { // serialize the public info of this measurement writtenSize += ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); @@ -719,10 +679,6 @@ private int writeChunkMetadataToTempFile( return writtenSize; } - public String getCurrentChunkGroupDeviceId() { - return currentChunkGroupDeviceId; - } - public List getChunkGroupMetadataList() { return chunkGroupMetadataList; } diff --git a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java index b130c7a0c58d..949a9c1583e7 100644 --- a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java +++ b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -95,7 +95,7 @@ public void tearDown() throws IOException { /** The following tests is for ChunkMetadata serialization and deserialization. */ @Test public void testSerializeAndDeserializeChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); @@ -147,7 +147,7 @@ public void testSerializeAndDeserializeChunkMetadata() throws IOException { @Test public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); @@ -185,7 +185,7 @@ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException @Test public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); List seriesIds = new ArrayList<>(); for (int i = 0; i < 10; ++i) { @@ -258,7 +258,7 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { @Test public void testWriteCompleteFileWithNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -311,7 +311,7 @@ public void testWriteCompleteFileWithNormalChunk() throws IOException { @Test public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -398,7 +398,7 @@ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { @Test public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -490,7 +490,7 @@ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 2; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -581,7 +581,7 @@ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException { Map>>>> originTimes = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 1; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 2; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -672,7 +672,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException { Map>>>> originTimes = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 1024; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -762,7 +762,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException { @Test public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -798,7 +798,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOExcept public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException { Map>>>> originData = new HashMap<>(); int chunkNum = 512, seriesNum = 6; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 1; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -840,7 +840,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IO long originTestPointNum = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; try { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -881,7 +881,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException { TEST_CHUNK_SIZE = 10; int deviceNum = 1024; try { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < deviceNum; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -918,7 +918,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException { public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException { Map>>>> originValue = new HashMap<>(); TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -976,7 +976,7 @@ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOExcep @Test public void testWritingCompleteMixedFiles() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < 10; ++k) { @@ -1075,7 +1075,7 @@ public void testWritingCompleteMixedFiles() throws IOException { @Test public void testWritingAlignedSeriesByColumn() throws IOException { Map>>>> originValue = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -1129,7 +1129,7 @@ public void testWritingAlignedSeriesByColumn() throws IOException { @Test public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException { Map>>>> originValue = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId);