migrations v2: Retry tasks that timeout (#95305) (#96305)
* Retry tasks that timeout with timeout_exception or receive_timeout_transport_exception

* Integration test: assert that waitForPickupUpdatedMappingsTask and waitForReindexTask return a retryable error when the task has not completed within the timeout

* stateActionMachine: remove infinite loop failsafe

* Introduce wait_for_task_completion_timeout and keep on retrying *_WAIT_FOR_TASK steps until the ES task completes

* cloneIndex integration test: clone target exists but does not turn yellow within the timeout

* Try to stabilize waitForReindexTask test

* Fix types

* Make v2 migrations retryAttempts configurable

* Improve type safety by narrowing left res types

* Fix test description

* Fix tests

Co-authored-by: Rudolf Meijering <skaapgif@gmail.com>
mshustov and rudolf committed Apr 6, 2021
1 parent 031f893 commit d8efcf3
Showing 15 changed files with 378 additions and 129 deletions.
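
Taken together, the changes make the *_WAIT_FOR_TASK actions (waitForReindexTask, waitForPickupUpdatedMappingsTask) return a `wait_for_task_completion_timeout` left when Elasticsearch answers with `timeout_exception` or `receive_timeout_transport_exception`, and the migration keeps re-entering those steps until the underlying task finishes; other retryable errors are bounded by the new configurable `retryAttempts`. A rough sketch of that retry pattern, not code from this commit (`retryWhileTimedOut` is a hypothetical helper):

```ts
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';

interface WaitForTaskCompletionTimeout {
  readonly type: 'wait_for_task_completion_timeout';
  readonly message: string;
}

// Illustrative only: re-run a task-polling step for as long as it reports that
// the Elasticsearch task has not completed within the wait timeout.
const retryWhileTimedOut = <E extends { type: string }, A>(
  step: TaskEither.TaskEither<E | WaitForTaskCompletionTimeout, A>
): TaskEither.TaskEither<E, A> => async () => {
  while (true) {
    const res = await step();
    if (Either.isLeft(res) && res.left.type === 'wait_for_task_completion_timeout') {
      continue; // the ES task is still running; poll it again
    }
    return res as Either.Either<E, A>;
  }
};
```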
@@ -35,13 +35,14 @@ const createMigrator = (
) => {
const mockMigrator: jest.Mocked<IKibanaMigrator> = {
kibanaVersion: '8.0.0-testing',
savedObjectsConfig: {
soMigrationsConfig: {
batchSize: 100,
scrollDuration: '15m',
pollInterval: 1500,
skip: false,
// TODO migrationsV2: remove/deprecate once we release migrations v2
// TODO migrationsV2: remove/deprecate once we remove migrations v1
enableV2: false,
retryAttempts: 10,
},
runMigrations: jest.fn(),
getActiveMappings: jest.fn(),
@@ -414,12 +414,13 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false })
enabled: true,
index: '.my-index',
} as KibanaMigratorOptions['kibanaConfig'],
savedObjectsConfig: {
soMigrationsConfig: {
batchSize: 20,
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
enableV2,
retryAttempts: 20,
},
client: elasticsearchClientMock.createElasticsearchClient(),
};
23 changes: 12 additions & 11 deletions src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts
@@ -41,7 +41,7 @@ import { MigrationLogger } from '../core/migration_logger';
export interface KibanaMigratorOptions {
client: ElasticsearchClient;
typeRegistry: ISavedObjectTypeRegistry;
savedObjectsConfig: SavedObjectsMigrationConfigType;
soMigrationsConfig: SavedObjectsMigrationConfigType;
kibanaConfig: KibanaConfigType;
kibanaVersion: string;
logger: Logger;
@@ -72,10 +72,10 @@ export class KibanaMigrator {
});
private readonly activeMappings: IndexMapping;
private migrationsRetryDelay?: number;
// TODO migrationsV2: make private once we release migrations v2
public kibanaVersion: string;
// TODO migrationsV2: make private once we release migrations v2
public readonly savedObjectsConfig: SavedObjectsMigrationConfigType;
// TODO migrationsV2: make private once we remove migrations v1
public readonly kibanaVersion: string;
// TODO migrationsV2: make private once we remove migrations v1
public readonly soMigrationsConfig: SavedObjectsMigrationConfigType;

/**
* Creates an instance of KibanaMigrator.
@@ -84,14 +84,14 @@
client,
typeRegistry,
kibanaConfig,
savedObjectsConfig,
soMigrationsConfig,
kibanaVersion,
logger,
migrationsRetryDelay,
}: KibanaMigratorOptions) {
this.client = client;
this.kibanaConfig = kibanaConfig;
this.savedObjectsConfig = savedObjectsConfig;
this.soMigrationsConfig = soMigrationsConfig;
this.typeRegistry = typeRegistry;
this.serializer = new SavedObjectsSerializer(this.typeRegistry);
this.mappingProperties = mergeTypes(this.typeRegistry.getAllTypes());
@@ -175,7 +175,7 @@ export class KibanaMigrator {

const migrators = Object.keys(indexMap).map((index) => {
// TODO migrationsV2: remove old migrations algorithm
if (this.savedObjectsConfig.enableV2) {
if (this.soMigrationsConfig.enableV2) {
return {
migrate: (): Promise<MigrationResult> => {
return runResilientMigrator({
@@ -193,20 +193,21 @@
),
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,
});
},
};
} else {
return new IndexMigrator({
batchSize: this.savedObjectsConfig.batchSize,
batchSize: this.soMigrationsConfig.batchSize,
client: createMigrationEsClient(this.client, this.log, this.migrationsRetryDelay),
documentMigrator: this.documentMigrator,
index,
kibanaVersion: this.kibanaVersion,
log: this.log,
mappingProperties: indexMap[index].typeMappings,
pollInterval: this.savedObjectsConfig.pollInterval,
scrollDuration: this.savedObjectsConfig.scrollDuration,
pollInterval: this.soMigrationsConfig.pollInterval,
scrollDuration: this.soMigrationsConfig.scrollDuration,
serializer: this.serializer,
// Only necessary for the migrator of the kibana index.
obsoleteIndexTemplatePattern:
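
From a consumer's point of view, the change to this file is the rename of the savedObjectsConfig option to soMigrationsConfig, which now also carries retryAttempts and is forwarded to runResilientMigrator. A hedged construction sketch; the values and the relative import path are illustrative, mirroring the mocks earlier in this diff:

```ts
import { KibanaMigrator, KibanaMigratorOptions } from './kibana_migrator';

// Client, type registry, kibana config and logger come from core setup;
// the numbers below are illustrative, not defaults introduced by this commit.
declare const deps: Pick<
  KibanaMigratorOptions,
  'client' | 'typeRegistry' | 'kibanaConfig' | 'logger'
>;

const migrator = new KibanaMigrator({
  ...deps,
  kibanaVersion: '8.0.0',
  soMigrationsConfig: {
    batchSize: 100,
    scrollDuration: '15m',
    pollInterval: 1500,
    skip: false,
    enableV2: true,
    retryAttempts: 15,
  },
});

void migrator; // the constructed migrator would then be driven via runMigrations()
```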
142 changes: 108 additions & 34 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
@@ -9,7 +9,7 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClientError } from '@elastic/elasticsearch/lib/errors';
import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { flow } from 'fp-ts/lib/function';
@@ -23,12 +23,6 @@ import {
} from './catch_retryable_es_client_errors';
export type { RetryableEsClientError };

export const isRetryableEsClientResponse = (
res: Either.Either<any, unknown>
): res is Either.Left<RetryableEsClientError> => {
return Either.isLeft(res) && res.left.type === 'retryable_es_client_error';
};

/**
* Batch size for updateByQuery, reindex & search operations. Smaller batches
* reduce the memory pressure on Elasticsearch and Kibana so are less likely
@@ -45,6 +39,27 @@ const INDEX_NUMBER_OF_SHARDS = 1;
/** Wait for all shards to be active before starting an operation */
const WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE = 'all';

// Map of left response 'type' string -> response interface
export interface ActionErrorTypeMap {
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
retryable_es_client_error: RetryableEsClientError;
index_not_found_exception: IndexNotFound;
target_index_had_write_block: TargetIndexHadWriteBlock;
incompatible_mapping_exception: IncompatibleMappingException;
alias_not_found_exception: AliasNotFound;
remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex;
}

/**
* Type guard for narrowing the type of a left
*/
export function isLeftTypeof<T extends keyof ActionErrorTypeMap>(
res: any,
typeString: T
): res is ActionErrorTypeMap[T] {
return res.type === typeString;
}

export type FetchIndexResponse = Record<
string,
{ aliases: Record<string, unknown>; mappings: IndexMapping; settings: unknown }
@@ -74,6 +89,10 @@ export const fetchIndices = (
.catch(catchRetryableEsClientErrors);
};

export interface IndexNotFound {
type: 'index_not_found_exception';
index: string;
}
/**
* Sets a write block in place for the given index. If the response includes
* `acknowledged: true` all in-progress writes have drained and no further
@@ -87,7 +106,7 @@
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<
{ type: 'index_not_found_exception' } | RetryableEsClientError,
IndexNotFound | RetryableEsClientError,
'set_write_block_succeeded'
> => () => {
return client.indices
@@ -112,7 +131,7 @@
.catch((e: ElasticsearchClientError) => {
if (e instanceof EsErrors.ResponseError) {
if (e.message === 'index_not_found_exception') {
return Either.left({ type: 'index_not_found_exception' as const });
return Either.left({ type: 'index_not_found_exception' as const, index });
}
}
throw e;
@@ -170,10 +189,11 @@
*/
const waitForIndexStatusYellow = (
client: ElasticsearchClient,
index: string
index: string,
timeout: string
): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
return client.cluster
.health({ index, wait_for_status: 'yellow', timeout: '30s' })
.health({ index, wait_for_status: 'yellow', timeout })
.then(() => {
return Either.right({});
})
@@ -189,19 +209,18 @@ export type CloneIndexResponse = AcknowledgeResponse;
* This method adds some additional logic to the ES clone index API:
* - it is idempotent, if it gets called multiple times subsequent calls will
* wait for the first clone operation to complete (up to 60s)
* - the first call will wait up to 90s for the cluster state and all shards
* - the first call will wait up to 120s for the cluster state and all shards
* to be updated.
*/
export const cloneIndex = (
client: ElasticsearchClient,
source: string,
target: string
): TaskEither.TaskEither<
RetryableEsClientError | { type: 'index_not_found_exception'; index: string },
CloneIndexResponse
> => {
target: string,
/** only used for testing */
timeout = DEFAULT_TIMEOUT
): TaskEither.TaskEither<RetryableEsClientError | IndexNotFound, CloneIndexResponse> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | { type: 'index_not_found_exception'; index: string },
RetryableEsClientError | IndexNotFound,
AcknowledgeResponse
> = () => {
return client.indices
@@ -227,7 +246,7 @@
},
},
},
timeout: DEFAULT_TIMEOUT,
timeout,
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
)
@@ -277,7 +296,7 @@
} else {
// Otherwise, wait until the target index has a 'yellow' status.
return pipe(
waitForIndexStatusYellow(client, target),
waitForIndexStatusYellow(client, target, timeout),
TaskEither.map((value) => {
/** When the index status is 'yellow' we know that all primary shards were started */
return { acknowledged: true, shardsAcknowledged: true };
@@ -295,6 +314,38 @@ interface WaitForTaskResponse {
description?: string;
}

/**
* After waiting for the specified timeout, the task has not yet completed.
*
* When querying the tasks API we use `wait_for_completion=true` to block the
* request until the task completes. If, after the `timeout`, the task still has
* not completed we return this error. This does not mean that the task itself
* has reached a timeout; Elasticsearch will continue to run the task.
*/
export interface WaitForTaskCompletionTimeout {
/** After waiting for the specified timeout, the task has not yet completed. */
readonly type: 'wait_for_task_completion_timeout';
readonly message: string;
readonly error?: Error;
}

const catchWaitForTaskCompletionTimeout = (
e: ResponseError
): Either.Either<WaitForTaskCompletionTimeout, never> => {
if (
e.body?.error?.type === 'timeout_exception' ||
e.body?.error?.type === 'receive_timeout_transport_exception'
) {
return Either.left({
type: 'wait_for_task_completion_timeout' as const,
message: `[${e.body.error.type}] ${e.body.error.reason}`,
error: e,
});
} else {
throw e;
}
};

/**
* Blocks for up to 60s or until a task completes.
*
@@ -304,7 +355,10 @@
client: ElasticsearchClient,
taskId: string,
timeout: string
): TaskEither.TaskEither<RetryableEsClientError, WaitForTaskResponse> => () => {
): TaskEither.TaskEither<
RetryableEsClientError | WaitForTaskCompletionTimeout,
WaitForTaskResponse
> => () => {
return client.tasks
.get({
task_id: taskId,
@@ -322,6 +376,7 @@
description: body.task.description,
});
})
.catch(catchWaitForTaskCompletionTimeout)
.catch(catchRetryableEsClientErrors);
};

@@ -424,7 +479,15 @@ export const reindex = (
};

interface WaitForReindexTaskFailure {
cause: { type: string; reason: string };
readonly cause: { type: string; reason: string };
}

export interface TargetIndexHadWriteBlock {
type: 'target_index_had_write_block';
}

export interface IncompatibleMappingException {
type: 'incompatible_mapping_exception';
}

export const waitForReindexTask = flow(
@@ -433,10 +496,11 @@
(
res
): TaskEither.TaskEither<
| { type: 'index_not_found_exception'; index: string }
| { type: 'target_index_had_write_block' }
| { type: 'incompatible_mapping_exception' }
| RetryableEsClientError,
| IndexNotFound
| TargetIndexHadWriteBlock
| IncompatibleMappingException
| RetryableEsClientError
| WaitForTaskCompletionTimeout,
'reindex_succeeded'
> => {
const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) =>
@@ -507,7 +571,12 @@ export const verifyReindex = (
export const waitForPickupUpdatedMappingsTask = flow(
waitForTask,
TaskEither.chain(
(res): TaskEither.TaskEither<RetryableEsClientError, 'pickup_updated_mappings_succeeded'> => {
(
res
): TaskEither.TaskEither<
RetryableEsClientError | WaitForTaskCompletionTimeout,
'pickup_updated_mappings_succeeded'
> => {
// We don't catch or type failures/errors because they should never
// occur in our migration algorithm and we don't have any business logic
// for dealing with it. If something happens we'll just crash and try
@@ -529,6 +598,14 @@
)
);

export interface AliasNotFound {
type: 'alias_not_found_exception';
}

export interface RemoveIndexNotAConcreteIndex {
type: 'remove_index_not_a_concrete_index';
}

export type AliasAction =
| { remove_index: { index: string } }
| { remove: { index: string; alias: string; must_exist: boolean } }
@@ -541,10 +618,7 @@
client: ElasticsearchClient,
aliasActions: AliasAction[]
): TaskEither.TaskEither<
| { type: 'index_not_found_exception'; index: string }
| { type: 'alias_not_found_exception' }
| { type: 'remove_index_not_a_concrete_index' }
| RetryableEsClientError,
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
> => () => {
return client.indices
@@ -698,11 +772,11 @@ export const createIndex = (
// If the cluster state was updated and all shards ackd we're done
return TaskEither.right('create_index_succeeded');
} else {
// Otherwise, wait until the target index has a 'green' status.
// Otherwise, wait until the target index has a 'yellow' status.
return pipe(
waitForIndexStatusYellow(client, indexName),
waitForIndexStatusYellow(client, indexName, DEFAULT_TIMEOUT),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
/** When the index status is 'yellow' we know that all shards were started */
return 'create_index_succeeded';
})
);
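
Model code that consumes these actions can combine Either.isLeft with the new isLeftTypeof guard (backed by ActionErrorTypeMap) to narrow which left it received and decide whether the step should be retried. A hedged sketch; the import path, the response union, and the returned state names are illustrative, not taken from this commit:

```ts
import * as Either from 'fp-ts/lib/Either';
import {
  isLeftTypeof,
  RetryableEsClientError,
  WaitForTaskCompletionTimeout,
} from './actions';

type WaitForReindexLeft = WaitForTaskCompletionTimeout | RetryableEsClientError;

// Decide the next step after polling the reindex task once.
function nextAction(res: Either.Either<WaitForReindexLeft, 'reindex_succeeded'>): string {
  if (Either.isRight(res)) {
    return 'proceed'; // the task finished; move to the next migration state
  }
  if (isLeftTypeof(res.left, 'wait_for_task_completion_timeout')) {
    // ES is still running the task: stay in the *_WAIT_FOR_TASK state and poll again.
    return 'retry_wait_for_task';
  }
  if (isLeftTypeof(res.left, 'retryable_es_client_error')) {
    // Transient client/cluster error: retry, bounded by the configurable retryAttempts.
    return 'retry_with_backoff';
  }
  return 'fatal';
}
```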