diff --git a/docs/changelog/93255.yaml b/docs/changelog/93255.yaml new file mode 100644 index 0000000000000..1492a5b7155df --- /dev/null +++ b/docs/changelog/93255.yaml @@ -0,0 +1,5 @@ +pr: 93255 +summary: Improve frequent items runtime +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java index dbe29b6481714..a404514aca071 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java @@ -48,7 +48,7 @@ public abstract class ItemSetMapReduceAggregator< ReduceContext extends Closeable, Result extends ToXContent & Writeable> extends AggregatorBase { - private final List extractors; + private final List valueSources; private final Weight weightDocumentFilter; private final List fields; private final AbstractItemSetMapReducer mapReducer; @@ -69,7 +69,7 @@ protected ItemSetMapReduceAggregator( ) throws IOException { super(name, AggregatorFactories.EMPTY, context, parent, CardinalityUpperBound.NONE, metadata); - List extractors = new ArrayList<>(); + List valueSources = new ArrayList<>(); List fields = new ArrayList<>(); IndexSearcher contextSearcher = context.searcher(); @@ -84,11 +84,11 @@ protected ItemSetMapReduceAggregator( .build(c.v1(), id++, c.v2()); if (e.getField().getName() != null) { fields.add(e.getField()); - extractors.add(e); + valueSources.add(e); } } - this.extractors = Collections.unmodifiableList(extractors); + this.valueSources = Collections.unmodifiableList(valueSources); this.fields = Collections.unmodifiableList(fields); this.mapReducer = mapReducer; this.profiling = context.profiling(); @@ -126,14 +126,19 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext ctx, ) : null; + List valueCollectors = new ArrayList<>(valueSources.size()); + for (ItemSetMapReduceValueSource valueSource : valueSources) { + valueCollectors.add(valueSource.getValueCollector(ctx.getLeafReaderContext())); + } + return new LeafBucketCollectorBase(sub, null) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { SetOnce firstException = new SetOnce<>(); if (bits == null || bits.get(doc)) { - mapReducer.map(extractors.stream().map(extractor -> { + mapReducer.map(valueCollectors.stream().map(c -> { try { - return extractor.collect(ctx.getLeafReaderContext(), doc); + return c.collect(doc); } catch (IOException e) { firstException.trySet(e); // ignored in AbstractMapReducer diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java index 6af981f7ed130..e02ca1666b2db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java @@ -34,11 +34,21 @@ */ public abstract class ItemSetMapReduceValueSource { + /** + * Interface to hook value collection into the {@link org.elasticsearch.search.aggregations.support.ValuesSourceRegistry} + */ @FunctionalInterface public interface ValueSourceSupplier { ItemSetMapReduceValueSource build(ValuesSourceConfig config, int id, IncludeExclude includeExclude); } + /** + * Internal interface for collecting values + */ + interface ValueCollector { + Tuple> collect(int doc) throws IOException; + } + enum ValueFormatter { BYTES_REF { @Override @@ -125,7 +135,7 @@ public int hashCode() { private final Field field; - abstract Tuple> collect(LeafReaderContext ctx, int doc) throws IOException; + abstract ValueCollector getValueCollector(LeafReaderContext ctx) throws IOException; ItemSetMapReduceValueSource(ValuesSourceConfig config, int id, ValueFormatter valueFormatter) { String fieldName = config.fieldContext() != null ? config.fieldContext().field() : null; @@ -151,22 +161,31 @@ public KeywordValueSource(ValuesSourceConfig config, int id, IncludeExclude incl } @Override - public Tuple> collect(LeafReaderContext ctx, int doc) throws IOException { - SortedBinaryDocValues values = source.bytesValues(ctx); + ValueCollector getValueCollector(LeafReaderContext ctx) throws IOException { + final SortedBinaryDocValues values = source.bytesValues(ctx); + final Field field = getField(); + final Tuple> empty = new Tuple<>(field, Collections.emptyList()); + + return doc -> { + if (values.advanceExact(doc)) { + int valuesCount = values.docValueCount(); + + if (valuesCount == 0) { + return empty; + } - if (values.advanceExact(doc)) { - int valuesCount = values.docValueCount(); - List objects = new ArrayList<>(valuesCount); + List objects = new ArrayList<>(valuesCount); - for (int i = 0; i < valuesCount; ++i) { - BytesRef v = values.nextValue(); - if (stringFilter == null || stringFilter.accept(v)) { - objects.add(BytesRef.deepCopyOf(v)); + for (int i = 0; i < valuesCount; ++i) { + BytesRef v = values.nextValue(); + if (stringFilter == null || stringFilter.accept(v)) { + objects.add(BytesRef.deepCopyOf(v)); + } } + return new Tuple<>(field, objects); } - return new Tuple<>(getField(), objects); - } - return new Tuple<>(getField(), Collections.emptyList()); + return empty; + }; } } @@ -182,22 +201,31 @@ public NumericValueSource(ValuesSourceConfig config, int id, IncludeExclude incl } @Override - public Tuple> collect(LeafReaderContext ctx, int doc) throws IOException { - SortedNumericDocValues values = source.longValues(ctx); + ValueCollector getValueCollector(LeafReaderContext ctx) throws IOException { + final SortedNumericDocValues values = source.longValues(ctx); + final Field field = getField(); + final Tuple> empty = new Tuple<>(field, Collections.emptyList()); - if (values.advanceExact(doc)) { - int valuesCount = values.docValueCount(); - List objects = new ArrayList<>(valuesCount); + return doc -> { + if (values.advanceExact(doc)) { + int valuesCount = values.docValueCount(); - for (int i = 0; i < valuesCount; ++i) { - long v = values.nextValue(); - if (longFilter == null || longFilter.accept(v)) { - objects.add(v); + if (valuesCount == 0) { + return empty; } + + List objects = new ArrayList<>(valuesCount); + + for (int i = 0; i < valuesCount; ++i) { + long v = values.nextValue(); + if (longFilter == null || longFilter.accept(v)) { + objects.add(v); + } + } + return new Tuple<>(field, objects); } - return new Tuple<>(getField(), objects); - } - return new Tuple<>(getField(), Collections.emptyList()); + return empty; + }; } }