[MINDEXER-151] Speed up Index update from remote (#255)
Introduce a multi-threaded (MT) index update: allow users to specify the number of threads/silos to use while importing an index. Without a code change, threads = 1 and the codepath is unchanged (everything works as before).

This change (ingesting the GZIP file's raw records into the Lucene index on multiple threads) halves the execution time on my PC: while BasicUsageExample on master takes over 15 minutes to finish a full update, with this PR it finishes in under 7 minutes when using 4 threads.

---

https://issues.apache.org/jira/browse/MINDEXER-151
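
Below is a minimal caller-side sketch of opting into the multi-threaded ingestion. It assumes IndexUpdateRequest exposes a setThreads( int ) setter matching the getThreads() getter that DefaultIndexUpdater reads in this change; the updater, indexing context and fetcher are created by the caller, as in BasicUsageExample.

import java.io.IOException;

import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.updater.IndexUpdateRequest;
import org.apache.maven.index.updater.IndexUpdateResult;
import org.apache.maven.index.updater.IndexUpdater;
import org.apache.maven.index.updater.ResourceFetcher;

class MultiThreadedUpdateSketch
{
    // Run a remote index update, ingesting records on 4 threads/silos.
    // setThreads( int ) is assumed here; leaving the count at its default of 1
    // keeps the previous single-threaded codepath.
    static IndexUpdateResult update( IndexUpdater updater, IndexingContext context, ResourceFetcher fetcher )
        throws IOException
    {
        IndexUpdateRequest request = new IndexUpdateRequest( context, fetcher );
        request.setThreads( 4 );
        return updater.fetchAndUpdateIndex( request );
    }
}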
cstamas authored Nov 4, 2022
1 parent 6b23afe commit 2e2a655
Showing 7 changed files with 236 additions and 36 deletions.
@@ -528,10 +528,11 @@ private int unpack( CommandLine cli, Components components )

final List<IndexCreator> indexers = getIndexers( cli, components );


try ( BufferedInputStream is = new BufferedInputStream( new FileInputStream( indexArchive ) ); //
FSDirectory directory = FSDirectory.open( outputFolder.toPath() ) )
{
DefaultIndexUpdater.unpackIndexData( is, directory, (IndexingContext) Proxy.newProxyInstance(
DefaultIndexUpdater.unpackIndexData( is, 4, directory, (IndexingContext) Proxy.newProxyInstance(
getClass().getClassLoader(), new Class[] {IndexingContext.class}, new PartialImplementation()
{
public List<IndexCreator> getIndexCreators()
@@ -209,7 +209,8 @@ private Date loadIndexDirectory( final IndexUpdateRequest updateRequest, final R
Set<String> allGroups;
if ( remoteIndexFile.endsWith( ".gz" ) )
{
IndexDataReadResult result = unpackIndexData( is, directory, updateRequest.getIndexingContext() );
IndexDataReadResult result = unpackIndexData( is, updateRequest.getThreads(), directory,
updateRequest.getIndexingContext() );
timestamp = result.getTimestamp();
rootGroups = result.getRootGroups();
allGroups = result.getAllGroups();
@@ -380,17 +381,20 @@ public Date getTimestamp( final Properties properties, final String key )

/**
* @param is an input stream to unpack index data from
* @param threads thread count to use
* @param d
* @param context
*/
public static IndexDataReadResult unpackIndexData( final InputStream is, final Directory d,
public static IndexDataReadResult unpackIndexData( final InputStream is, final int threads, final Directory d,
final IndexingContext context )
throws IOException
{
NexusIndexWriter w = new NexusIndexWriter( d, new IndexWriterConfig( new NexusAnalyzer() ) );
IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
config.setUseCompoundFile( false );
NexusIndexWriter w = new NexusIndexWriter( d, config );
try
{
IndexDataReader dr = new IndexDataReader( is );
IndexDataReader dr = new IndexDataReader( is, threads );

return dr.readIndex( w, context );
}
@@ -26,19 +26,34 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An index data reader used to parse transfer index format.
@@ -47,11 +62,26 @@
*/
public class IndexDataReader
{
private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );

private final DataInputStream dis;

private final int threads;

public IndexDataReader( final InputStream is )
throws IOException
throws IOException
{
this( is, 1 );
}

public IndexDataReader( final InputStream is, final int threads )
throws IOException
{
if ( threads < 1 )
{
throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads );
}
this.threads = threads;
// MINDEXER-13
// LightweightHttpWagon may have performed automatic decompression
// Handle it transparently
@@ -72,8 +102,23 @@ public IndexDataReader( final InputStream is )
}

public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
throws IOException
throws IOException
{
if ( threads == 1 )
{
return readIndexST( w, context );
}
else
{
return readIndexMT( w, context );
}
}

private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context )
throws IOException
{
LOGGER.debug( "Reading ST index..." );
Instant start = Instant.now();
long timestamp = readHeader();

Date date = null;
@@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
int n = 0;

Document doc;
Set<String> rootGroups = new LinkedHashSet<>();
Set<String> allGroups = new LinkedHashSet<>();
ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();

while ( ( doc = readDocument() ) != null )
{
ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
if ( ai != null )
addToIndex( doc, context, w, rootGroups, allGroups );
n++;
}

w.commit();

IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount( n );
result.setTimestamp( date );
result.setRootGroups( rootGroups.keySet() );
result.setAllGroups( allGroups.keySet() );

LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
return result;
}

private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
throws IOException
{
LOGGER.debug( "Reading MT index..." );
Instant start = Instant.now();
long timestamp = readHeader();

int n = 0;

final Document theEnd = new Document();

ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );

ExecutorService executorService = Executors.newFixedThreadPool( threads );
ArrayList<Exception> errors = new ArrayList<>();
ArrayList<IndexWriter> silos = new ArrayList<>( threads );
for ( int i = 0; i < threads; i++ )
{
final int silo = i;
silos.add( tempWriter( "silo" + i ) );
executorService.execute( () ->
{
w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
try
{
while ( true )
{
try
{
Document doc = queue.take();
if ( doc == theEnd )
{
break;
}
addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
}
catch ( InterruptedException | IOException e )
{
errors.add( e );
break;
}
}
}
finally
{
LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
}
} );
}

rootGroups.add( ai.getRootGroup() );
allGroups.add( ai.getGroupId() );
}
else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
|| doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
try
{
Document doc;
while ( ( doc = readDocument() ) != null )
{
// skip it
queue.put( doc );
n++;
}
else
LOGGER.debug( "Signalling END" );
for ( int i = 0; i < threads; i++ )
{
w.addDocument( doc );
queue.put( theEnd );
}
n++;

LOGGER.debug( "Shutting down threads" );
executorService.shutdown();
executorService.awaitTermination( 5L, TimeUnit.MINUTES );
}
catch ( InterruptedException e )
{
throw new IOException( "Interrupted", e );
}

if ( !errors.isEmpty() )
{
IOException exception = new IOException( "Error during load of index" );
errors.forEach( exception::addSuppressed );
throw exception;
}

LOGGER.debug( "Silos loaded..." );
Date date = null;
if ( timestamp != -1 )
{
date = new Date( timestamp );
IndexUtils.updateTimestamp( w.getDirectory(), date );
}

LOGGER.debug( "Merging silos..." );
for ( IndexWriter silo : silos )
{
IndexUtils.close( silo );
w.addIndexes( silo.getDirectory() );
}

LOGGER.debug( "Merged silos..." );
w.commit();

IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount( n );
result.setTimestamp( date );
result.setRootGroups( rootGroups );
result.setAllGroups( allGroups );
result.setRootGroups( rootGroups.keySet() );
result.setAllGroups( allGroups.keySet() );

LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
return result;
}

private FSDirectory tempDirectory( final String name ) throws IOException
{
return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) );
}

private IndexWriter tempWriter( final String name ) throws IOException
{
IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
config.setUseCompoundFile( false );
return new NexusIndexWriter( tempDirectory( name ), config );
}

private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
final ConcurrentMap<String, Boolean> rootGroups,
final ConcurrentMap<String, Boolean> allGroups )
throws IOException
{
ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
if ( ai != null )
{
indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );

rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
}
else
{
// these two fields are automatically handled in code above
if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null
&& doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
{
indexWriter.addDocument( doc );
}
}
}

public long readHeader()
throws IOException
throws IOException
{
final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );

@@ -139,7 +315,7 @@ public long readHeader()
}

public Document readDocument()
throws IOException
throws IOException
{
int fieldCount;
try
@@ -160,7 +336,7 @@ public Document readDocument()

// Fix up UINFO field wrt MINDEXER-41
final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
final String info = doc.get( ArtifactInfo.INFO );
final String info = doc.get( ArtifactInfo.INFO );
if ( uinfoField != null && info != null && !info.isEmpty() )
{
final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
@@ -179,7 +355,7 @@ }
}

private Field readField()
throws IOException
throws IOException
{
int flags = dis.read();

@@ -199,7 +375,7 @@ private Field readField()
}

private static String readUTF( DataInput in )
throws IOException
throws IOException
{
int utflen = in.readInt();

@@ -214,7 +390,7 @@ private static String readUTF( DataInput in )
catch ( OutOfMemoryError e )
{
throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
+ " See MINDEXER-28 for more information!", e );
+ " See MINDEXER-28 for more information!", e );
}

int c, char2, char3;
@@ -282,7 +458,7 @@ private static String readUTF( DataInput in )
throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
}
chararr[chararrCount++] =
(char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
(char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
break;

default:
@@ -360,7 +536,7 @@ public Set<String> getAllGroups()
* @throws IOException in case of an IO exception during index file access
*/
public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
throws IOException
throws IOException
{
dis.readByte(); // data format version

