From 2c6ed50c46b8e83c654cc57d639f1a85f7b56981 Mon Sep 17 00:00:00 2001 From: Marc D'Mello Date: Wed, 23 Oct 2024 22:11:06 +0000 Subject: [PATCH] Multi-tenant index writer initial commit --- .../org/apache/lucene/index/IndexWriter.java | 35 +++++- .../lucene/index/IndexWriterRAMManager.java | 116 ++++++++++++++++++ 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/IndexWriterRAMManager.java diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 346da8a907e..87ff86cfee8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -462,6 +462,8 @@ public void onTicketBacklog() { } }; + private final IndexWriterRAMManager.PerWriterIndexWriterRAMManager indexWriterRAMManager; + /** * Expert: returns a readonly reader, covering all committed as well as un-committed changes to * the index. This provides "near real-time" searching, in that changes made during an IndexWriter @@ -939,11 +941,14 @@ protected final void ensureOpen() throws AlreadyClosedException { * @param d the index directory. The index is either created or appended according * conf.getOpenMode(). * @param conf the configuration settings according to which IndexWriter should be initialized. + * @param indexWriterRAMManager The RAM manager used for multi-tenant RAM management * @throws IOException if the directory cannot be read/written to, or if it does not exist and * conf.getOpenMode() is OpenMode.APPEND or if there is any other * low-level IO error */ - public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { + public IndexWriter( + Directory d, IndexWriterConfig conf, IndexWriterRAMManager indexWriterRAMManager) + throws IOException { enableTestPoints = isEnableTestPoints(); conf.setIndexWriter(this); // prevent reuse by other instances config = conf; @@ -1211,6 +1216,27 @@ public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { writeLock = null; } } + + if (indexWriterRAMManager != null) { + this.indexWriterRAMManager = + new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(this, indexWriterRAMManager); + } else { + this.indexWriterRAMManager = null; + } + } + + /** + * Constructor for IndexWriter's that don't require multi-tenant RAM management + * + * @param d the index directory. The index is either created or appended according + * conf.getOpenMode(). + * @param conf the configuration settings according to which IndexWriter should be initialized. + * @throws IOException if the directory cannot be read/written to, or if it does not exist and + * conf.getOpenMode() is OpenMode.APPEND or if there is any other + * low-level IO error + */ + public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { + this(d, conf, null); } /** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */ @@ -1340,6 +1366,10 @@ private void shutdown() throws IOException { } rollbackInternal(); // if we got that far lets rollback and close } + + if (indexWriterRAMManager != null) { + indexWriterRAMManager.removeWriter(); + } } /** @@ -6012,6 +6042,9 @@ private long maybeProcessEvents(long seqNo) throws IOException { seqNo = -seqNo; processEvents(true); } + if (indexWriterRAMManager != null) { + indexWriterRAMManager.flushIfNecessary(); + } return seqNo; } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterRAMManager.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterRAMManager.java new file mode 100644 index 00000000000..5721c5ea3f5 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterRAMManager.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * For managing multiple instances of {@link IndexWriter} sharing the same buffer (configured by + * {@link IndexWriterConfig#setRAMBufferSizeMB}) + */ +public class IndexWriterRAMManager { + + private final IndexWriterConfig config; + private final Map idToWriter = new ConcurrentHashMap<>(); + private final AtomicInteger idGenerator = new AtomicInteger(); + + /** + * Default constructor + * + * @param config the index writer config containing the max RAM buffer size + */ + public IndexWriterRAMManager(IndexWriterConfig config) { + this.config = config; + } + + private int registerWriter(IndexWriter writer) { + int id = idGenerator.incrementAndGet(); + idToWriter.put(id, writer); + return id; + } + + private void removeWriter(int id) { + if (idToWriter.containsKey(id) == false) { + throw new IllegalArgumentException( + "Writer " + id + " has not been registered or has been removed already"); + } + idToWriter.remove(id); + } + + private void flushIfNecessary(int id) throws IOException { + if (idToWriter.containsKey(id) == false) { + throw new IllegalArgumentException( + "Writer " + id + " has not been registered or has been removed already"); + } + long totalRam = 0L; + for (IndexWriter writer : idToWriter.values()) { + totalRam += writer.ramBytesUsed(); + } + if (totalRam >= config.getRAMBufferSizeMB() * 1024 * 1024) { + IndexWriter writerToFlush = chooseWriterToFlush(idToWriter.values(), idToWriter.get(id)); + writerToFlush.flushNextBuffer(); + } + } + + /** + * Chooses which writer should be flushed. Default implementation chooses the writer with most RAM + * usage + * + * @param writers list of writers registered with this {@link IndexWriterRAMManager} + * @param callingWriter the writer calling {@link IndexWriterRAMManager#flushIfNecessary} + * @return the IndexWriter to flush + */ + protected static IndexWriter chooseWriterToFlush( + Collection writers, IndexWriter callingWriter) { + IndexWriter highestBufferWriter = null; + long highestRam = 0L; + for (IndexWriter writer : writers) { + long writerRamBytes = writer.ramBytesUsed(); + if (writerRamBytes > highestRam) { + highestRam = writerRamBytes; + highestBufferWriter = writer; + } + } + return highestBufferWriter; + } + + /** + * For use in {@link IndexWriter}, manages communication with the {@link IndexWriterRAMManager} + */ + public static class PerWriterIndexWriterRAMManager { + + private final int id; + private final IndexWriterRAMManager manager; + + PerWriterIndexWriterRAMManager(IndexWriter writer, IndexWriterRAMManager manager) { + id = manager.registerWriter(writer); + this.manager = manager; + } + + void removeWriter() { + manager.removeWriter(id); + } + + void flushIfNecessary() throws IOException { + manager.flushIfNecessary(id); + } + } +}