Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to build star tree in off heap #14817

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void testValidCompositeIndex() {
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());
assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(
StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP,
StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP,
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
starTreeFieldType.getStarTreeConfig().getBuildMode()
);
assertEquals(Collections.emptySet(), starTreeFieldType.getStarTreeConfig().getSkipStarNodeCreationInDims());
Expand Down Expand Up @@ -359,7 +359,7 @@ public void testUpdateIndexWhenMappingIsSame() {
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());
assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(
StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP,
StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP,
starTreeFieldType.getStarTreeConfig().getBuildMode()
);
assertEquals(Collections.emptySet(), starTreeFieldType.getStarTreeConfig().getSkipStarNodeCreationInDims());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
Expand Down Expand Up @@ -50,9 +48,9 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;
private final Set<String> segmentFieldSet;
private final boolean segmentHasCompositeFields;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

Expand All @@ -70,6 +68,8 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
// check if there are any composite fields which are part of the segment
segmentHasCompositeFields = Collections.disjoint(segmentFieldSet, compositeFieldSet) == false;
}

@Override
Expand All @@ -91,7 +91,7 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedNumericField(field, valuesProducer);
// Perform this only during flush flow
if (mergeState.get() == null) {
if (mergeState.get() == null && segmentHasCompositeFields) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
createCompositeIndicesIfPossible(valuesProducer, field);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public Long toLongValue(Long value) {
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}

@Override
public Long getIdentityMetricValue() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public Double toStarTreeNumericTypeValue(Long value) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
}

@Override
public Double getIdentityMetricValue() {
return 0D;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ public interface ValueAggregator<A> {
* Converts an aggregated value from a Long type.
*/
A toStarTreeNumericTypeValue(Long rawValue);

/**
* Fetches a value that does not alter the result of aggregations
*/
A getIdentityMetricValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.builder;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericTypeConverters;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeDocumentBitSetUtil;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
* Abstract class for managing star tree file operations.
*
* @opensearch.experimental
*/
@ExperimentalApi
public abstract class AbstractDocumentsFileManager implements Closeable {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger logger = LogManager.getLogger(AbstractDocumentsFileManager.class);
protected final StarTreeField starTreeField;
protected final List<MetricAggregatorInfo> metricAggregatorInfos;
protected final int numMetrics;
protected final TrackingDirectoryWrapper tmpDirectory;
protected final SegmentWriteState state;
protected int docSizeInBytes = -1;

public AbstractDocumentsFileManager(
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
SegmentWriteState state,
StarTreeField starTreeField,
List<MetricAggregatorInfo> metricAggregatorInfos
) {
this.starTreeField = starTreeField;
this.tmpDirectory = new TrackingDirectoryWrapper(state.directory);
this.metricAggregatorInfos = metricAggregatorInfos;
this.state = state;
numMetrics = metricAggregatorInfos.size();
}

private void setDocSizeInBytes(int numBytes) {
if (docSizeInBytes == -1) {
docSizeInBytes = numBytes;
}
assert docSizeInBytes == numBytes;
}

/**
* Write the star tree document to file associated with dimensions and metrics
*/
protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = writeDimensions(starTreeDocument, output);
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
setDocSizeInBytes(numBytes);
return numBytes;
}

/**
* Write dimensions to file
*/
protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
output.writeLong(starTreeDocument.dimensions[i] == null ? 0L : starTreeDocument.dimensions[i]);
numBytes += Long.BYTES;
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, output);
return numBytes;
}

