Skip to content

Commit

Permalink
Merge branch 'main' into esql-date-nanos-date-format
Browse files Browse the repository at this point in the history
  • Loading branch information
not-napoleon authored Jan 14, 2025
2 parents 956737a + f9b9007 commit 29e5833
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 49 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120084.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120084
summary: Improve how reindex data stream index action handles api blocks
area: Data streams
type: enhancement
issues: []
18 changes: 0 additions & 18 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,26 +227,8 @@ tests:
- class: org.elasticsearch.xpack.inference.InferenceCrudIT
method: testGetServicesWithCompletionTaskType
issue: https://github.com/elastic/elasticsearch/issues/119959
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 8.18.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119978
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119979
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 8.18.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119550
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119980
- class: org.elasticsearch.multi_cluster.MultiClusterYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/119983
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 9.0.0]}
issue: https://github.com/elastic/elasticsearch/issues/119989
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 9.0.0]}
issue: https://github.com/elastic/elasticsearch/issues/119990
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_unattended/Test unattended put and start}
issue: https://github.com/elastic/elasticsearch/issues/120019
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -224,16 +225,46 @@ public void testMappingsAddedToDestIndex() throws Exception {
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testReadOnlyAddedBack() {
public void testFailIfMetadataBlockSet() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

try {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet();
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
}

cleanupMetadataBlocks(sourceIndex);
}

public void testFailIfReadBlockSet() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

try {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet();
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
}

cleanupMetadataBlocks(sourceIndex);
}

public void testReadOnlyBlocksNotAddedBack() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

// Create source index with read-only and/or block-writes
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
boolean isReadOnly = randomBoolean();
boolean isBlockWrites = randomBoolean();
var settings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY, isReadOnly)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, isBlockWrites)
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, randomBoolean())
.put(IndexMetadata.SETTING_BLOCKS_WRITE, randomBoolean())
.build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet();

Expand All @@ -242,13 +273,13 @@ public void testReadOnlyAddedBack() {
.actionGet()
.getDestIndex();

// assert read-only settings added back to dest index
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(isReadOnly, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY)));
assertEquals(isBlockWrites, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
assertFalse(Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));

removeReadOnly(sourceIndex);
removeReadOnly(destIndex);
cleanupMetadataBlocks(sourceIndex);
cleanupMetadataBlocks(destIndex);
}

public void testUpdateSettingsDefaultsRestored() {
Expand Down Expand Up @@ -428,10 +459,11 @@ public void testTsdbStartEndSet() throws Exception {
// TODO check other IndexMetadata fields that need to be fixed after the fact
// TODO what happens if don't have necessary perms for a given index?

private static void removeReadOnly(String index) {
private static void cleanupMetadataBlocks(String index) {
var settings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY, false)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, false)
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
.build();
assertAcked(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)).actionGet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
Expand Down Expand Up @@ -93,13 +92,22 @@ protected void doExecute(
);
}

if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}

SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l, taskId))
.<AddIndexBlockResponse>andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l, taskId))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}
Expand All @@ -120,7 +128,8 @@ public void onResponse(AddIndexBlockResponse response) {
@Override
public void onFailure(Exception e) {
if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) {
// It's fine if block-writes is already set
// Could fail with a cluster block exception if read-only or read-only-allow-delete is already set
// In this case, we can proceed
listener.onResponse(null);
} else {
listener.onFailure(e);
Expand All @@ -146,10 +155,12 @@ private void createIndex(
) {
logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName());

// override read-only settings if they exist
var removeReadOnlyOverride = Settings.builder()
// remove read-only settings if they exist
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_WRITE)
// settings to optimize reindex
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build();
Expand Down Expand Up @@ -192,22 +203,29 @@ private void addBlockIfFromSource(
}
}

private void updateSettings(
String index,
Settings.Builder settings,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), index);
updateSettingsRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", index);
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
}

private void copyOldSourceSettingsToDest(
Settings settingsBefore,
String destIndexName,
ActionListener<AcknowledgedResponse> listener,
TaskId parentTaskId
) {
logger.debug("Updating settings on destination index after reindex completes");

var settings = Settings.builder();
copySettingOrUnset(settingsBefore, settings, IndexMetadata.SETTING_NUMBER_OF_REPLICAS);
copySettingOrUnset(settingsBefore, settings, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey());

var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), destIndexName);
updateSettingsRequest.setParentTask(parentTaskId);
var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName);
client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage));
updateSettings(destIndexName, settings, listener, parentTaskId);
}

private static void copySettingOrUnset(Settings settingsBefore, Settings.Builder builder, String setting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
Expand Down Expand Up @@ -142,9 +141,4 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(stackTraces, stackFrames, executables, stackTraceEvents, totalFrames, samplingRate);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}
}

0 comments on commit 29e5833

Please sign in to comment.