From 2e2a655b9976ed6a4a8f0ae040534012cad6cae3 Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Fri, 4 Nov 2022 18:04:54 +0100 Subject: [PATCH] [MINDEXER-151] Speed up Index update from remote (#255) Introduce MT index update, allow users to specify the count of threads/silos to use while importing index. Without code change threads = 1 and there is no change in codepath (everything works as before). This change (ingesting GZIP files raw records into Lucene index on multiple threads) on my PC halves the execution time: while BasicUsageExample on master takes over 15 minutes to finish (when doing full update), this PR makes it under 7 minutes when using 4 threads. --- https://issues.apache.org/jira/browse/MINDEXER-151 --- .../maven/index/cli/NexusIndexerCli.java | 3 +- .../index/updater/DefaultIndexUpdater.java | 12 +- .../maven/index/updater/IndexDataReader.java | 230 ++++++++++++++++-- .../index/updater/IndexUpdateRequest.java | 13 + .../maven/index/updater/IndexDataTest.java | 4 +- .../index/examples/BasicUsageExample.java | 1 + .../IndexerCoreSearchBackendImplTest.java | 9 +- 7 files changed, 236 insertions(+), 36 deletions(-) diff --git a/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java b/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java index 7d5e2bbe..fe220c90 100644 --- a/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java +++ b/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java @@ -528,10 +528,11 @@ private int unpack( CommandLine cli, Components components ) final List indexers = getIndexers( cli, components ); + try ( BufferedInputStream is = new BufferedInputStream( new FileInputStream( indexArchive ) ); // FSDirectory directory = FSDirectory.open( outputFolder.toPath() ) ) { - DefaultIndexUpdater.unpackIndexData( is, directory, (IndexingContext) Proxy.newProxyInstance( + DefaultIndexUpdater.unpackIndexData( is, 4, directory, (IndexingContext) Proxy.newProxyInstance( getClass().getClassLoader(), new Class[] {IndexingContext.class}, new PartialImplementation() { public List getIndexCreators() diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java b/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java index fab46220..32b2495e 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java @@ -209,7 +209,8 @@ private Date loadIndexDirectory( final IndexUpdateRequest updateRequest, final R Set allGroups; if ( remoteIndexFile.endsWith( ".gz" ) ) { - IndexDataReadResult result = unpackIndexData( is, directory, updateRequest.getIndexingContext() ); + IndexDataReadResult result = unpackIndexData( is, updateRequest.getThreads(), directory, + updateRequest.getIndexingContext() ); timestamp = result.getTimestamp(); rootGroups = result.getRootGroups(); allGroups = result.getAllGroups(); @@ -380,17 +381,20 @@ public Date getTimestamp( final Properties properties, final String key ) /** * @param is an input stream to unpack index data from + * @param threads thread count to use * @param d * @param context */ - public static IndexDataReadResult unpackIndexData( final InputStream is, final Directory d, + public static IndexDataReadResult unpackIndexData( final InputStream is, final int threads, final Directory d, final IndexingContext context ) throws IOException { - NexusIndexWriter w = new NexusIndexWriter( 
d, new IndexWriterConfig( new NexusAnalyzer() ) ); + IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() ); + config.setUseCompoundFile( false ); + NexusIndexWriter w = new NexusIndexWriter( d, config ); try { - IndexDataReader dr = new IndexDataReader( is ); + IndexDataReader dr = new IndexDataReader( is, threads ); return dr.readIndex( w, context ); } diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java index b1c4237e..aac2ef4e 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java @@ -26,9 +26,18 @@ import java.io.IOException; import java.io.InputStream; import java.io.UTFDataFormatException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.Date; -import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.lucene.document.Document; @@ -36,9 +45,15 @@ import org.apache.lucene.document.FieldType; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.FSDirectory; import org.apache.maven.index.ArtifactInfo; import org.apache.maven.index.context.IndexUtils; import org.apache.maven.index.context.IndexingContext; +import org.apache.maven.index.context.NexusAnalyzer; +import org.apache.maven.index.context.NexusIndexWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An index data reader used to parse transfer index format. @@ -47,11 +62,26 @@ */ public class IndexDataReader { + private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class ); + private final DataInputStream dis; + private final int threads; + public IndexDataReader( final InputStream is ) - throws IOException + throws IOException + { + this( is, 1 ); + } + + public IndexDataReader( final InputStream is, final int threads ) + throws IOException { + if ( threads < 1 ) + { + throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads ); + } + this.threads = threads; // MINDEXER-13 // LightweightHttpWagon may have performed automatic decompression // Handle it transparently @@ -72,8 +102,23 @@ public IndexDataReader( final InputStream is ) } public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context ) - throws IOException + throws IOException + { + if ( threads == 1 ) + { + return readIndexST( w, context ); + } + else + { + return readIndexMT( w, context ); + } + } + + private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context ) + throws IOException { + LOGGER.debug( "Reading ST index..." 
); + Instant start = Instant.now(); long timestamp = readHeader(); Date date = null; @@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context ) int n = 0; Document doc; - Set rootGroups = new LinkedHashSet<>(); - Set allGroups = new LinkedHashSet<>(); + ConcurrentMap rootGroups = new ConcurrentHashMap<>(); + ConcurrentMap allGroups = new ConcurrentHashMap<>(); while ( ( doc = readDocument() ) != null ) { - ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); - if ( ai != null ) + addToIndex( doc, context, w, rootGroups, allGroups ); + n++; + } + + w.commit(); + + IndexDataReadResult result = new IndexDataReadResult(); + result.setDocumentCount( n ); + result.setTimestamp( date ); + result.setRootGroups( rootGroups.keySet() ); + result.setAllGroups( allGroups.keySet() ); + + LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() ); + return result; + } + + private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context ) + throws IOException + { + LOGGER.debug( "Reading MT index..." ); + Instant start = Instant.now(); + long timestamp = readHeader(); + + int n = 0; + + final Document theEnd = new Document(); + + ConcurrentMap rootGroups = new ConcurrentHashMap<>(); + ConcurrentMap allGroups = new ConcurrentHashMap<>(); + ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 10000 ); + + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + ArrayList errors = new ArrayList<>(); + ArrayList silos = new ArrayList<>( threads ); + for ( int i = 0; i < threads; i++ ) + { + final int silo = i; + silos.add( tempWriter( "silo" + i ) ); + executorService.execute( () -> { - w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() ); + try + { + while ( true ) + { + try + { + Document doc = queue.take(); + if ( doc == theEnd ) + { + break; + } + addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups ); + } + catch ( InterruptedException | IOException e ) + { + errors.add( e ); + break; + } + } + } + finally + { + LOGGER.debug( "Done thread {}", Thread.currentThread().getName() ); + } + } ); + } - rootGroups.add( ai.getRootGroup() ); - allGroups.add( ai.getGroupId() ); - } - else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null - || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null ) + try + { + Document doc; + while ( ( doc = readDocument() ) != null ) { - // skip it + queue.put( doc ); + n++; } - else + LOGGER.debug( "Signalling END" ); + for ( int i = 0; i < threads; i++ ) { - w.addDocument( doc ); + queue.put( theEnd ); } - n++; + + LOGGER.debug( "Shutting down threads" ); + executorService.shutdown(); + executorService.awaitTermination( 5L, TimeUnit.MINUTES ); + } + catch ( InterruptedException e ) + { + throw new IOException( "Interrupted", e ); + } + + if ( !errors.isEmpty() ) + { + IOException exception = new IOException( "Error during load of index" ); + errors.forEach( exception::addSuppressed ); + throw exception; + } + + LOGGER.debug( "Silos loaded..." ); + Date date = null; + if ( timestamp != -1 ) + { + date = new Date( timestamp ); + IndexUtils.updateTimestamp( w.getDirectory(), date ); } + LOGGER.debug( "Merging silos..." ); + for ( IndexWriter silo : silos ) + { + IndexUtils.close( silo ); + w.addIndexes( silo.getDirectory() ); + } + + LOGGER.debug( "Merged silos..." 
); w.commit(); IndexDataReadResult result = new IndexDataReadResult(); result.setDocumentCount( n ); result.setTimestamp( date ); - result.setRootGroups( rootGroups ); - result.setAllGroups( allGroups ); + result.setRootGroups( rootGroups.keySet() ); + result.setAllGroups( allGroups.keySet() ); + LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() ); return result; } + private FSDirectory tempDirectory( final String name ) throws IOException + { + return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) ); + } + + private IndexWriter tempWriter( final String name ) throws IOException + { + IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() ); + config.setUseCompoundFile( false ); + return new NexusIndexWriter( tempDirectory( name ), config ); + } + + private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter, + final ConcurrentMap rootGroups, + final ConcurrentMap allGroups ) + throws IOException + { + ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); + if ( ai != null ) + { + indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + + rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE ); + allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE ); + } + else + { + // these two fields are automatically handled in code above + if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null + && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null ) + { + indexWriter.addDocument( doc ); + } + } + } + public long readHeader() - throws IOException + throws IOException { final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 ); @@ -139,7 +315,7 @@ public long readHeader() } public Document readDocument() - throws IOException + throws IOException { int fieldCount; try @@ -160,7 +336,7 @@ public Document readDocument() // Fix up UINFO field wrt MINDEXER-41 final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO ); - final String info = doc.get( ArtifactInfo.INFO ); + final String info = doc.get( ArtifactInfo.INFO ); if ( uinfoField != null && info != null && !info.isEmpty() ) { final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info ); @@ -179,7 +355,7 @@ public Document readDocument() } private Field readField() - throws IOException + throws IOException { int flags = dis.read(); @@ -199,7 +375,7 @@ private Field readField() } private static String readUTF( DataInput in ) - throws IOException + throws IOException { int utflen = in.readInt(); @@ -214,7 +390,7 @@ private static String readUTF( DataInput in ) catch ( OutOfMemoryError e ) { throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!" 
- + " See MINDEXER-28 for more information!", e ); + + " See MINDEXER-28 for more information!", e ); } int c, char2, char3; @@ -282,7 +458,7 @@ private static String readUTF( DataInput in ) throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) ); } chararr[chararrCount++] = - (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); + (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); break; default: @@ -360,7 +536,7 @@ public Set getAllGroups() * @throws IOException in case of an IO exception during index file access */ public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context ) - throws IOException + throws IOException { dis.readByte(); // data format version diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java index 8dfbe9a5..da4f04aa 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java @@ -55,6 +55,8 @@ public class IndexUpdateRequest private FSDirectoryFactory directoryFactory; + private int threads; + public IndexUpdateRequest( final IndexingContext context, final ResourceFetcher resourceFetcher ) { assert context != null : "Context to be updated cannot be null!"; @@ -64,6 +66,7 @@ public IndexUpdateRequest( final IndexingContext context, final ResourceFetcher this.resourceFetcher = resourceFetcher; this.forceFullUpdate = false; this.incrementalOnly = false; + this.threads = 1; } public IndexingContext getIndexingContext() @@ -165,4 +168,14 @@ public File getIndexTempDir() { return indexTempDir; } + + public int getThreads() + { + return threads; + } + + public void setThreads( int threads ) + { + this.threads = threads; + } } diff --git a/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java b/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java index f7901811..c387f0d9 100644 --- a/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java +++ b/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java @@ -88,7 +88,7 @@ protected void prepareNexusIndexer( NexusIndexer nexusIndexer ) newDir = new ByteBuffersDirectory(); - Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, newDir, context ).getTimestamp(); + Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, 1, newDir, context ).getTimestamp(); assertEquals( timestamp, newTimestamp ); @@ -126,7 +126,7 @@ public void testEmptyContext() newDir = new ByteBuffersDirectory(); - Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, newDir, context ).getTimestamp(); + Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, 1, newDir, context ).getTimestamp(); assertEquals( null, newTimestamp ); diff --git a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java index 0a8b92fb..4fe10d8a 100644 --- a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java +++ b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java @@ -129,6 +129,7 @@ public void perform() Instant updateStart 
= Instant.now(); System.out.println( "Updating Index..." ); System.out.println( "This might take a while on first run, so please be patient!" ); + Date centralContextCurrentTimestamp = centralContext.getTimestamp(); IndexUpdateRequest updateRequest = new IndexUpdateRequest( centralContext, new Java11HttpClient() ); IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( updateRequest ); diff --git a/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java b/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java index d9fac364..2f725181 100644 --- a/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java +++ b/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java @@ -30,6 +30,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Date; @@ -158,19 +159,22 @@ public void prepareAndUpdateBackend() throws Exception // since this block will always emit at least one HTTP GET. Central indexes are updated once a week, but // other index sources might have different index publishing frequency. // Preferred frequency is once a week. + long start = System.currentTimeMillis(); System.out.println( "Updating Index..." ); System.out.println( "This might take a while on first run, so please be patient!" ); Date centralContextCurrentTimestamp = centralContext.getTimestamp(); IndexUpdateRequest updateRequest = new IndexUpdateRequest( centralContext, new Java11HttpClient() ); + updateRequest.setLocalIndexCacheDir( centralLocalCache ); + updateRequest.setThreads( 4 ); IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( updateRequest ); if ( updateResult.isFullUpdate() ) { - System.out.println( "Full update happened!" ); + System.out.println( "Full update happened." ); } else if ( updateResult.getTimestamp().equals( centralContextCurrentTimestamp ) ) { - System.out.println( "No update needed, index is up to date!" ); + System.out.println( "No update needed, index is up to date." ); } else { @@ -178,6 +182,7 @@ else if ( updateResult.getTimestamp().equals( centralContextCurrentTimestamp ) ) "Incremental update happened, change covered " + centralContextCurrentTimestamp + " - " + updateResult.getTimestamp() + " period." ); } + System.out.println( "Done in " + Duration.ofMillis( System.currentTimeMillis() - start ) ); System.out.println(); this.backend = new IndexerCoreSearchBackendImpl( indexer, centralContext );
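
---
Usage sketch (not part of the patch above): with this change, client code opts into multi-threaded ingestion by calling setThreads( n ) on the IndexUpdateRequest; with the default of 1 the previous single-threaded codepath is used unchanged. Internally, readIndexMT feeds decoded documents through a bounded queue to n worker threads, each writing into its own temporary "silo" index, and the silos are merged into the target index via addIndexes once the stream is exhausted. The snippet below is a hedged illustration modeled on IndexerCoreSearchBackendImplTest and BasicUsageExample; the class name MultiThreadedUpdateSketch and the passed-in indexUpdater, centralContext and fetcher are assumptions for illustration, not code from this PR.

import java.util.Date;

import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.updater.IndexUpdateRequest;
import org.apache.maven.index.updater.IndexUpdateResult;
import org.apache.maven.index.updater.IndexUpdater;
import org.apache.maven.index.updater.ResourceFetcher;

public class MultiThreadedUpdateSketch
{
    // Assumes indexUpdater, centralContext and fetcher are already set up,
    // e.g. as in BasicUsageExample (hypothetical wiring, not from this PR).
    public static void update( IndexUpdater indexUpdater, IndexingContext centralContext,
                               ResourceFetcher fetcher )
        throws Exception
    {
        Date centralContextCurrentTimestamp = centralContext.getTimestamp();

        IndexUpdateRequest updateRequest = new IndexUpdateRequest( centralContext, fetcher );
        updateRequest.setThreads( 4 ); // new in MINDEXER-151; defaults to 1 (previous single-threaded codepath)

        IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( updateRequest );
        if ( updateResult.isFullUpdate() )
        {
            System.out.println( "Full update happened." );
        }
        else if ( updateResult.getTimestamp().equals( centralContextCurrentTimestamp ) )
        {
            System.out.println( "No update needed, index is up to date." );
        }
        else
        {
            System.out.println( "Incremental update happened, change covered "
                + centralContextCurrentTimestamp + " - " + updateResult.getTimestamp() + " period." );
        }
    }
}

The thread count is a trade-off: each extra thread costs a temporary on-disk silo index plus a final merge, so values much larger than the available cores are unlikely to help.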