/**
* Write star tree document metrics to file
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) {
case LONG:
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
break;
case DOUBLE:
if (isAggregatedDoc) {
long val = NumericUtils.doubleToSortableLong(
starTreeDocument.metrics[i] == null ? 0.0 : (Double) starTreeDocument.metrics[i]
);
output.writeLong(val);
} else {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
}
numBytes += Long.BYTES;
break;
default:
throw new IllegalStateException("Unsupported metric type");

Check warning on line 109 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L109

Added line #L109 was not covered by tests
}
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
}

/**
* Reads the star tree document from file with given offset
*
* @param input RandomAccessInput
* @param offset Offset in the file
* @param shouldReadAggregatedDocs boolean to indicate if aggregated star tree docs should be read
* @return StarTreeDocument
* @throws IOException IOException in case of I/O errors
*/
protected StarTreeDocument readStarTreeDocument(RandomAccessInput input, long offset, boolean shouldReadAggregatedDocs)
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {
int dimSize = starTreeField.getDimensionsOrder().size();
Long[] dimensions = new Long[dimSize];
long initialOffset = offset;
offset = readDimensions(dimensions, input, offset);

Object[] metrics = new Object[numMetrics];
offset = readMetrics(input, offset, numMetrics, metrics, shouldReadAggregatedDocs);
assert (offset - initialOffset) == docSizeInBytes;
return new StarTreeDocument(dimensions, metrics);
}

/**
* Read dimensions from file
*/
protected long readDimensions(Long[] dimensions, RandomAccessInput input, long offset) throws IOException {
for (int i = 0; i < dimensions.length; i++) {
try {
dimensions[i] = input.readLong(offset);
} catch (Exception e) {
logger.error("Error reading dimension value at offset {} for dimension {}", offset, i);
throw e;

Check warning on line 147 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L145-L147

Added lines #L145 - L147 were not covered by tests
}
offset += Long.BYTES;
}
offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, dimensions, index -> null);
return offset;
}

/**
* Read star tree metrics from file
*/
protected long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean shouldReadAggregatedDocs)
throws IOException {
for (int i = 0; i < numMetrics; i++) {
switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) {
case LONG:
metrics[i] = input.readLong(offset);
offset += Long.BYTES;
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
break;
case DOUBLE:
long val = input.readLong(offset);
if (shouldReadAggregatedDocs) {
metrics[i] = StarTreeNumericTypeConverters.sortableLongtoDouble(val);
} else {
metrics[i] = val;
}
offset += Long.BYTES;
break;
default:
throw new IllegalStateException("Unsupported metric type");

Check warning on line 176 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L176

Added line #L176 was not covered by tests
}
}
offset += StarTreeDocumentBitSetUtil.readBitSet(
input,
offset,
metrics,
index -> metricAggregatorInfos.get(index).getValueAggregators().getIdentityMetricValue()
);
return offset;
}

/**
* Write star tree document to file
*/
public abstract void writeStarTreeDocument(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) throws IOException;

/**
* Read star tree document from file based on doc id
*/
public abstract StarTreeDocument readStarTreeDocument(int docId, boolean isMerge) throws IOException;

/**
* Read star document dimensions from file based on doc id
*/
public abstract Long[] readDimensions(int docId) throws IOException;

/**
* Read dimension value for given doc id and dimension id
*/
public abstract Long getDimensionValue(int docId, int dimensionId) throws IOException;

/**
* Delete the temporary files created
*/
public void deleteFiles(boolean success) throws IOException {
if (success) {
for (String file : tmpDirectory.getCreatedFiles()) {
tmpDirectory.deleteFile(file);
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 215 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L214-L215

Added lines #L214 - L215 were not covered by tests
} else {
deleteFilesIgnoringException();

Check warning on line 217 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L217

Added line #L217 was not covered by tests
}

}

Check warning on line 220 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L220

Added line #L220 was not covered by tests

/**
* Delete the temporary files created
*/
private void deleteFilesIgnoringException() throws IOException {
for (String file : tmpDirectory.getCreatedFiles()) {
try {
tmpDirectory.deleteFile(file);
} catch (final IOException ignored) {} // similar to IOUtils.deleteFilesWhileIgnoringExceptions
}
}

Check warning on line 231 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java#L228-L231

Added lines #L228 - L231 were not covered by tests
}
Loading
Loading