Skip to content

Commit

Permalink
Fix VolatileSubtreeIterator missing update buffer if all nodes in new…
Browse files Browse the repository at this point in the history
… buffer are not qualified (apache#12093)

Fix VolatileSubtreeIterator missing update buffer if all nodes in new buffer are not qualified
  • Loading branch information
Cpaulyz authored and SzyWilliam committed Nov 22, 2024
1 parent e7e27e2 commit bdf21bb
Showing 1 changed file with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.MemoryManager.STATUS.ITERATE_NEW_BUFFER;
import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory.MemoryManager.STATUS.ITERATE_UPDATE_BUFFER;
import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getBelongedContainer;
import static org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer.getCachedMNodeContainer;

Expand Down Expand Up @@ -293,13 +295,13 @@ private class VolatileSubtreeIterator implements Iterator<ICachedMNode> {

private final ICachedMNodeContainer container;
private Iterator<ICachedMNode> bufferedNodeIterator;
private byte status;
private STATUS status;
private ICachedMNode nextSubtree = null;

private VolatileSubtreeIterator(ICachedMNodeContainer container) {
this.container = container;
this.bufferedNodeIterator = container.getNewChildFlushingBuffer().values().iterator();
this.status = 0;
this.status = ITERATE_NEW_BUFFER;
}

@Override
Expand All @@ -323,13 +325,16 @@ public ICachedMNode next() {
private void tryGetNext() {
ICachedMNode node;
CacheEntry cacheEntry;
if (!bufferedNodeIterator.hasNext() && status == 0) {
// flushingBuffer of NewChildBuffer has been traversed, and the flushingBuffer of
// UpdateChildBuffer needs to be traversed.
bufferedNodeIterator = container.getUpdatedChildFlushingBuffer().values().iterator();
status = 1;
}
while (bufferedNodeIterator.hasNext()) {
while (bufferedNodeIterator.hasNext() || status == ITERATE_NEW_BUFFER) {
if (!bufferedNodeIterator.hasNext()) {
// flushingBuffer of NewChildBuffer has been traversed, and the flushingBuffer of
// UpdateChildBuffer needs to be traversed.
bufferedNodeIterator = container.getUpdatedChildFlushingBuffer().values().iterator();
status = ITERATE_UPDATE_BUFFER;
if (!bufferedNodeIterator.hasNext()) {
return;
}
}
node = bufferedNodeIterator.next();

// prevent this node being added buffer during the following check and potential flush
Expand All @@ -345,7 +350,7 @@ private void tryGetNext() {
cacheEntry = getCacheEntry(node);

synchronized (cacheEntry) {
if (status == 1
if (status == ITERATE_UPDATE_BUFFER
&& container.getUpdatedChildReceivingBuffer().containsKey(node.getName())) {
if (cacheEntry.hasVolatileDescendant()
&& getCachedMNodeContainer(node).hasChildrenInBuffer()) {
Expand All @@ -356,13 +361,15 @@ && getCachedMNodeContainer(node).hasChildrenInBuffer()) {
// return for flush
nextSubtree = node;
unlockImmediately = false;
return;
} else {
continue;
}
return;
}

cacheEntry.setVolatile(false);
memoryStatistics.removeVolatileNode();
if (status == 1) {
if (status == ITERATE_UPDATE_BUFFER) {
container.moveMNodeFromUpdateChildBufferToCache(node.getName());
} else {
container.moveMNodeFromNewChildBufferToCache(node.getName());
Expand Down Expand Up @@ -625,4 +632,15 @@ public long getBufferNodeNum() {
public long getCacheNodeNum() {
return nodeCache.getCacheNodeNum();
}

enum STATUS {
ITERATE_NEW_BUFFER((byte) 0),
ITERATE_UPDATE_BUFFER((byte) 1);

private final byte status;

STATUS(byte status) {
this.status = status;
}
}
}

0 comments on commit bdf21bb

Please sign in to comment.