Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Upgrade Assistant] Open And Close Slight Refactor #59890

Merged
merged 5 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { IScopedClusterClient } from 'kibana/server';
import { APICaller } from 'kibana/server';
import { getIndexStateFromClusterState } from '../../common/get_index_state_from_cluster_state';
import { ClusterStateAPIResponse } from '../../common/types';

type StatusCheckResult = Record<string, 'open' | 'close'>;

export const esIndicesStateCheck = async (
dataClient: IScopedClusterClient,
callAsUser: APICaller,
indices: string[]
): Promise<StatusCheckResult> => {
// According to https://www.elastic.co/guide/en/elasticsearch/reference/7.6/cluster-state.html
// The response from this call is considered internal and subject to change. We have an API
// integration test for asserting that the current ES version still returns what we expect.
// This lives in x-pack/test/upgrade_assistant_integration
const clusterState: ClusterStateAPIResponse = await dataClient.callAsCurrentUser(
'cluster.state',
{
index: indices,
metric: 'metadata',
}
);
const clusterState: ClusterStateAPIResponse = await callAsUser('cluster.state', {
index: indices,
metric: 'metadata',
});

const result: StatusCheckResult = {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export async function getUpgradeAssistantStatus(
// If we have found deprecation information for index/indices check whether the index is
// open or closed.
if (indexNames.length) {
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
const indexStates = await esIndicesStateCheck(
dataClient.callAsCurrentUser.bind(dataClient),
indexNames
);

indices.forEach(indexData => {
indexData.blockerForReindexing =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

jest.mock('../es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
import { BehaviorSubject } from 'rxjs';
import { Logger } from 'src/core/server';
import { loggingServiceMock } from 'src/core/server/mocks';
Expand All @@ -19,6 +19,8 @@ import { CURRENT_MAJOR_VERSION, PREV_MAJOR_VERSION } from '../../../common/versi
import { licensingMock } from '../../../../licensing/server/mocks';
import { LicensingPluginSetup } from '../../../../licensing/server';

import { esIndicesStateCheck } from '../es_indices_state_check';

import {
isMlIndex,
isWatcherIndex,
Expand All @@ -43,6 +45,7 @@ describe('reindexService', () => {
Promise.reject(`Mock function ${name} was not implemented!`);

beforeEach(() => {
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
actions = {
createReindexOp: jest.fn(unimplemented('createReindexOp')),
deleteReindexOp: jest.fn(unimplemented('deleteReindexOp')),
Expand Down Expand Up @@ -844,7 +847,6 @@ describe('reindexService', () => {
attributes: {
...defaultAttributes,
lastCompletedStep: ReindexStep.newIndexCreated,
reindexOptions: { openAndClose: false },
},
} as ReindexSavedObject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import { APICaller, Logger } from 'src/core/server';
import { first } from 'rxjs/operators';

import { LicensingPluginSetup } from '../../../../licensing/server';

import {
IndexGroup,
ReindexOptions,
Expand All @@ -15,14 +17,16 @@ import {
ReindexWarning,
} from '../../../common/types';

import { esIndicesStateCheck } from '../es_indices_state_check';

import {
generateNewIndexName,
getReindexWarnings,
sourceNameForIndex,
transformFlatSettings,
} from './index_settings';

import { ReindexActions } from './reindex_actions';
import { LicensingPluginSetup } from '../../../../licensing/server';

import { error } from './error';

Expand Down Expand Up @@ -317,7 +321,12 @@ export const reindexServiceFactory = (
const startReindexing = async (reindexOp: ReindexSavedObject) => {
const { indexName, reindexOptions } = reindexOp.attributes;

if (reindexOptions?.openAndClose === true) {
// Where possible, derive reindex options at the last moment before reindexing
// to prevent them from becoming stale as they wait in the queue.
const indicesState = await esIndicesStateCheck(callAsUser, [indexName]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a comment here to reflect your thoughts in the PR description? For example:

// Where possible, derive reindex options at the last moment before reindexing
// to prevent them from becoming stale as they wait in the queue.

const openAndClose = indicesState[indexName] === 'close';
if (indicesState[indexName] === 'close') {
log.debug(`Detected closed index ${indexName}, opening...`);
await callAsUser('indices.open', { index: indexName });
}

Expand All @@ -334,6 +343,12 @@ export const reindexServiceFactory = (
lastCompletedStep: ReindexStep.reindexStarted,
reindexTaskId: startReindex.task,
reindexTaskPercComplete: 0,
reindexOptions: {
...(reindexOptions ?? {}),
// Indicate to downstream states whether we opened a closed index that should be
// closed again.
openAndClose,
},
});
};

Expand Down Expand Up @@ -654,9 +669,16 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

const reindexOptions: ReindexOptions | undefined = opts
? {
...(op.attributes.reindexOptions ?? {}),
...opts,
}
: undefined;

return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts ?? op.attributes.reindexOptions,
reindexOptions,
});
});
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ interface ReindexHandlerArgs {
headers: Record<string, any>;
credentialStore: CredentialStore;
reindexOptions?: {
openAndClose?: boolean;
enqueue?: boolean;
};
}
Expand Down Expand Up @@ -56,7 +55,6 @@ export const reindexHandler = async ({

const opts: ReindexOptions | undefined = reindexOptions
? {
openAndClose: reindexOptions.openAndClose,
queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined,
}
: undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const mockReindexService = {
resumeReindexOperation: jest.fn(),
cancelReindexing: jest.fn(),
};
jest.mock('../../lib/es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
jest.mock('../../lib/es_version_precheck', () => ({
versionCheckHandlerWrapper: (a: any) => a,
}));
Expand All @@ -39,7 +38,6 @@ import {
} from '../../../common/types';
import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
import { registerReindexIndicesRoutes } from './reindex_indices';
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';

/**
* Since these route callbacks are so thin, these serve simply as integration tests
Expand All @@ -57,7 +55,6 @@ describe('reindex API', () => {
} as any;

beforeEach(() => {
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
mockRouter = createMockRouter();
routeDependencies = {
credentialStore,
Expand Down Expand Up @@ -168,9 +165,7 @@ describe('reindex API', () => {
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', {
openAndClose: false,
});
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined);

// It returned the right results
expect(resp.status).toEqual(200);
Expand Down Expand Up @@ -237,10 +232,7 @@ describe('reindex API', () => {
kibanaResponseFactory
);
// It called resume correctly
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', {
openAndClose: false,
queueSettings: undefined,
});
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled();

// It returned the right results
Expand Down Expand Up @@ -269,7 +261,6 @@ describe('reindex API', () => {

describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
openAndClose: false,
queueSettings: { queuedAt: expect.any(Number) },
};
it('creates a collection of index operations', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server';
import { ReindexStatus } from '../../../common/types';

import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck';
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';
import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
Expand Down Expand Up @@ -108,7 +107,6 @@ export function registerReindexIndicesRoutes(
response
) => {
const { indexName } = request.params;
const indexStates = await esIndicesStateCheck(dataClient, [indexName]);
try {
const result = await reindexHandler({
savedObjects: savedObjectsClient,
Expand All @@ -118,7 +116,6 @@ export function registerReindexIndicesRoutes(
licensing,
headers: request.headers,
credentialStore,
reindexOptions: { openAndClose: indexStates[indexName] === 'close' },
});

// Kick the worker on this node to immediately pickup the new reindex operation.
Expand Down Expand Up @@ -190,7 +187,6 @@ export function registerReindexIndicesRoutes(
response
) => {
const { indexNames } = request.body;
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
const results: PostBatchResponse = {
enqueued: [],
errors: [],
Expand All @@ -206,7 +202,6 @@ export function registerReindexIndicesRoutes(
headers: request.headers,
credentialStore,
reindexOptions: {
openAndClose: indexStates[indexName] === 'close',
enqueue: true,
},
});
Expand Down