Skip to content

Commit

Permalink
OAK-11006 - indexing-job: better logging of text extraction and index…
Browse files Browse the repository at this point in the history
…ing statistics (apache#1629)
  • Loading branch information
nfsantos authored Aug 12, 2024
1 parent e4db4c6 commit 0d23900
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@
import org.apache.jackrabbit.guava.common.cache.RemovalCause;
import org.apache.jackrabbit.guava.common.cache.Weigher;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.io.FileTreeTraverser;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,18 +84,14 @@ public class FileCache extends AbstractCache<String, File> implements Closeable
/**
* Convert the size calculation to KB to support max file size of 2 TB
*/
private static final Weigher<String, File> weigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
// convert to number of 4 KB blocks
return Math.round(value.length() / (4 * 1024));
}};
private static final Weigher<String, File> weigher = (key, value) -> {
// convert to number of 4 KB blocks
return Math.round(value.length() / (4 * 1024));
};

//Rough estimate of the in-memory key, value pair
private static final Weigher<String, File> memWeigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
return (StringUtils.estimateMemoryUsage(key) +
StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
}};
private static final Weigher<String, File> memWeigher = (key, value) -> (StringUtils.estimateMemoryUsage(key) +
StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);

private FileCache(long maxSize /* bytes */, File root,
final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) {
Expand All @@ -108,15 +102,17 @@ private FileCache(long maxSize /* bytes */, File root,
// convert to number of 4 KB blocks
long size = Math.round(maxSize / (1024L * 4));

cacheLoader = new CacheLoader<String, File>() {
@Override public File load(String key) throws Exception {
cacheLoader = new CacheLoader<>() {
@Override
public File load(String key) throws Exception {
// Fetch from local cache directory and if not found load from backend
File cachedFile = DataStoreCacheUtils.getFile(key, cacheRoot);
if (cachedFile.exists()) {
return cachedFile;
} else {
InputStream is = null;
boolean threw = true;
long startNanos = System.nanoTime();
try {
is = loader.load(key);
copyInputStreamToFile(is, cachedFile);
Expand All @@ -127,6 +123,9 @@ private FileCache(long maxSize /* bytes */, File root,
} finally {
Closeables.close(is, threw);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded file: {} in {}", key, (System.nanoTime() - startNanos) / 1_000_000);
}
return cachedFile;
}
}
Expand All @@ -137,21 +136,17 @@ private FileCache(long maxSize /* bytes */, File root,
.recordStats()
.weigher(weigher)
.segmentCount(SEGMENT_COUNT)
.evictionCallback(new EvictionCallback<String, File>() {
@Override
public void evicted(@NotNull String key, @Nullable File cachedFile,
@NotNull RemovalCause cause) {
try {
if (cachedFile != null && cachedFile.exists()
&& cause != RemovalCause.REPLACED) {
DataStoreCacheUtils.recursiveDelete(cachedFile, cacheRoot);
LOG.info("File [{}] evicted with reason [{}]", cachedFile, cause
.toString());
}
} catch (IOException e) {
LOG.info("Cached file deletion failed after eviction", e);
.evictionCallback((key, cachedFile, cause) -> {
try {
if (cachedFile != null && cachedFile.exists()
&& cause != RemovalCause.REPLACED) {
DataStoreCacheUtils.recursiveDelete(cachedFile, cacheRoot);
LOG.info("File [{}] evicted with reason [{}]", cachedFile, cause);
}
}})
} catch (IOException e) {
LOG.info("Cached file deletion failed after eviction", e);
}
})
.build();

this.cacheStats =
Expand All @@ -177,7 +172,7 @@ public static FileCache build(long maxSize /* bytes */, File root,
}
return new FileCache() {

private final Cache<?,?> cache = new CacheLIRS<>(0);
private final Cache<?, ?> cache = new CacheLIRS<>(0);

@Override public void put(String key, File file) {
}
Expand All @@ -190,7 +185,7 @@ public static FileCache build(long maxSize /* bytes */, File root,
return null;
}

@Override public File get(String key) throws IOException {
@Override public File get(String key) {
return null;
}

Expand Down Expand Up @@ -291,7 +286,7 @@ public void close() {
/**
* Called to initialize the in-memory cache from the fs folder
*/
private class CacheBuildJob implements Callable {
private class CacheBuildJob implements Callable<Integer> {
@Override
public Integer call() {
Stopwatch watch = Stopwatch.createStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
import java.util.concurrent.TimeUnit;

public class FormattingUtils {
public static String formatNanosToSeconds(long nanos) {
return formatToSeconds(nanos/1_000_000_000);
}

public static String formatMillisToSeconds(long millis) {
return formatToSeconds(millis/1000);
}

public static String formatToSeconds(Stopwatch stopwatch) {
return formatToSeconds(stopwatch.elapsed(TimeUnit.SECONDS));
}
Expand All @@ -44,4 +52,12 @@ public static String formatToMillis(Stopwatch stopwatch) {
String sign = millis < 0 ? "-" : "";
return String.format("%s%02d:%02d:%02d.%03d", sign, hoursPart, minutesPart, secondsPart, millisPart);
}

public static double safeComputePercentage(long numerator, long denominator) {
return denominator == 0 ? -1 : (double) numerator / denominator * 100;
}

public static double safeComputeAverage(long totalTime, long numberOfEvents) {
return numberOfEvents == 0 ? -1 : ((double)totalTime / numberOfEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,22 @@ private void testFormatToMillis(String expected, long nanos) {
ticker.set(nanos);
assertEquals(expected, FormattingUtils.formatToMillis(sw));
}

@Test
public void testSafeComputePercentage() {
assertEquals(50.0, FormattingUtils.safeComputePercentage(50, 100), 0.001);
assertEquals(0.0, FormattingUtils.safeComputePercentage(0, 100), 0.001);
assertEquals(-1.0, FormattingUtils.safeComputePercentage(50, 0), 0.001);
assertEquals(100.0, FormattingUtils.safeComputePercentage(100, 100), 0.001);
assertEquals(33.333, FormattingUtils.safeComputePercentage(1, 3), 0.001);
}

@Test
public void testSafeComputeAverage() {
assertEquals(50.0, FormattingUtils.safeComputeAverage(100, 2), 0.001);
assertEquals(0.0, FormattingUtils.safeComputeAverage(0, 100), 0.001);
assertEquals(-1.0, FormattingUtils.safeComputeAverage(100, 0), 0.001);
assertEquals(100.0, FormattingUtils.safeComputeAverage(100, 1), 0.001);
assertEquals(33.333, FormattingUtils.safeComputeAverage(100, 3), 0.001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public boolean isEmpty() {
return indexers.isEmpty();
}

@Override
public void onIndexingStarting() {
for (NodeStateIndexer indexer : indexers) {
indexer.onIndexingStarting();
}
}

@Override
public boolean shouldInclude(String path) {
return indexers.stream().anyMatch(indexer -> indexer.shouldInclude(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public void reindex() throws CommitFailedException, IOException {
} else if (flatFileStores.size() == 1) {
FlatFileStore flatFileStore = flatFileStores.get(0);
TopKSlowestPaths slowestTopKElements = new TopKSlowestPaths(TOP_SLOWEST_PATHS_TO_LOG);
indexer.onIndexingStarting();
long entryStart = System.nanoTime();
for (NodeStateEntry entry : flatFileStore) {
reportDocumentRead(entry.getPath(), progressReporter);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.indexer.document;

import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.slf4j.Logger;

public final class IndexerStatisticsTracker {
private static final int SLOW_DOCUMENT_LOG_THRESHOLD = Integer.getInteger("oak.indexer.slowDocumentLogThreshold", 1000);

private final Logger logger;

// Timestamp of when indexing started.
private long startIndexingNanos = 0;
// Time spent indexing entries. Should be almost the same as totalMakeDocumentTimeNanos+totalWriteTimeNanos
private long totalIndexingTimeNanos = 0;
// Time generating the Lucene document.
private long totalMakeDocumentTimeNanos = 0;
// Time writing the Lucene document to disk.
private long totalWriteTimeNanos = 0;
private long nodesIndexed = 0;

// Timestamp of when the current entry started being indexed
private long startEntryNanos = 0;
// Timestamp of when the current entry finished the makeDocument phase.
private long endEntryMakeDocumentNanos = 0;

public IndexerStatisticsTracker(Logger logger) {
this.logger = logger;
}

public void onIndexingStarting() {
this.startIndexingNanos = System.nanoTime();
}

public void onEntryStart() {
startEntryNanos = System.nanoTime();
}

public void onEntryEndMakeDocument() {
endEntryMakeDocumentNanos = System.nanoTime();
}

public void onEntryEnd(String entryPath) {
long endEntryWriteNanos = System.nanoTime();
nodesIndexed++;
long entryIndexingTimeNanos = endEntryWriteNanos - startEntryNanos;
long entryMakeDocumentTimeNanos = endEntryMakeDocumentNanos - startEntryNanos;
long entryWriteTimeNanos = endEntryWriteNanos - endEntryMakeDocumentNanos;
totalIndexingTimeNanos += entryIndexingTimeNanos;
totalMakeDocumentTimeNanos += entryMakeDocumentTimeNanos;
totalWriteTimeNanos += entryWriteTimeNanos;
if (entryIndexingTimeNanos >= (long) SLOW_DOCUMENT_LOG_THRESHOLD * 1_000_000) {
logger.info("Slow document: {}. Times: total={}ms, makeDocument={}ms, writeToIndex={}ms",
entryPath, entryIndexingTimeNanos / 1_000_000, entryMakeDocumentTimeNanos / 1_000_000, entryWriteTimeNanos / 1_000_000);
}
startEntryNanos = 0;
endEntryMakeDocumentNanos = 0;
}

public String formatStats() {
long endTimeNanos = System.nanoTime();
long totalTimeNanos = endTimeNanos - startIndexingNanos;
long avgTimePerDocumentMicros = Math.round(FormattingUtils.safeComputeAverage(totalIndexingTimeNanos / 1000, nodesIndexed));
double percentageIndexing = FormattingUtils.safeComputePercentage(totalIndexingTimeNanos, totalTimeNanos);
double percentageMakingDocument = FormattingUtils.safeComputePercentage(totalMakeDocumentTimeNanos, totalIndexingTimeNanos);
double percentageWritingToIndex = FormattingUtils.safeComputePercentage(totalWriteTimeNanos, totalIndexingTimeNanos);
return String.format("Indexed %d nodes in %s. Avg per node: %d microseconds. indexingTime: %s (%2.1f%% of total time). Breakup of indexing time: makeDocument: %s (%2.1f%%), writeIndex: %s (%2.1f%%)",
nodesIndexed, FormattingUtils.formatNanosToSeconds(totalTimeNanos), avgTimePerDocumentMicros,
FormattingUtils.formatNanosToSeconds(totalIndexingTimeNanos), percentageIndexing,
FormattingUtils.formatNanosToSeconds(totalMakeDocumentTimeNanos), percentageMakingDocument,
FormattingUtils.formatNanosToSeconds(totalWriteTimeNanos), percentageWritingToIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

public interface NodeStateIndexer extends Closeable{

default void onIndexingStarting() {}

boolean shouldInclude(String path);

boolean shouldInclude(NodeDocument doc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Set;
Expand All @@ -43,6 +45,7 @@
NodeStateIndexer for Elastic. Indexes entries from a given nodestate.
*/
public class ElasticIndexer implements NodeStateIndexer {
private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexer.class);

private final IndexDefinition definition;
private final FulltextBinaryTextExtractor binaryTextExtractor;
Expand All @@ -51,6 +54,7 @@ public class ElasticIndexer implements NodeStateIndexer {
private final FulltextIndexWriter<ElasticDocument> indexWriter;
private final ElasticIndexEditorProvider elasticIndexEditorProvider;
private final IndexHelper indexHelper;
private final IndexerStatisticsTracker indexerStatisticsTracker = new IndexerStatisticsTracker(LOG);

public ElasticIndexer(IndexDefinition definition, FulltextBinaryTextExtractor binaryTextExtractor,
NodeBuilder definitionBuilder, IndexingProgressReporter progressReporter,
Expand All @@ -64,6 +68,12 @@ public ElasticIndexer(IndexDefinition definition, FulltextBinaryTextExtractor bi
this.indexHelper = indexHelper;
}

@Override
public void onIndexingStarting() {
indexerStatisticsTracker.onIndexingStarting();
binaryTextExtractor.resetStartTime();
}

@Override
public boolean shouldInclude(String path) {
return getFilterResult(path) != PathFilter.Result.EXCLUDE;
Expand All @@ -77,7 +87,7 @@ public boolean shouldInclude(NodeDocument doc) {

public void provisionIndex() {
FulltextIndexEditor editor = (FulltextIndexEditor) elasticIndexEditorProvider.getIndexEditor(
TYPE_ELASTICSEARCH, definitionBuilder, indexHelper.getNodeStore().getRoot(), new ReportingCallback(definition.getIndexPath(),false));
TYPE_ELASTICSEARCH, definitionBuilder, indexHelper.getNodeStore().getRoot(), new ReportingCallback(definition.getIndexPath(), false));
editor.getContext().enableReindexMode();
}

Expand All @@ -92,12 +102,14 @@ public boolean index(NodeStateEntry entry) throws IOException, CommitFailedExcep
if (indexingRule == null) {
return false;
}
indexerStatisticsTracker.onEntryStart();
ElasticDocumentMaker maker = newDocumentMaker(indexingRule, entry.getPath());

ElasticDocument doc = maker.makeDocument(entry.getNodeState());

indexerStatisticsTracker.onEntryEndMakeDocument();
if (doc != null) {
writeToIndex(doc, entry.getPath());
indexerStatisticsTracker.onEntryEnd(entry.getPath());
progressReporter.indexUpdate(definition.getIndexPath());
return true;
}
Expand All @@ -116,6 +128,8 @@ public Set<String> getRelativeIndexedNodeNames() {

@Override
public void close() throws IOException {
LOG.info("Statistics: {}", indexerStatisticsTracker.formatStats());
binaryTextExtractor.logStats();
indexWriter.close(System.currentTimeMillis());
}

Expand Down
Loading

0 comments on commit 0d23900

Please sign in to comment.