[Migrations] Update all aliases with a single updateAliases() when relocating SO documents (elastic#158940)

Fixes elastic#158733

The goal of this change is to ensure that the migrators of all indices
involved in a relocation (e.g. as part of the
[dot kibana split](elastic#104081)) create their index aliases in the same
`updateAliases()` call.

This way, either:
* all the indices involved in the [dot kibana split](elastic#104081) relocation will be completely upgraded (with the appropriate aliases), or
* none of them will.
gsoldevila authored and cqliu1 committed Jun 5, 2023
1 parent 3ff9a64 commit d9010cd
Showing 19 changed files with 333 additions and 193 deletions.
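
To illustrate the all-or-nothing idea described in the commit message, here is a minimal sketch, assuming a simplified client surface. The `AliasAction`, `AliasClient`, and `createAliasBarrier` names are hypothetical and introduced only for illustration; they are not part of the Kibana codebase.

```typescript
// Hedged sketch: each migrator contributes its alias actions to a shared barrier;
// the last one to arrive applies them all with a single updateAliases() call, so
// the aliases for every relocated index are created atomically (all or nothing).
interface AliasAction {
  add?: { index: string; alias: string };
  remove?: { index: string; alias: string };
}

// Minimal stand-in for the part of the Elasticsearch client used here.
interface AliasClient {
  updateAliases(body: { actions: AliasAction[] }): Promise<unknown>;
}

function createAliasBarrier(client: AliasClient, expectedMigrators: number) {
  const collected: AliasAction[] = [];
  let pending = expectedMigrators;
  let resolveAll!: () => void;
  let rejectAll!: (err: unknown) => void;
  const done = new Promise<void>((resolve, reject) => {
    resolveAll = resolve;
    rejectAll = reject;
  });

  return {
    // Each migrator calls contribute() with its alias actions and awaits the result.
    contribute(actions: AliasAction[]): Promise<void> {
      collected.push(...actions);
      if (--pending === 0) {
        // The last migrator triggers the single, atomic updateAliases() call.
        client.updateAliases({ actions: collected }).then(resolveAll, rejectAll);
      }
      return done;
    },
  };
}
```

Either the single call succeeds and every contributing migrator proceeds, or it fails and they all observe the same error, which is the behaviour the commit message describes for the dot kibana split.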
@@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AcknowledgeResponse } from '.';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@@ -46,6 +45,9 @@ export interface CreateIndexParams {
aliases?: string[];
timeout?: string;
}

export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';

/**
* Creates an index with the given mappings
*
@@ -64,11 +66,11 @@ export const createIndex = ({
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
CreateIndexSuccessResponse
> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError | ClusterShardLimitExceeded,
AcknowledgeResponse
CreateIndexSuccessResponse
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);

@@ -103,31 +105,12 @@
},
},
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated on all nodes with the newly created index, but it
* probably will be created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, index creation complete
*/
return Either.right({
acknowledged: Boolean(res.acknowledged),
shardsAcknowledged: res.shards_acknowledged,
});
.then(() => {
return Either.right('create_index_succeeded' as const);
})
.catch((error) => {
if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous create
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
return Either.right('index_already_exists' as const);
} else if (isClusterShardLimitExceeded(error?.body?.error)) {
return Either.left({
type: 'cluster_shard_limit_exceeded' as const,
@@ -143,11 +126,12 @@ export const createIndex = ({
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
CreateIndexSuccessResponse,
CreateIndexSuccessResponse
>((res) => {
// Systematicaly wait until the target index has a 'green' status meaning
// the primary (and on multi node clusters) the replica has been started
// When the index status is 'green' we know that all shards were started
// see https://github.com/elastic/kibana/issues/157968
return pipe(
waitForIndexStatus({
Expand All @@ -156,10 +140,7 @@ export const createIndex = ({
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
TaskEither.map(() => res)
);
})
);
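
A note on the hunks above: `createIndex` now resolves to the `CreateIndexSuccessResponse` union rather than the old acknowledge flags, and in both cases it only resolves once the index status is 'green'. A small illustrative sketch of how a caller might branch on that union follows; the helper name is hypothetical.

```typescript
// The union introduced in this commit; both members are success cases (Either.right).
type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';

// Hypothetical helper showing exhaustive handling of the two success values.
function describeCreateIndexResult(res: CreateIndexSuccessResponse): string {
  switch (res) {
    case 'create_index_succeeded':
      // The index was created by this run and reached 'green' status.
      return 'index created';
    case 'index_already_exists':
      // A previous (possibly interrupted) attempt had already created the index;
      // the action still waits for 'green' before reporting success.
      return 'index already existed';
  }
}

console.log(describeCreateIndexResult('index_already_exists')); // "index already existed"
```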
@@ -106,7 +106,8 @@ export {

import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { IncompatibleClusterRoutingAllocation } from './initialize_action';
import { ClusterShardLimitExceeded } from './create_index';
import type { ClusterShardLimitExceeded } from './create_index';
import type { SynchronizationFailed } from './synchronize_migrators';

export type {
CheckForUnknownDocsParams,
@@ -174,6 +175,7 @@ export interface ActionErrorTypeMap {
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
synchronization_failed: SynchronizationFailed;
}

/**
@@ -6,38 +6,36 @@
* Side Public License, v 1.
*/
import { synchronizeMigrators } from './synchronize_migrators';
import { type Defer, defer } from '../kibana_migrator_utils';
import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils';

describe('synchronizeMigrators', () => {
let defers: Array<Defer<void>>;
let allDefersPromise: Promise<any>;
let migratorsDefers: Array<Defer<void>>;
let waitGroups: Array<WaitGroup<void>>;
let allWaitGroupsPromise: Promise<any>;
let migratorsWaitGroups: Array<WaitGroup<void>>;

beforeEach(() => {
jest.clearAllMocks();

defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer);
allDefersPromise = Promise.all(defers.map(({ promise }) => promise));
waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup);
allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise));

migratorsDefers = defers.map(({ resolve, reject }) => ({
migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({
resolve: jest.fn(resolve),
reject: jest.fn(reject),
promise: allDefersPromise,
promise: allWaitGroupsPromise,
}));
});

describe('when all migrators reach the synchronization point with a correct state', () => {
it('unblocks all migrators and resolves Right', async () => {
const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer));
const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup }));

const res = await Promise.all(tasks.map((task) => task()));

migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.resolve).toHaveBeenCalledTimes(1)
);
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.reject).not.toHaveBeenCalled()
migratorsWaitGroups.forEach((waitGroup) =>
expect(waitGroup.resolve).toHaveBeenCalledTimes(1)
);
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());

expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
@@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => {

it('migrators are not unblocked until the last one reaches the synchronization point', async () => {
let resolved: number = 0;
migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we simulate that only kibana_task_manager and kibana migrators get to the sync point
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we don't await for them, or we would be locked forever
Promise.all(tasks.map((task) => task()));

Expand All @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => {
expect(resolved).toEqual(0);

// finally, the last migrator gets to the synchronization point
await synchronizeMigrators(casesDefer)();
await synchronizeMigrators({ waitGroup: casesDefer })();
expect(resolved).toEqual(3);
});
});
@@ -75,31 +71,29 @@
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we first make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');

// the other migrators then try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
@@ -116,15 +110,13 @@
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// some migrators try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

// we then make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
@@ -133,14 +125,14 @@
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
@@ -8,20 +8,33 @@

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Defer } from '../kibana_migrator_utils';
import type { WaitGroup } from '../kibana_migrator_utils';

export interface SyncFailed {
type: 'sync_failed';
/** @internal */
export interface SynchronizationFailed {
type: 'synchronization_failed';
error: Error;
}

export function synchronizeMigrators(
defer: Defer<void>
): TaskEither.TaskEither<SyncFailed, 'synchronized_successfully'> {
/** @internal */
export interface SynchronizeMigratorsParams<T, U> {
waitGroup: WaitGroup<T>;
thenHook?: (res: any) => Either.Right<U>;
payload?: T;
}

export function synchronizeMigrators<T, U>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SynchronizationFailed, U> {
return () => {
defer.resolve();
return defer.promise
.then(() => Either.right('synchronized_successfully' as const))
.catch((error) => Either.left({ type: 'sync_failed' as const, error }));
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.catch((error) => Either.left({ type: 'synchronization_failed' as const, error }));
};
}
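
The reworked `synchronizeMigrators` accepts a `payload` and an optional `thenHook`, which is what lets each migrator hand data (such as its alias actions) to whichever migrator performs the single `updateAliases()` call. Below is a self-contained sketch of that pattern under stated assumptions: `makeWaitGroups` is a hypothetical helper, similar in spirit to the `createWaitGroupMap` utility tested further down, and the payload strings are purely illustrative.

```typescript
// Illustrative stand-in for the WaitGroup used by synchronizeMigrators.
interface WaitGroup<T> {
  resolve: (value: T) => void;
  reject: (reason?: unknown) => void;
  promise: Promise<T[]>;
}

// Hypothetical helper: N wait groups sharing one promise that resolves with all payloads.
function makeWaitGroups<T>(count: number): Array<WaitGroup<T>> {
  const resolvers: Array<(value: T) => void> = [];
  const rejecters: Array<(reason?: unknown) => void> = [];
  const shared = Promise.all(
    Array.from(
      { length: count },
      () =>
        new Promise<T>((resolve, reject) => {
          resolvers.push(resolve);
          rejecters.push(reject);
        })
    )
  );
  return Array.from({ length: count }, (_, i) => ({
    resolve: resolvers[i],
    reject: rejecters[i],
    promise: shared,
  }));
}

async function demo(): Promise<void> {
  const [kibana, taskManager, cases] = makeWaitGroups<string[]>(3);

  // Each migrator reaches the sync point and resolves its wait group with a payload
  // (here, pretend alias actions encoded as strings).
  kibana.resolve(['point .kibana at its new index']);
  taskManager.resolve(['point .kibana_task_manager at its new index']);
  cases.resolve(['point .kibana_cases at its new index']);

  // The shared promise resolves with every payload; a thenHook-style mapping can then
  // hand the combined list to whichever migrator issues the single updateAliases() call.
  const combined = (await kibana.promise).flat();
  console.log(combined.length); // 3
}

void demo();
```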
@@ -17,24 +17,24 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import {
calculateTypeStatuses,
createMultiPromiseDefer,
createWaitGroupMap,
getCurrentIndexTypesMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures';

describe('createMultiPromiseDefer', () => {
describe('createWaitGroupMap', () => {
it('creates defer objects with the same Promise', () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
expect(Object.keys(defers)).toHaveLength(2);
expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise);
expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve);
expect(defers['.kibana'].reject).not.toEqual(defers['.kibana_cases'].reject);
});

it('the common Promise resolves when all defers resolve', async () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
let resolved = 0;
Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved));
defers['.kibana'].resolve();