diff --git a/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java b/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java index 7d5e2bbe..fe220c90 100644 --- a/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java +++ b/indexer-cli/src/main/java/org/apache/maven/index/cli/NexusIndexerCli.java @@ -528,10 +528,11 @@ private int unpack( CommandLine cli, Components components ) final List indexers = getIndexers( cli, components ); + try ( BufferedInputStream is = new BufferedInputStream( new FileInputStream( indexArchive ) ); // FSDirectory directory = FSDirectory.open( outputFolder.toPath() ) ) { - DefaultIndexUpdater.unpackIndexData( is, directory, (IndexingContext) Proxy.newProxyInstance( + DefaultIndexUpdater.unpackIndexData( is, 4, directory, (IndexingContext) Proxy.newProxyInstance( getClass().getClassLoader(), new Class[] {IndexingContext.class}, new PartialImplementation() { public List getIndexCreators() diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java b/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java index fab46220..32b2495e 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/DefaultIndexUpdater.java @@ -209,7 +209,8 @@ private Date loadIndexDirectory( final IndexUpdateRequest updateRequest, final R Set allGroups; if ( remoteIndexFile.endsWith( ".gz" ) ) { - IndexDataReadResult result = unpackIndexData( is, directory, updateRequest.getIndexingContext() ); + IndexDataReadResult result = unpackIndexData( is, updateRequest.getThreads(), directory, + updateRequest.getIndexingContext() ); timestamp = result.getTimestamp(); rootGroups = result.getRootGroups(); allGroups = result.getAllGroups(); @@ -380,17 +381,20 @@ public Date getTimestamp( final Properties properties, final String key ) /** * @param is an input stream to unpack index data from + * @param threads thread count to use * @param d * @param context */ - public static IndexDataReadResult unpackIndexData( final InputStream is, final Directory d, + public static IndexDataReadResult unpackIndexData( final InputStream is, final int threads, final Directory d, final IndexingContext context ) throws IOException { - NexusIndexWriter w = new NexusIndexWriter( d, new IndexWriterConfig( new NexusAnalyzer() ) ); + IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() ); + config.setUseCompoundFile( false ); + NexusIndexWriter w = new NexusIndexWriter( d, config ); try { - IndexDataReader dr = new IndexDataReader( is ); + IndexDataReader dr = new IndexDataReader( is, threads ); return dr.readIndex( w, context ); } diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java index b1c4237e..aac2ef4e 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java @@ -26,9 +26,18 @@ import java.io.IOException; import java.io.InputStream; import java.io.UTFDataFormatException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.Date; -import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.lucene.document.Document; @@ -36,9 +45,15 @@ import org.apache.lucene.document.FieldType; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.FSDirectory; import org.apache.maven.index.ArtifactInfo; import org.apache.maven.index.context.IndexUtils; import org.apache.maven.index.context.IndexingContext; +import org.apache.maven.index.context.NexusAnalyzer; +import org.apache.maven.index.context.NexusIndexWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An index data reader used to parse transfer index format. @@ -47,11 +62,26 @@ */ public class IndexDataReader { + private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class ); + private final DataInputStream dis; + private final int threads; + public IndexDataReader( final InputStream is ) - throws IOException + throws IOException + { + this( is, 1 ); + } + + public IndexDataReader( final InputStream is, final int threads ) + throws IOException { + if ( threads < 1 ) + { + throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads ); + } + this.threads = threads; // MINDEXER-13 // LightweightHttpWagon may have performed automatic decompression // Handle it transparently @@ -72,8 +102,23 @@ public IndexDataReader( final InputStream is ) } public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context ) - throws IOException + throws IOException + { + if ( threads == 1 ) + { + return readIndexST( w, context ); + } + else + { + return readIndexMT( w, context ); + } + } + + private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context ) + throws IOException { + LOGGER.debug( "Reading ST index..." ); + Instant start = Instant.now(); long timestamp = readHeader(); Date date = null; @@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context ) int n = 0; Document doc; - Set rootGroups = new LinkedHashSet<>(); - Set allGroups = new LinkedHashSet<>(); + ConcurrentMap rootGroups = new ConcurrentHashMap<>(); + ConcurrentMap allGroups = new ConcurrentHashMap<>(); while ( ( doc = readDocument() ) != null ) { - ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); - if ( ai != null ) + addToIndex( doc, context, w, rootGroups, allGroups ); + n++; + } + + w.commit(); + + IndexDataReadResult result = new IndexDataReadResult(); + result.setDocumentCount( n ); + result.setTimestamp( date ); + result.setRootGroups( rootGroups.keySet() ); + result.setAllGroups( allGroups.keySet() ); + + LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() ); + return result; + } + + private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context ) + throws IOException + { + LOGGER.debug( "Reading MT index..." ); + Instant start = Instant.now(); + long timestamp = readHeader(); + + int n = 0; + + final Document theEnd = new Document(); + + ConcurrentMap rootGroups = new ConcurrentHashMap<>(); + ConcurrentMap allGroups = new ConcurrentHashMap<>(); + ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 10000 ); + + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + ArrayList errors = new ArrayList<>(); + ArrayList silos = new ArrayList<>( threads ); + for ( int i = 0; i < threads; i++ ) + { + final int silo = i; + silos.add( tempWriter( "silo" + i ) ); + executorService.execute( () -> { - w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() ); + try + { + while ( true ) + { + try + { + Document doc = queue.take(); + if ( doc == theEnd ) + { + break; + } + addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups ); + } + catch ( InterruptedException | IOException e ) + { + errors.add( e ); + break; + } + } + } + finally + { + LOGGER.debug( "Done thread {}", Thread.currentThread().getName() ); + } + } ); + } - rootGroups.add( ai.getRootGroup() ); - allGroups.add( ai.getGroupId() ); - } - else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null - || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null ) + try + { + Document doc; + while ( ( doc = readDocument() ) != null ) { - // skip it + queue.put( doc ); + n++; } - else + LOGGER.debug( "Signalling END" ); + for ( int i = 0; i < threads; i++ ) { - w.addDocument( doc ); + queue.put( theEnd ); } - n++; + + LOGGER.debug( "Shutting down threads" ); + executorService.shutdown(); + executorService.awaitTermination( 5L, TimeUnit.MINUTES ); + } + catch ( InterruptedException e ) + { + throw new IOException( "Interrupted", e ); + } + + if ( !errors.isEmpty() ) + { + IOException exception = new IOException( "Error during load of index" ); + errors.forEach( exception::addSuppressed ); + throw exception; + } + + LOGGER.debug( "Silos loaded..." ); + Date date = null; + if ( timestamp != -1 ) + { + date = new Date( timestamp ); + IndexUtils.updateTimestamp( w.getDirectory(), date ); } + LOGGER.debug( "Merging silos..." ); + for ( IndexWriter silo : silos ) + { + IndexUtils.close( silo ); + w.addIndexes( silo.getDirectory() ); + } + + LOGGER.debug( "Merged silos..." ); w.commit(); IndexDataReadResult result = new IndexDataReadResult(); result.setDocumentCount( n ); result.setTimestamp( date ); - result.setRootGroups( rootGroups ); - result.setAllGroups( allGroups ); + result.setRootGroups( rootGroups.keySet() ); + result.setAllGroups( allGroups.keySet() ); + LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() ); return result; } + private FSDirectory tempDirectory( final String name ) throws IOException + { + return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) ); + } + + private IndexWriter tempWriter( final String name ) throws IOException + { + IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() ); + config.setUseCompoundFile( false ); + return new NexusIndexWriter( tempDirectory( name ), config ); + } + + private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter, + final ConcurrentMap rootGroups, + final ConcurrentMap allGroups ) + throws IOException + { + ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); + if ( ai != null ) + { + indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + + rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE ); + allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE ); + } + else + { + // these two fields are automatically handled in code above + if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null + && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null ) + { + indexWriter.addDocument( doc ); + } + } + } + public long readHeader() - throws IOException + throws IOException { final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 ); @@ -139,7 +315,7 @@ public long readHeader() } public Document readDocument() - throws IOException + throws IOException { int fieldCount; try @@ -160,7 +336,7 @@ public Document readDocument() // Fix up UINFO field wrt MINDEXER-41 final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO ); - final String info = doc.get( ArtifactInfo.INFO ); + final String info = doc.get( ArtifactInfo.INFO ); if ( uinfoField != null && info != null && !info.isEmpty() ) { final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info ); @@ -179,7 +355,7 @@ public Document readDocument() } private Field readField() - throws IOException + throws IOException { int flags = dis.read(); @@ -199,7 +375,7 @@ private Field readField() } private static String readUTF( DataInput in ) - throws IOException + throws IOException { int utflen = in.readInt(); @@ -214,7 +390,7 @@ private static String readUTF( DataInput in ) catch ( OutOfMemoryError e ) { throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!" - + " See MINDEXER-28 for more information!", e ); + + " See MINDEXER-28 for more information!", e ); } int c, char2, char3; @@ -282,7 +458,7 @@ private static String readUTF( DataInput in ) throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) ); } chararr[chararrCount++] = - (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); + (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); break; default: @@ -360,7 +536,7 @@ public Set getAllGroups() * @throws IOException in case of an IO exception during index file access */ public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context ) - throws IOException + throws IOException { dis.readByte(); // data format version diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java index 8dfbe9a5..da4f04aa 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexUpdateRequest.java @@ -55,6 +55,8 @@ public class IndexUpdateRequest private FSDirectoryFactory directoryFactory; + private int threads; + public IndexUpdateRequest( final IndexingContext context, final ResourceFetcher resourceFetcher ) { assert context != null : "Context to be updated cannot be null!"; @@ -64,6 +66,7 @@ public IndexUpdateRequest( final IndexingContext context, final ResourceFetcher this.resourceFetcher = resourceFetcher; this.forceFullUpdate = false; this.incrementalOnly = false; + this.threads = 1; } public IndexingContext getIndexingContext() @@ -165,4 +168,14 @@ public File getIndexTempDir() { return indexTempDir; } + + public int getThreads() + { + return threads; + } + + public void setThreads( int threads ) + { + this.threads = threads; + } } diff --git a/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java b/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java index f7901811..c387f0d9 100644 --- a/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java +++ b/indexer-core/src/test/java/org/apache/maven/index/updater/IndexDataTest.java @@ -88,7 +88,7 @@ protected void prepareNexusIndexer( NexusIndexer nexusIndexer ) newDir = new ByteBuffersDirectory(); - Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, newDir, context ).getTimestamp(); + Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, 1, newDir, context ).getTimestamp(); assertEquals( timestamp, newTimestamp ); @@ -126,7 +126,7 @@ public void testEmptyContext() newDir = new ByteBuffersDirectory(); - Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, newDir, context ).getTimestamp(); + Date newTimestamp = DefaultIndexUpdater.unpackIndexData( is, 1, newDir, context ).getTimestamp(); assertEquals( null, newTimestamp ); diff --git a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java index 0a8b92fb..4fe10d8a 100644 --- a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java +++ b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java @@ -129,6 +129,7 @@ public void perform() Instant updateStart = Instant.now(); System.out.println( "Updating Index..." ); System.out.println( "This might take a while on first run, so please be patient!" ); + Date centralContextCurrentTimestamp = centralContext.getTimestamp(); IndexUpdateRequest updateRequest = new IndexUpdateRequest( centralContext, new Java11HttpClient() ); IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( updateRequest ); diff --git a/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java b/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java index d9fac364..2f725181 100644 --- a/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java +++ b/search-backend-indexer/src/test/java/org/apache/maven/search/backend/indexer/internal/IndexerCoreSearchBackendImplTest.java @@ -30,6 +30,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Date; @@ -158,19 +159,22 @@ public void prepareAndUpdateBackend() throws Exception // since this block will always emit at least one HTTP GET. Central indexes are updated once a week, but // other index sources might have different index publishing frequency. // Preferred frequency is once a week. + long start = System.currentTimeMillis(); System.out.println( "Updating Index..." ); System.out.println( "This might take a while on first run, so please be patient!" ); Date centralContextCurrentTimestamp = centralContext.getTimestamp(); IndexUpdateRequest updateRequest = new IndexUpdateRequest( centralContext, new Java11HttpClient() ); + updateRequest.setLocalIndexCacheDir( centralLocalCache ); + updateRequest.setThreads( 4 ); IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( updateRequest ); if ( updateResult.isFullUpdate() ) { - System.out.println( "Full update happened!" ); + System.out.println( "Full update happened." ); } else if ( updateResult.getTimestamp().equals( centralContextCurrentTimestamp ) ) { - System.out.println( "No update needed, index is up to date!" ); + System.out.println( "No update needed, index is up to date." ); } else { @@ -178,6 +182,7 @@ else if ( updateResult.getTimestamp().equals( centralContextCurrentTimestamp ) ) "Incremental update happened, change covered " + centralContextCurrentTimestamp + " - " + updateResult.getTimestamp() + " period." ); } + System.out.println( "Done in " + Duration.ofMillis( System.currentTimeMillis() - start ) ); System.out.println(); this.backend = new IndexerCoreSearchBackendImpl( indexer, centralContext );