Skip to content

Commit

Permalink
Keep track of logsdb at data stream level. (elastic#113451)
Browse files Browse the repository at this point in the history
Today we keep track of a data stream's index mode. However if index mode
is logsdb, then we don't set it at data stream level. We only set it is
index mode is time series.

This change keeps track of logsdb index mode at data stream creation and
rollover time.

Followup changes: * Update the get data stream api to return that a data
stream is in logsdb mode. (today we do return additional information in
case of time series data stream) * Update the
IndexSettingProvider#getAdditionalIndexSettings(...) parameter list by
including `indexMode` instead of `isTimeSeries`.

Relates to elastic#113583
  • Loading branch information
martijnvg authored Sep 27, 2024
1 parent cec9f98 commit 26d5428
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,23 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
);
assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
DATA_STREAM_NAME,
document(
Instant.now(),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("standard", 1, DATA_STREAM_NAME);

putTemplate(client, "custom-template", TIME_SERIES_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
Expand All @@ -348,7 +365,24 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("time_series", 1, DATA_STREAM_NAME);
assertDataStreamBackingIndexMode("time_series", 2, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
DATA_STREAM_NAME,
document(
Instant.now(),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("standard", 3, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
Expand All @@ -365,7 +399,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME);
assertDataStreamBackingIndexMode("logsdb", 4, DATA_STREAM_NAME);
}

public void testLogsDBToStandardReindex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ yield new DataStreamAutoShardingEvent(
dataStream.rollover(
indexMetadata.getIndex(),
newGeneration,
metadata.isTimeSeriesTemplate(templateV2),
metadata.retrieveIndexModeFromTemplate(templateV2),
dataStreamAutoShardingEvent
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,43 +448,52 @@ public DataStreamIndices getDataStreamIndices(boolean failureStore) {
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
*
* @param writeIndex new write index
* @param generation new generation
* @param timeSeries whether the template that created this data stream is in time series mode
* @param autoShardingEvent the auto sharding event this rollover operation is applying
*
* @param writeIndex new write index
* @param generation new generation
* @param indexModeFromTemplate the index mode that originates from the template that created this data stream
* @param autoShardingEvent the auto sharding event this rollover operation is applying
* @return new {@code DataStream} instance with the rollover operation applied
*/
public DataStream rollover(
Index writeIndex,
long generation,
boolean timeSeries,
IndexMode indexModeFromTemplate,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
) {
ensureNotReplicated();

return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent);
return unsafeRollover(writeIndex, generation, indexModeFromTemplate, autoShardingEvent);
}

/**
* Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
* Like {@link #rollover(Index, long, IndexMode, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
*/
public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) {
IndexMode indexMode = this.indexMode;
if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) {
public DataStream unsafeRollover(
Index writeIndex,
long generation,
IndexMode indexModeFromTemplate,
DataStreamAutoShardingEvent autoShardingEvent
) {
IndexMode dsIndexMode = this.indexMode;
if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.TIME_SERIES) {
// This allows for migrating a data stream to be a tsdb data stream:
// (only if index_mode=null|standard then allow it to be set to time_series)
indexMode = IndexMode.TIME_SERIES;
} else if (indexMode == IndexMode.TIME_SERIES && timeSeries == false) {
dsIndexMode = IndexMode.TIME_SERIES;
} else if (dsIndexMode == IndexMode.TIME_SERIES && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) {
// Allow downgrading a time series data stream to a regular data stream
dsIndexMode = null;
} else if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.LOGSDB) {
dsIndexMode = IndexMode.LOGSDB;
} else if (dsIndexMode == IndexMode.LOGSDB && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) {
// Allow downgrading a time series data stream to a regular data stream
indexMode = null;
dsIndexMode = null;
}

List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
backingIndices.add(writeIndex);
return copy().setBackingIndices(
this.backingIndices.copy().setIndices(backingIndices).setAutoShardingEvent(autoShardingEvent).setRolloverOnWrite(false).build()
).setGeneration(generation).setIndexMode(indexMode).build();
).setGeneration(generation).setIndexMode(dsIndexMode).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,16 +1305,10 @@ public Map<String, ComposableIndexTemplate> templatesV2() {
.orElse(Collections.emptyMap());
}

// TODO: remove this method:
public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) {
if (indexTemplate.getDataStreamTemplate() == null) {
return false;
}

var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates());
// Not using IndexSettings.MODE.get() to avoid validation that may fail at this point.
var rawIndexMode = settings.get(IndexSettings.MODE.getKey());
var indexMode = rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null;
if (indexMode == IndexMode.TIME_SERIES) {
var indexModeFromTemplate = retrieveIndexModeFromTemplate(indexTemplate);
if (indexModeFromTemplate == IndexMode.TIME_SERIES) {
// No need to check for the existence of index.routing_path here, because index.mode=time_series can't be specified without it.
// Setting validation takes care of this.
// Also no need to validate that the fields defined in index.routing_path are keyword fields with time_series_dimension
Expand All @@ -1328,6 +1322,17 @@ public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) {
return false;
}

public IndexMode retrieveIndexModeFromTemplate(ComposableIndexTemplate indexTemplate) {
if (indexTemplate.getDataStreamTemplate() == null) {
return null;
}

var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates());
// Not using IndexSettings.MODE.get() to avoid validation that may fail at this point.
var rawIndexMode = settings.get(IndexSettings.MODE.getKey());
return rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null;
}

public Map<String, DataStream> dataStreams() {
return this.custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY).dataStreams();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ static ClusterState createDataStream(
.collect(Collectors.toCollection(ArrayList::new));
dsBackingIndices.add(writeIndex.getIndex());
boolean hidden = isSystem || template.getDataStreamTemplate().isHidden();
final IndexMode indexMode = metadata.isTimeSeriesTemplate(template) ? IndexMode.TIME_SERIES : null;
final IndexMode indexMode = metadata.retrieveIndexModeFromTemplate(template);
final DataStreamLifecycle lifecycle = isSystem
? MetadataIndexTemplateService.resolveLifecycle(template, systemDataStreamDescriptor.getComponentTemplates())
: MetadataIndexTemplateService.resolveLifecycle(template, metadata.componentTemplates());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ protected DataStream mutateInstance(DataStream instance) {
public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
Expand All @@ -225,7 +225,7 @@ public void testRolloverWithConflictingBackingIndexName() {
}

final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build(), ds.getBackingIndices());
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
Expand All @@ -242,7 +242,12 @@ public void testRolloverUpgradeToTsdbDataStream() {
.build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null);
var rolledDs = ds.rollover(
new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()),
newCoordinates.v2(),
IndexMode.TIME_SERIES,
null
);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
Expand All @@ -251,11 +256,41 @@ public void testRolloverUpgradeToTsdbDataStream() {
assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
}

public void testRolloverDowngradeToRegularDataStream() {
public void testRolloverUpgradeToLogsdbDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance()
.copy()
.setReplicated(false)
.setIndexMode(randomBoolean() ? IndexMode.STANDARD : null)
.build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), IndexMode.LOGSDB, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.LOGSDB));
}

public void testRolloverDowngradeFromTsdbToRegularDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.TIME_SERIES).build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
assertThat(rolledDs.getIndexMode(), nullValue());
}

public void testRolloverDowngradeFromLogsdbToRegularDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.LOGSDB).build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.ExecutorNames;
Expand Down Expand Up @@ -77,6 +78,43 @@ public void testCreateDataStream() throws Exception {
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), nullValue());
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(
newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
equalTo("true")
);
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateDataStreamLogsdb() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(new Template(Settings.builder().put("index.mode", "logsdb").build(), null, null))
.dataStreamTemplate(new DataStreamTemplate())
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName);
ClusterState newState = MetadataCreateDataStreamService.createDataStream(
metadataCreateIndexService,
Settings.EMPTY,
cs,
true,
req,
ActionListener.noop(),
false
);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(
newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ private SingleForecast forecast(Metadata metadata, DataStream stream, long forec
stream = stream.unsafeRollover(
new Index(rolledDataStreamInfo.v1(), uuid),
rolledDataStreamInfo.v2(),
false,
null,
stream.getAutoShardingEvent()
);

Expand Down

0 comments on commit 26d5428

Please sign in to comment.