Skip to content

Commit

Permalink
Improve type safety by narrowing left res types
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolf committed Mar 31, 2021
1 parent f924cf1 commit a7e7fd6
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 68 deletions.
111 changes: 72 additions & 39 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClientError } from '@elastic/elasticsearch/lib/errors';
import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { flow } from 'fp-ts/lib/function';
Expand All @@ -23,12 +23,6 @@ import {
} from './catch_retryable_es_client_errors';
export type { RetryableEsClientError };

export const isRetryableEsClientResponse = (
res: Either.Either<any, unknown>
): res is Either.Left<RetryableEsClientError> => {
return Either.isLeft(res) && res.left.type === 'retryable_es_client_error';
};

/**
* Batch size for updateByQuery, reindex & search operations. Smaller batches
* reduce the memory pressure on Elasticsearch and Kibana so are less likely
Expand All @@ -45,6 +39,27 @@ const INDEX_NUMBER_OF_SHARDS = 1;
/** Wait for all shards to be active before starting an operation */
const WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE = 'all';

// Map of left response 'type' string -> response interface
export interface ActionErrorTypeMap {
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
retryable_es_client_error: RetryableEsClientError;
index_not_found_exception: IndexNotFound;
target_index_had_write_block: TargetIndexHadWriteBlock;
incompatible_mapping_exception: IncompatibleMappingException;
alias_not_found_exception: AliasNotFound;
remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex;
}

/**
* Type guard for narrowing the type of a left
*/
export function isLeftTypeof<T extends keyof ActionErrorTypeMap>(
res: any,
typeString: T
): res is ActionErrorTypeMap[T] {
return res.type === typeString;
}

export type FetchIndexResponse = Record<
string,
{ aliases: Record<string, unknown>; mappings: IndexMapping; settings: unknown }
Expand Down Expand Up @@ -74,6 +89,10 @@ export const fetchIndices = (
.catch(catchRetryableEsClientErrors);
};

export interface IndexNotFound {
type: 'index_not_found_exception';
index: string;
}
/**
* Sets a write block in place for the given index. If the response includes
* `acknowledged: true` all in-progress writes have drained and no further
Expand All @@ -87,7 +106,7 @@ export const setWriteBlock = (
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<
{ type: 'index_not_found_exception' } | RetryableEsClientError,
IndexNotFound | RetryableEsClientError,
'set_write_block_succeeded'
> => () => {
return client.indices
Expand All @@ -112,7 +131,7 @@ export const setWriteBlock = (
.catch((e: ElasticsearchClientError) => {
if (e instanceof EsErrors.ResponseError) {
if (e.message === 'index_not_found_exception') {
return Either.left({ type: 'index_not_found_exception' as const });
return Either.left({ type: 'index_not_found_exception' as const, index });
}
}
throw e;
Expand Down Expand Up @@ -199,12 +218,9 @@ export const cloneIndex = (
target: string,
/** only used for testing */
timeout = DEFAULT_TIMEOUT
): TaskEither.TaskEither<
RetryableEsClientError | { type: 'index_not_found_exception'; index: string },
CloneIndexResponse
> => {
): TaskEither.TaskEither<RetryableEsClientError | IndexNotFound, CloneIndexResponse> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | { type: 'index_not_found_exception'; index: string },
RetryableEsClientError | IndexNotFound,
AcknowledgeResponse
> = () => {
return client.indices
Expand Down Expand Up @@ -306,13 +322,30 @@ interface WaitForTaskResponse {
* not completed we return this error. This does not mean that the task itelf
* has reached a timeout, Elasticsearch will continue to run the task.
*/
export interface WaitForTaskCompletionTimeoutError {
export interface WaitForTaskCompletionTimeout {
/** After waiting for the specificed timeout, the task has not yet completed. */
readonly type: 'wait_for_task_completion_timeout';
readonly message: string;
readonly error?: Error;
}

const catchWaitForTaskCompletionTimeout = (
e: ResponseError
): Either.Either<WaitForTaskCompletionTimeout, never> => {
if (
e.body?.error?.type === 'timeout_exception' ||
e.body?.error?.type === 'receive_timeout_transport_exception'
) {
return Either.left({
type: 'wait_for_task_completion_timeout' as const,
message: `[${e.body.error.type}] ${e.body.error.reason}`,
error: e,
});
} else {
throw e;
}
};

/**
* Blocks for up to 60s or until a task completes.
*
Expand All @@ -323,7 +356,7 @@ const waitForTask = (
taskId: string,
timeout: string
): TaskEither.TaskEither<
RetryableEsClientError | WaitForTaskCompletionTimeoutError,
RetryableEsClientError | WaitForTaskCompletionTimeout,
WaitForTaskResponse
> => () => {
return client.tasks
Expand All @@ -343,20 +376,7 @@ const waitForTask = (
description: body.task.description,
});
})
.catch((e) => {
if (
e.body?.error?.type === 'timeout_exception' ||
e.body?.error?.type === 'receive_timeout_transport_exception'
) {
return Either.left({
type: 'wait_for_task_completion_timeout' as const,
message: `[${e.body.error.type}] ${e.body.error.reason}`,
error: e,
});
} else {
throw e;
}
})
.catch(catchWaitForTaskCompletionTimeout)
.catch(catchRetryableEsClientErrors);
};

Expand Down Expand Up @@ -462,17 +482,25 @@ interface WaitForReindexTaskFailure {
readonly cause: { type: string; reason: string };
}

export interface TargetIndexHadWriteBlock {
type: 'target_index_had_write_block';
}

export interface IncompatibleMappingException {
type: 'incompatible_mapping_exception';
}

export const waitForReindexTask = flow(
waitForTask,
TaskEither.chain(
(
res
): TaskEither.TaskEither<
| { type: 'index_not_found_exception'; index: string }
| { type: 'target_index_had_write_block' }
| { type: 'incompatible_mapping_exception' }
| IndexNotFound
| TargetIndexHadWriteBlock
| IncompatibleMappingException
| RetryableEsClientError
| WaitForTaskCompletionTimeoutError,
| WaitForTaskCompletionTimeout,
'reindex_succeeded'
> => {
const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) =>
Expand Down Expand Up @@ -546,7 +574,7 @@ export const waitForPickupUpdatedMappingsTask = flow(
(
res
): TaskEither.TaskEither<
RetryableEsClientError | WaitForTaskCompletionTimeoutError,
RetryableEsClientError | WaitForTaskCompletionTimeout,
'pickup_updated_mappings_succeeded'
> => {
// We don't catch or type failures/errors because they should never
Expand All @@ -570,6 +598,14 @@ export const waitForPickupUpdatedMappingsTask = flow(
)
);

export interface AliasNotFound {
type: 'alias_not_found_exception';
}

export interface RemoveIndexNotAConcreteIndex {
type: 'remove_index_not_a_concrete_index';
}

export type AliasAction =
| { remove_index: { index: string } }
| { remove: { index: string; alias: string; must_exist: boolean } }
Expand All @@ -582,10 +618,7 @@ export const updateAliases = (
client: ElasticsearchClient,
aliasActions: AliasAction[]
): TaskEither.TaskEither<
| { type: 'index_not_found_exception'; index: string }
| { type: 'alias_not_found_exception' }
| { type: 'remove_index_not_a_concrete_index' }
| RetryableEsClientError,
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
> => () => {
return client.indices
Expand Down
Loading

0 comments on commit a7e7fd6

Please sign in to comment.