Skip to content

Commit

Permalink
Support flattened field with downsampling.
Browse files Browse the repository at this point in the history
If a flattened field is configured as neither a dimension nor a metric field, downsampling fails to execute successfully, because downsampling doesn't know how to read the flattened field or how to serialize it. This change adds that support.

Closes elastic#116319
  • Loading branch information
martijnvg committed Dec 17, 2024
1 parent d09d57d commit 42090f9
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.DynamicFieldType;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext;
Expand Down Expand Up @@ -670,7 +671,7 @@ public static final class RootFlattenedFieldType extends StringFieldType impleme
private final boolean isDimension;
private final int ignoreAbove;

public RootFlattenedFieldType(
RootFlattenedFieldType(
String name,
boolean indexed,
boolean hasDocValues,
Expand All @@ -682,7 +683,7 @@ public RootFlattenedFieldType(
this(name, indexed, hasDocValues, meta, splitQueriesOnWhitespace, eagerGlobalOrdinals, Collections.emptyList(), ignoreAbove);
}

public RootFlattenedFieldType(
RootFlattenedFieldType(
String name,
boolean indexed,
boolean hasDocValues,
Expand Down Expand Up @@ -806,6 +807,10 @@ public MappedFieldType getChildFieldType(String childPath) {
return new KeyedFlattenedFieldType(name(), childPath, this);
}

/**
 * Builds a keyword field type that addresses the keyed sub-field of this flattened field.
 *
 * @return a new {@link KeywordFieldMapper.KeywordFieldType} whose name is this field's name
 *         followed by {@code KEYED_FIELD_SUFFIX}
 */
public MappedFieldType getKeyedFieldType() {
    final String keyedFieldName = name() + KEYED_FIELD_SUFFIX;
    return new KeywordFieldMapper.KeywordFieldType(keyedFieldName);
}

@Override
public boolean isDimension() {
return isDimension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* }`
*
*/
class FlattenedFieldSyntheticWriterHelper {
public class FlattenedFieldSyntheticWriterHelper {

private record Prefix(List<String> prefix) {

Expand Down Expand Up @@ -225,17 +225,17 @@ public boolean equals(Object obj) {
}
}

interface SortedKeyedValues {
public interface SortedKeyedValues {
BytesRef next() throws IOException;
}

private final SortedKeyedValues sortedKeyedValues;

FlattenedFieldSyntheticWriterHelper(final SortedKeyedValues sortedKeyedValues) {
public FlattenedFieldSyntheticWriterHelper(final SortedKeyedValues sortedKeyedValues) {
this.sortedKeyedValues = sortedKeyedValues;
}

void write(final XContentBuilder b) throws IOException {
public void write(final XContentBuilder b) throws IOException {
KeyValue curr = new KeyValue(sortedKeyedValues.next());
KeyValue prev = KeyValue.EMPTY;
final List<String> values = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
---
"A flattened field as label field":
- do:
indices.create:
index: source_index
body:
settings:
number_of_shards: 1
index:
mode: time_series
routing_path: [ metricset, k8s.pod.uid ]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
properties:
uid:
type: keyword
time_series_dimension: true
name:
type: keyword
agent:
type: flattened
value:
type: long
time_series_metric: gauge

- do:
bulk:
refresh: true
index: source_index
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.4" }, "value": 10 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.5" }, "value": 20 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.6" }, "value": 12 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.7" }, "value": 15 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.7" }, "value": 9 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.8" }, "value": 16 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.9" }, "value": 25 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.10" }, "value": 17 }}'

- do:
indices.put_settings:
index: source_index
body:
index.blocks.write: true

- do:
indices.downsample:
index: source_index
target_index: target_index
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

# Read back the downsampled index, ordered by time series id and then by timestamp.
- do:
search:
index: target_index
body:
sort: [ "_tsid", "@timestamp" ]

# The eight source documents are expected to collapse into four 1h buckets
# (two pod uids, two distinct hours each), with two source documents per bucket.
- length: { hits.hits: 4 }
# For each bucket the flattened label k8s.agent is expected to carry the value of the
# most recent source document in that hour, e.g. 18:50:23 (2.1.8) rather than 18:50:03 (2.1.7).
- match: { hits.hits.0._source._doc_count: 2 }
- match: { hits.hits.0._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 }
- match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.0._source.k8s.agent: { "id": "second", "version": "2.1.8" } }

# Same pod, 19:00 bucket: 19:51:03 (2.1.10) is the most recent document.
- match: { hits.hits.1._source._doc_count: 2 }
- match: { hits.hits.1._source.k8s.pod.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 }
- match: { hits.hits.1._source.@timestamp: 2021-04-28T19:00:00.000Z }
- match: { hits.hits.1._source.k8s.agent: { "id": "second", "version": "2.1.10" } }

# Other pod, 18:00 bucket: 18:50:24 (2.0.5) is the most recent document.
- match: { hits.hits.2._source._doc_count: 2 }
- match: { hits.hits.2._source.k8s.pod.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.2._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.2._source.k8s.agent: { "id": "first", "version": "2.0.5" } }

# Other pod, 20:00 bucket: 20:51:04 (2.0.7) is the most recent document.
- match: { hits.hits.3._source._doc_count: 2 }
- match: { hits.hits.3._source.k8s.pod.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.3._source.@timestamp: 2021-04-28T20:00:00.000Z }
- match: { hits.hits.3._source.k8s.agent: { "id": "first", "version": "2.0.7" } }

---
"A flattened field as dimension field":
- do:
indices.create:
index: source_index
body:
settings:
number_of_shards: 1
index:
mode: time_series
routing_path: [ metricset, k8s.pod.uid ]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
type: flattened
time_series_dimensions: [name, uid]
agent:
type: object
value:
type: long
time_series_metric: gauge

- do:
bulk:
refresh: true
index: source_index
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.4" }, "value": 10 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.5" }, "value": 20 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.6" }, "value": 12 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}, "agent": { "id": "first", "version": "2.0.7" }, "value": 15 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.7" }, "value": 9 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.8" }, "value": 16 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.9" }, "value": 25 }}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}, "agent": { "id": "second", "version": "2.1.10" }, "value": 17 }}'

- do:
indices.put_settings:
index: source_index
body:
index.blocks.write: true

- do:
indices.downsample:
index: source_index
target_index: target_index
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

# Read back the downsampled index, ordered by time series id and then by timestamp.
- do:
search:
index: target_index
body:
sort: [ "_tsid", "@timestamp" ]

# The eight source documents are expected to collapse into four 1h buckets
# (two pods, two distinct hours each), with two source documents per bucket.
- length: { hits.hits: 4 }
# In this test k8s.pod is the flattened field and is mapped as a dimension; each
# downsampled document is expected to contain it as an object with both name and uid.
- match: { hits.hits.0._source._doc_count: 2 }
- match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.0._source.k8s.pod: { "name": "cat", "uid": "947e4ced-1786-4e53-9e0c-5c447e959507" } }

- match: { hits.hits.1._source._doc_count: 2 }
- match: { hits.hits.1._source.@timestamp: 2021-04-28T20:00:00.000Z }
- match: { hits.hits.1._source.k8s.pod: { "name": "cat", "uid": "947e4ced-1786-4e53-9e0c-5c447e959507" } }

- match: { hits.hits.2._source._doc_count: 2 }
- match: { hits.hits.2._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.2._source.k8s.pod: { "name": "dog", "uid": "df3145b3-0563-4d3b-a0f7-897eb2876ea9" } }

- match: { hits.hits.3._source._doc_count: 2 }
- match: { hits.hits.3._source.@timestamp: 2021-04-28T19:00:00.000Z }
- match: { hits.hits.3._source.k8s.pod: { "name": "dog", "uid": "df3145b3-0563-4d3b-a0f7-897eb2876ea9" } }
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
Expand Down Expand Up @@ -65,6 +66,8 @@ private AbstractDownsampleFieldProducer createFieldProducer() {
// If field is not a metric, we downsample it as a label
if ("histogram".equals(fieldType.typeName())) {
return new LabelFieldProducer.HistogramLastLabelFieldProducer(name());
} else if ("flattened".equals(fieldType.typeName())) {
return new LabelFieldProducer.FlattenedLastValueFieldProducer(name());
}
return new LabelFieldProducer.LabelLastValueFieldProducer(name());
}
Expand All @@ -88,9 +91,15 @@ static List<FieldValueFetcher> create(SearchExecutionContext context, String[] f
fetchers.add(new AggregateMetricFieldValueFetcher(metricSubField, aggMetricFieldType, fieldData));
}
}
} else {
} else {
if (context.fieldExistsInIndex(field)) {
final IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
final IndexFieldData<?> fieldData;
if (fieldType instanceof FlattenedFieldMapper.RootFlattenedFieldType flattenedFieldType) {
var keyedFieldType = flattenedFieldType.getKeyedFieldType();
fieldData = context.getForField(keyedFieldType, MappedFieldType.FielddataOperation.SEARCH);
} else {
fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
}
final String fieldName = context.isMultiField(field)
? fieldType.name().substring(0, fieldType.name().lastIndexOf('.'))
: fieldType.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

package org.elasticsearch.xpack.downsample;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.mapper.flattened.FlattenedFieldSyntheticWriterHelper;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;

Expand Down Expand Up @@ -141,14 +143,14 @@ public void reset() {
}
}

static class AggregateMetricFieldProducer extends LabelLastValueFieldProducer {
static final class AggregateMetricFieldProducer extends LabelLastValueFieldProducer {

AggregateMetricFieldProducer(String name, Metric metric) {
super(name, new LastValueLabel(metric.name()));
}
}

public static class HistogramLastLabelFieldProducer extends LabelLastValueFieldProducer {
static final class HistogramLastLabelFieldProducer extends LabelLastValueFieldProducer {
HistogramLastLabelFieldProducer(String name) {
super(name);
}
Expand All @@ -167,4 +169,47 @@ public void write(XContentBuilder builder) throws IOException {
}
}
}

/**
 * Label producer for flattened fields. It records the doc values of a single document as a list
 * of {@link BytesRef} and serializes them through {@link FlattenedFieldSyntheticWriterHelper}.
 */
static final class FlattenedLastValueFieldProducer extends LabelLastValueFieldProducer {

    FlattenedLastValueFieldProducer(String name) {
        super(name);
    }

    /**
     * Records every doc value of {@code docId}, converted to {@link BytesRef} via {@code toString()}.
     * The call does nothing when a value has already been recorded ({@code isEmpty()} is false)
     * or when the document has no value for this field.
     *
     * @param docValues the doc values to read from
     * @param docId     the document whose values should be recorded
     * @throws IOException if reading the doc values fails
     */
    @Override
    public void collect(FormattedDocValues docValues, int docId) throws IOException {
        // Skip when something was collected earlier, or when this document has nothing to offer.
        if (isEmpty() == false || docValues.advanceExact(docId) == false) {
            return;
        }

        final int valueCount = docValues.docValueCount();
        assert valueCount > 0;
        // Flag the producer as populated before the values are pulled.
        isEmpty = false;
        final List<BytesRef> collected = new ArrayList<>(valueCount);
        int remaining = valueCount;
        while (remaining-- > 0) {
            collected.add(new BytesRef(docValues.nextValue().toString()));
        }
        label.collect(collected);
    }

    /**
     * Writes the recorded values as an object named after this field. Nothing is written when
     * no value has been recorded.
     *
     * @param builder the builder that receives the object
     * @throws IOException if writing to the builder fails
     */
    @Override
    public void write(XContentBuilder builder) throws IOException {
        if (isEmpty()) {
            return;
        }
        builder.startObject(name());
        var pending = ((List<?>) label.get()).iterator();
        // The helper pulls values one at a time; null tells it that the values are exhausted.
        new FlattenedFieldSyntheticWriterHelper(() -> pending.hasNext() ? (BytesRef) pending.next() : null).write(builder);
        builder.endObject();
    }
}
}

0 comments on commit 42090f9

Please sign in to comment.