Skip to content

Commit

Permalink
[Telemetry] Move Monitoring collection strategy to a collector (elast…
Browse files Browse the repository at this point in the history
…ic#82638)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
2 people authored and chrisronline committed Nov 19, 2020
1 parent 55c030a commit 67b496e
Show file tree
Hide file tree
Showing 17 changed files with 356 additions and 270 deletions.
2 changes: 1 addition & 1 deletion src/plugins/telemetry_collection_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ export class TelemetryCollectionManagerPlugin
return stats.map((stat) => {
const license = licenses[stat.cluster_uuid];
return {
collectionSource: collection.title,
...(license ? { license } : {}),
...stat,
collectionSource: collection.title,
};
});
}
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/monitoring/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
],
"optionalPlugins": [
"infra",
"telemetryCollectionManager",
"usageCollection",
"home",
"cloud",
Expand Down
35 changes: 7 additions & 28 deletions x-pack/plugins/monitoring/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import {
CoreStart,
CustomHttpResponseOptions,
ResponseError,
IClusterClient,
SavedObjectsServiceStart,
} from 'kibana/server';
import { DEFAULT_APP_CATEGORIES } from '../../../../src/core/server';
import {
Expand All @@ -41,7 +39,7 @@ import { initInfraSource } from './lib/logs/init_infra_source';
import { mbSafeQuery } from './lib/mb_safe_query';
import { instantiateClient } from './es_client/instantiate_client';
import { registerCollectors } from './kibana_monitoring/collectors';
import { registerMonitoringCollection } from './telemetry_collection';
import { registerMonitoringTelemetryCollection } from './telemetry_collection';
import { LicenseService } from './license_service';
import { AlertsFactory } from './alerts';
import {
Expand Down Expand Up @@ -76,8 +74,6 @@ export class Plugin {
private monitoringCore = {} as MonitoringCore;
private legacyShimDependencies = {} as LegacyShimDependencies;
private bulkUploader: IBulkUploader = {} as IBulkUploader;
private telemetryElasticsearchClient: IClusterClient | undefined;
private telemetrySavedObjectsService: SavedObjectsServiceStart | undefined;

constructor(initializerContext: PluginInitializerContext) {
this.initializerContext = initializerContext;
Expand Down Expand Up @@ -145,19 +141,6 @@ export class Plugin {
plugins.alerts?.registerType(alert.getAlertType());
}

// Initialize telemetry
if (plugins.telemetryCollectionManager) {
registerMonitoringCollection({
telemetryCollectionManager: plugins.telemetryCollectionManager,
esCluster: this.cluster,
esClientGetter: () => this.telemetryElasticsearchClient,
soServiceGetter: () => this.telemetrySavedObjectsService,
customContext: {
maxBucketSize: config.ui.max_bucket_size,
},
});
}

// Register collector objects for stats to show up in the APIs
if (plugins.usageCollection) {
core.savedObjects.registerType({
Expand All @@ -174,6 +157,11 @@ export class Plugin {
});

registerCollectors(plugins.usageCollection, config, cluster);
registerMonitoringTelemetryCollection(
plugins.usageCollection,
cluster,
config.ui.max_bucket_size
);
}

// Always create the bulk uploader
Expand Down Expand Up @@ -253,16 +241,7 @@ export class Plugin {
};
}

start({ elasticsearch, savedObjects }: CoreStart) {
// TODO: For the telemetry plugin to work, we need to provide the new ES client.
// The new client should be inititalized with a similar config to `this.cluster` but, since we're not using
// the new client in Monitoring Telemetry collection yet, setting the local client allows progress for now.
// The usage collector `fetch` method has been refactored to accept a `collectorFetchContext` object,
// exposing both es clients and the saved objects client.
// We will update the client in a follow up PR.
this.telemetryElasticsearchClient = elasticsearch.client;
this.telemetrySavedObjectsService = savedObjects;
}
start() {}

stop() {
if (this.cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ import { getStackStats, getAllStats, handleAllStats } from './get_all_stats';
import { ESClusterStats } from './get_es_stats';
import { KibanaStats } from './get_kibana_stats';
import { ClustersHighLevelStats } from './get_high_level_stats';
import { coreMock } from 'src/core/server/mocks';

describe('get_all_stats', () => {
const timestamp = Date.now();
const callCluster = sinon.stub();
const esClient = sinon.stub();
const soClient = sinon.stub();

const esClusters = [
{ cluster_uuid: 'a' },
Expand Down Expand Up @@ -172,24 +169,7 @@ describe('get_all_stats', () => {
.onCall(4)
.returns(Promise.resolve({})); // Beats state

expect(
await getAllStats(
[{ clusterUuid: 'a' }],
{
callCluster: callCluster as any,
esClient: esClient as any,
soClient: soClient as any,
usageCollection: {} as any,
kibanaRequest: undefined,
timestamp,
},
{
logger: coreMock.createPluginInitializerContext().logger.get('test'),
version: 'version',
maxBucketSize: 1,
}
)
).toStrictEqual(allClusters);
expect(await getAllStats(['a'], callCluster, timestamp, 1)).toStrictEqual(allClusters);
});

it('returns empty clusters', async () => {
Expand All @@ -199,24 +179,7 @@ describe('get_all_stats', () => {

callCluster.withArgs('search').returns(Promise.resolve(clusterUuidsResponse));

expect(
await getAllStats(
[],
{
callCluster: callCluster as any,
esClient: esClient as any,
soClient: soClient as any,
usageCollection: {} as any,
kibanaRequest: undefined,
timestamp,
},
{
logger: coreMock.createPluginInitializerContext().logger.get('test'),
version: 'version',
maxBucketSize: 1,
}
)
).toStrictEqual([]);
expect(await getAllStats([], callCluster, timestamp, 1)).toStrictEqual([]);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import { set } from '@elastic/safer-lodash-set';
import { get, merge } from 'lodash';

import { StatsGetter } from 'src/plugins/telemetry_collection_manager/server';
import moment from 'moment';
import { LegacyAPICaller } from 'kibana/server';
import {
LOGSTASH_SYSTEM_ID,
KIBANA_SYSTEM_ID,
Expand All @@ -20,24 +20,20 @@ import { getKibanaStats, KibanaStats } from './get_kibana_stats';
import { getBeatsStats, BeatsStatsByClusterUuid } from './get_beats_stats';
import { getHighLevelStats, ClustersHighLevelStats } from './get_high_level_stats';

export interface CustomContext {
maxBucketSize: number;
}
/**
* Get statistics for all products joined by Elasticsearch cluster.
* Returns the array of clusters joined with the Kibana and Logstash instances.
*
*/
export const getAllStats: StatsGetter<CustomContext> = async (
clustersDetails,
{ callCluster, timestamp },
{ maxBucketSize }
) => {
export async function getAllStats(
clusterUuids: string[],
callCluster: LegacyAPICaller, // TODO: To be changed to the new ES client when the plugin migrates
timestamp: number,
maxBucketSize: number
) {
const start = moment(timestamp).subtract(USAGE_FETCH_INTERVAL, 'ms').toISOString();
const end = moment(timestamp).toISOString();

const clusterUuids = clustersDetails.map((clusterDetails) => clusterDetails.clusterUuid);

const [esClusters, kibana, logstash, beats] = await Promise.all([
getElasticsearchStats(callCluster, clusterUuids, maxBucketSize), // cluster_stats, stack_stats.xpack, cluster_name/uuid, license, version
getKibanaStats(callCluster, clusterUuids, start, end, maxBucketSize), // stack_stats.kibana
Expand All @@ -46,7 +42,7 @@ export const getAllStats: StatsGetter<CustomContext> = async (
]);

return handleAllStats(esClusters, { kibana, logstash, beats });
};
}

/**
* Combine the statistics from the stack to create "cluster" stats that associate all products together based on the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,35 @@
*/

import sinon from 'sinon';
import { elasticsearchServiceMock, savedObjectsRepositoryMock } from 'src/core/server/mocks';
import {
getClusterUuids,
fetchClusterUuids,
handleClusterUuidsResponse,
} from './get_cluster_uuids';

describe('get_cluster_uuids', () => {
const kibanaRequest = undefined;
const callCluster = sinon.stub();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const soClient = savedObjectsRepositoryMock.create();
const response = {
aggregations: {
cluster_uuids: {
buckets: [{ key: 'abc' }, { key: 'xyz' }, { key: '123' }],
},
},
};
const expectedUuids = response.aggregations.cluster_uuids.buckets
.map((bucket) => bucket.key)
.map((expectedUuid) => ({ clusterUuid: expectedUuid }));
const expectedUuids = response.aggregations.cluster_uuids.buckets.map((bucket) => bucket.key);
const timestamp = Date.now();

describe('getClusterUuids', () => {
it('returns cluster UUIDs', async () => {
callCluster.withArgs('search').returns(Promise.resolve(response));
expect(
await getClusterUuids(
{ callCluster, esClient, soClient, timestamp, kibanaRequest, usageCollection: {} as any },
{
maxBucketSize: 1,
} as any
)
).toStrictEqual(expectedUuids);
expect(await getClusterUuids(callCluster, timestamp, 1)).toStrictEqual(expectedUuids);
});
});

describe('fetchClusterUuids', () => {
it('searches for clusters', async () => {
callCluster.returns(Promise.resolve(response));
expect(
await fetchClusterUuids(
{ callCluster, esClient, soClient, timestamp, kibanaRequest, usageCollection: {} as any },
{
maxBucketSize: 1,
} as any
)
).toStrictEqual(response);
expect(await fetchClusterUuids(callCluster, timestamp, 1)).toStrictEqual(response);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,31 @@

import { get } from 'lodash';
import moment from 'moment';
import {
ClusterDetailsGetter,
StatsCollectionConfig,
ClusterDetails,
} from 'src/plugins/telemetry_collection_manager/server';
import { LegacyAPICaller } from 'kibana/server';
import { createQuery } from './create_query';
import {
INDEX_PATTERN_ELASTICSEARCH,
CLUSTER_DETAILS_FETCH_INTERVAL,
} from '../../common/constants';
import { CustomContext } from './get_all_stats';

/**
* Get a list of Cluster UUIDs that exist within the specified timespan.
*/
export const getClusterUuids: ClusterDetailsGetter<CustomContext> = async (
config,
{ maxBucketSize }
) => {
const response = await fetchClusterUuids(config, maxBucketSize);
export async function getClusterUuids(
callCluster: LegacyAPICaller, // TODO: To be changed to the new ES client when the plugin migrates
timestamp: number,
maxBucketSize: number
) {
const response = await fetchClusterUuids(callCluster, timestamp, maxBucketSize);
return handleClusterUuidsResponse(response);
};
}

/**
* Fetch the aggregated Cluster UUIDs from the monitoring cluster.
*/
export async function fetchClusterUuids(
{ callCluster, timestamp }: StatsCollectionConfig,
callCluster: LegacyAPICaller,
timestamp: number,
maxBucketSize: number
) {
const start = moment(timestamp).subtract(CLUSTER_DETAILS_FETCH_INTERVAL, 'ms').toISOString();
Expand Down Expand Up @@ -66,10 +64,7 @@ export async function fetchClusterUuids(
* @param {Object} response The aggregation response
* @return {Array} Strings; each representing a Cluster's UUID.
*/
export function handleClusterUuidsResponse(response: any): ClusterDetails[] {
export function handleClusterUuidsResponse(response: any): string[] {
const uuidBuckets: any[] = get(response, 'aggregations.cluster_uuids.buckets', []);

return uuidBuckets.map((uuidBucket) => ({
clusterUuid: uuidBucket.key as string,
}));
return uuidBuckets.map((uuidBucket) => uuidBucket.key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('get_licenses', () => {
},
};
const expectedClusters = response.hits.hits.map((hit) => hit._source);
const clusterUuids = expectedClusters.map((cluster) => ({ clusterUuid: cluster.cluster_uuid }));
const clusterUuids = expectedClusters.map((cluster) => cluster.cluster_uuid);
const expectedLicenses = {
abc: { type: 'basic' },
xyz: { type: 'basic' },
Expand All @@ -30,27 +30,15 @@ describe('get_licenses', () => {
it('returns clusters', async () => {
callWith.withArgs('search').returns(Promise.resolve(response));

expect(
await getLicenses(
clusterUuids,
{ callCluster: callWith } as any,
{ maxBucketSize: 1 } as any
)
).toStrictEqual(expectedLicenses);
expect(await getLicenses(clusterUuids, callWith, 1)).toStrictEqual(expectedLicenses);
});
});

describe('fetchLicenses', () => {
it('searches for clusters', async () => {
callWith.returns(response);

expect(
await fetchLicenses(
callWith,
clusterUuids.map(({ clusterUuid }) => clusterUuid),
{ maxBucketSize: 1 } as any
)
).toStrictEqual(response);
expect(await fetchLicenses(callWith, clusterUuids, 1)).toStrictEqual(response);
});
});

Expand Down
Loading

0 comments on commit 67b496e

Please sign in to comment.