[MINDEXER-151] Speed up Index update from remote #255

Merged: 2 commits, Nov 4, 2022
@@ -528,10 +528,11 @@ private int unpack( CommandLine cli, Components components )

         final List<IndexCreator> indexers = getIndexers( cli, components );

+
         try ( BufferedInputStream is = new BufferedInputStream( new FileInputStream( indexArchive ) ); //
               FSDirectory directory = FSDirectory.open( outputFolder.toPath() ) )
         {
-            DefaultIndexUpdater.unpackIndexData( is, directory, (IndexingContext) Proxy.newProxyInstance(
+            DefaultIndexUpdater.unpackIndexData( is, 4, directory, (IndexingContext) Proxy.newProxyInstance(
                 getClass().getClassLoader(), new Class[] {IndexingContext.class}, new PartialImplementation()
             {
                 public List<IndexCreator> getIndexCreators()
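The CLI pins the unpack at 4 threads here. A minimal sketch of how a caller could size it to the host instead, using the new signature (indexArchive, outputFolder and context stand in for the CLI's local state; exception handling elided):

    // Sketch: derive the reader thread count from the host instead of hard-coding 4
    int threads = Math.max( 1, Runtime.getRuntime().availableProcessors() );
    try ( BufferedInputStream is = new BufferedInputStream( new FileInputStream( indexArchive ) );
          FSDirectory directory = FSDirectory.open( outputFolder.toPath() ) )
    {
        DefaultIndexUpdater.unpackIndexData( is, threads, directory, context );
    }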
@@ -209,7 +209,8 @@ private Date loadIndexDirectory( final IndexUpdateRequest updateRequest, final R
         Set<String> allGroups;
         if ( remoteIndexFile.endsWith( ".gz" ) )
         {
-            IndexDataReadResult result = unpackIndexData( is, directory, updateRequest.getIndexingContext() );
+            IndexDataReadResult result = unpackIndexData( is, updateRequest.getThreads(), directory,
+                    updateRequest.getIndexingContext() );
             timestamp = result.getTimestamp();
             rootGroups = result.getRootGroups();
             allGroups = result.getAllGroups();
@@ -380,17 +381,20 @@ public Date getTimestamp( final Properties properties, final String key )

     /**
      * @param is an input stream to unpack index data from
+     * @param threads thread count to use
      * @param d
      * @param context
      */
-    public static IndexDataReadResult unpackIndexData( final InputStream is, final Directory d,
+    public static IndexDataReadResult unpackIndexData( final InputStream is, final int threads, final Directory d,
                                                        final IndexingContext context )
         throws IOException
     {
-        NexusIndexWriter w = new NexusIndexWriter( d, new IndexWriterConfig( new NexusAnalyzer() ) );
+        IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
+        config.setUseCompoundFile( false );
+        NexusIndexWriter w = new NexusIndexWriter( d, config );
         try
         {
-            IndexDataReader dr = new IndexDataReader( is );
+            IndexDataReader dr = new IndexDataReader( is, threads );

             return dr.readIndex( w, context );
         }
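Note that the unpack writer now also disables Lucene's compound-file format, trading more open files for less copying during the bulk write. On the update path the thread count comes from the request; a minimal sketch of a caller, assuming IndexUpdateRequest gained a setThreads counterpart to the getThreads call used above (indexingContext, resourceFetcher and indexUpdater exist in the caller):

    // Sketch: request a multi-threaded unpack when updating from a remote repository
    IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher );
    request.setThreads( Runtime.getRuntime().availableProcessors() );
    IndexUpdateResult updateResult = indexUpdater.fetchAndUpdateIndex( request );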
@@ -26,19 +26,34 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UTFDataFormatException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.LinkedHashSet;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;

 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.maven.index.ArtifactInfo;
 import org.apache.maven.index.context.IndexUtils;
 import org.apache.maven.index.context.IndexingContext;
+import org.apache.maven.index.context.NexusAnalyzer;
+import org.apache.maven.index.context.NexusIndexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * An index data reader used to parse transfer index format.
@@ -47,11 +62,26 @@
  */
 public class IndexDataReader
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );
+
     private final DataInputStream dis;

+    private final int threads;
+
     public IndexDataReader( final InputStream is )
-        throws IOException
+            throws IOException
     {
+        this( is, 1 );
+    }
+
+    public IndexDataReader( final InputStream is, final int threads )
+            throws IOException
+    {
+        if ( threads < 1 )
+        {
+            throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads );
+        }
+        this.threads = threads;
         // MINDEXER-13
         // LightweightHttpWagon may have performed automatic decompression
         // Handle it transparently
@@ -72,8 +102,23 @@ public IndexDataReader( final InputStream is )
     }

     public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
-        throws IOException
+            throws IOException
     {
+        if ( threads == 1 )
+        {
+            return readIndexST( w, context );
+        }
+        else
+        {
+            return readIndexMT( w, context );
+        }
+    }
+
+    private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context )
+            throws IOException
+    {
+        LOGGER.debug( "Reading ST index..." );
+        Instant start = Instant.now();
         long timestamp = readHeader();

         Date date = null;
@@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
         int n = 0;

         Document doc;
-        Set<String> rootGroups = new LinkedHashSet<>();
-        Set<String> allGroups = new LinkedHashSet<>();
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();

         while ( ( doc = readDocument() ) != null )
         {
-            ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
-            if ( ai != null )
-            {
-                w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
-
-                rootGroups.add( ai.getRootGroup() );
-                allGroups.add( ai.getGroupId() );
-            }
-            else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
-                || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
-            {
-                // skip it
-            }
-            else
-            {
-                w.addDocument( doc );
-            }
+            addToIndex( doc, context, w, rootGroups, allGroups );
             n++;
         }

         w.commit();

         IndexDataReadResult result = new IndexDataReadResult();
         result.setDocumentCount( n );
         result.setTimestamp( date );
-        result.setRootGroups( rootGroups );
-        result.setAllGroups( allGroups );
+        result.setRootGroups( rootGroups.keySet() );
+        result.setAllGroups( allGroups.keySet() );

+        LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
         return result;
     }

+    private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
+            throws IOException
+    {
+        LOGGER.debug( "Reading MT index..." );
+        Instant start = Instant.now();
+        long timestamp = readHeader();
+
+        int n = 0;
+
+        final Document theEnd = new Document();
+
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
+        ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
+
+        ExecutorService executorService = Executors.newFixedThreadPool( threads );
+        ArrayList<Exception> errors = new ArrayList<>();
+        ArrayList<IndexWriter> silos = new ArrayList<>( threads );
+        for ( int i = 0; i < threads; i++ )
+        {
+            final int silo = i;
+            silos.add( tempWriter( "silo" + i ) );
+            executorService.execute( () ->
+            {
+                LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
+                try
+                {
+                    while ( true )
+                    {
+                        try
+                        {
+                            Document doc = queue.take();
+                            if ( doc == theEnd )
+                            {
+                                break;
+                            }
+                            addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
+                        }
+                        catch ( InterruptedException | IOException e )
+                        {
+                            errors.add( e );
+                            break;
+                        }
+                    }
+                }
+                finally
+                {
+                    LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
+                }
+            } );
+        }
+
+        try
+        {
+            Document doc;
+            while ( ( doc = readDocument() ) != null )
+            {
+                queue.put( doc );
+                n++;
+            }
+            LOGGER.debug( "Signalling END" );
+            for ( int i = 0; i < threads; i++ )
+            {
+                queue.put( theEnd );
+            }
+
+            LOGGER.debug( "Shutting down threads" );
+            executorService.shutdown();
+            executorService.awaitTermination( 5L, TimeUnit.MINUTES );
+        }
+        catch ( InterruptedException e )
+        {
+            throw new IOException( "Interrupted", e );
+        }
+
+        if ( !errors.isEmpty() )
+        {
+            IOException exception = new IOException( "Error during load of index" );
+            errors.forEach( exception::addSuppressed );
+            throw exception;
+        }
+
+        LOGGER.debug( "Silos loaded..." );
+        Date date = null;
+        if ( timestamp != -1 )
+        {
+            date = new Date( timestamp );
+            IndexUtils.updateTimestamp( w.getDirectory(), date );
+        }
+
+        LOGGER.debug( "Merging silos..." );
+        for ( IndexWriter silo : silos )
+        {
+            IndexUtils.close( silo );
+            w.addIndexes( silo.getDirectory() );
+        }
+
+        LOGGER.debug( "Merged silos..." );
+        w.commit();
Comment on lines +254 to 255

@mbien (Member) commented on Nov 22, 2022:

I have been experimenting with this in the NetBeans maven support modules and MT extraction (#4999) and it works great!

Could maven-indexer cleanup the temp silo folders after the merge? It is currently leaving almost 6 GB behind after a full update.

The PR author (Member) replied:

Yes, good idea, I forgot about it, sorry. Created https://issues.apache.org/jira/browse/MINDEXER-176

+
+        IndexDataReadResult result = new IndexDataReadResult();
+        result.setDocumentCount( n );
+        result.setTimestamp( date );
+        result.setRootGroups( rootGroups.keySet() );
+        result.setAllGroups( allGroups.keySet() );
+
+        LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
+        return result;
+    }
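Regarding the temp silo folders raised in the review comment above: a cleanup along the lines of MINDEXER-176 could delete each silo's directory right after its segments are merged. A minimal sketch, assuming the silo directories are the FSDirectory instances opened on Files.createTempDirectory paths by tempDirectory() below (imports for Path, Comparator and Stream elided):

    // Hypothetical follow-up, not part of this PR: merge, then remove the temp silo dir
    for ( IndexWriter silo : silos )
    {
        IndexUtils.close( silo );
        w.addIndexes( silo.getDirectory() );
        if ( silo.getDirectory() instanceof FSDirectory )
        {
            Path siloPath = ( (FSDirectory) silo.getDirectory() ).getDirectory();
            silo.getDirectory().close();
            try ( Stream<Path> paths = Files.walk( siloPath ) )
            {
                // delete children before parents so directories are empty when removed
                paths.sorted( Comparator.reverseOrder() ).forEach( p -> p.toFile().delete() );
            }
        }
    }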

+    private FSDirectory tempDirectory( final String name ) throws IOException
+    {
+        return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) );
+    }
+
+    private IndexWriter tempWriter( final String name ) throws IOException
+    {
+        IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
+        config.setUseCompoundFile( false );
+        return new NexusIndexWriter( tempDirectory( name ), config );
+    }
+
+    private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
+                             final ConcurrentMap<String, Boolean> rootGroups,
+                             final ConcurrentMap<String, Boolean> allGroups )
+            throws IOException
+    {
+        ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
+        if ( ai != null )
+        {
+            indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+
+            rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
+            allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
+        }
+        else
+        {
+            // these two fields are automatically handled in code above
+            if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null
+                && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
+            {
+                indexWriter.addDocument( doc );
+            }
+        }
+    }

     public long readHeader()
-        throws IOException
+            throws IOException
     {
         final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );

@@ -139,7 +315,7 @@ public long readHeader()
     }

     public Document readDocument()
-        throws IOException
+            throws IOException
     {
         int fieldCount;
         try
@@ -160,7 +336,7 @@ public Document readDocument()

         // Fix up UINFO field wrt MINDEXER-41
         final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
         final String info = doc.get( ArtifactInfo.INFO );
         if ( uinfoField != null && info != null && !info.isEmpty() )
         {
             final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
@@ -179,7 +355,7 @@
         }

     private Field readField()
-        throws IOException
+            throws IOException
     {
         int flags = dis.read();

@@ -199,7 +375,7 @@ private Field readField()
     }

     private static String readUTF( DataInput in )
-        throws IOException
+            throws IOException
     {
         int utflen = in.readInt();

@@ -214,7 +390,7 @@ private static String readUTF( DataInput in )
         catch ( OutOfMemoryError e )
         {
             throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
                 + " See MINDEXER-28 for more information!", e );
         }

         int c, char2, char3;
@@ -282,7 +458,7 @@ private static String readUTF( DataInput in )
                         throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
                     }
                     chararr[chararrCount++] =
                         (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
                     break;

                 default:
@@ -360,7 +536,7 @@ public Set<String> getAllGroups()
      * @throws IOException in case of an IO exception during index file access
      */
     public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
-        throws IOException
+            throws IOException
     {
         dis.readByte(); // data format version

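Taken together, the changed reader can be exercised directly. A minimal sketch (the .gz file name, the surrounding IndexingContext variable, and exception handling are assumed; imports elided):

    // Sketch: unpack a downloaded index with 4 reader threads into a fresh directory
    try ( InputStream is = Files.newInputStream( Paths.get( "nexus-maven-repository-index.gz" ) );
          FSDirectory dir = FSDirectory.open( Files.createTempDirectory( "unpacked-index" ) ) )
    {
        IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
        config.setUseCompoundFile( false );
        try ( NexusIndexWriter w = new NexusIndexWriter( dir, config ) )
        {
            IndexDataReadResult result = new IndexDataReader( is, 4 ).readIndex( w, context );
            System.out.println( result.getDocumentCount() + " documents read" );
        }
    }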