Skip to content

Commit

Permalink
Merge pull request #5 from tsullivan/monitoring/collector_format-usag…
Browse files Browse the repository at this point in the history
…e-service-changes

Add Usage Service changes for Saved Objects Client + add getFilteredCollectorSet method
  • Loading branch information
chrisronline committed Aug 6, 2018
2 parents c662cc4 + b1cc66c commit 316bd67
Show file tree
Hide file tree
Showing 17 changed files with 197 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ export function createSavedObjectsService(server) {
}
});

const unscopedClientProvider = {
getClient(callCluster) {
const repository = repositoryProvider.getRepository(callCluster);
return new SavedObjectsClient(repository);
}
};

return {
types: Object.keys(getRootPropertiesObjects(mappings)),
SavedObjectsClient,
Expand All @@ -94,5 +101,7 @@ export function createSavedObjectsService(server) {
scopedClientProvider.setClientFactory(...args),
addScopedSavedObjectsClientWrapperFactory: (...args) =>
scopedClientProvider.addClientWrapperFactory(...args),
getUnscopedSavedObjectsClient: (...args) =>
unscopedClientProvider.getClient(...args),
};
}
49 changes: 49 additions & 0 deletions src/server/saved_objects/service/lib/repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

import { get, snakeCase } from 'lodash';
import uuid from 'uuid';

import { getRootType } from '../../../mappings';
Expand Down Expand Up @@ -210,6 +211,54 @@ export class SavedObjectsRepository {
);
}

/**
* Summarize
*/
async summarize() {
const TYPES = [
'dashboard',
'graph-workspace',
'index-pattern',
'search',
'timelion-sheet',
'visualization',
];

const esOptions = {
index: this._index,
size: 0,
ignore: [404],
filterPath: 'aggregations.types.buckets',
body: {
query: {
terms: { type: TYPES },
},
aggs: {
types: {
terms: { field: 'type', size: TYPES.length }
}
}
}
};
const response = await this._callCluster('search', esOptions);
const buckets = get(response, 'aggregations.types.buckets', []);
const bucketCounts = buckets.reduce((acc, bucket) => ({
...acc,
[bucket.key]: bucket.doc_count,
}), {});

return {
index: this._index,
...TYPES.reduce((acc, type) => ({ // combine the bucketCounts and 0s for types that don't have documents
...acc,
[snakeCase(type)]: {
total: bucketCounts[type] || 0
}
}), {})
};

}

/**
* @param {object} [options={}]
* @property {(string|Array<string>)} [options.type]
Expand Down
33 changes: 33 additions & 0 deletions src/server/saved_objects/service/lib/repository.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -666,4 +666,37 @@ describe('SavedObjectsRepository', () => {
}
});
});

describe('summarize', () => {
beforeEach(() => {
const aggResults = {
aggregations: {
types: {
buckets: [
{ key: 'dashboard', doc_count: 13 },
{ key: 'graph-workspace', doc_count: 2 },
{ key: 'index-pattern', doc_count: 4 },
{ key: 'search', doc_count: 13 },
{ key: 'timelion-sheet', doc_count: 4 },
{ key: 'visualization', doc_count: 65 },
]
}
}
};
callAdminCluster.returns(aggResults);
});

it('summarizes saved objects', async () => {
const response = await savedObjectsRepository.summarize();
expect(response).toEqual({
index: '.kibana-test',
index_pattern: { total: 4 },
dashboard: { total: 13 },
graph_workspace: { total: 2 },
search: { total: 13 },
timelion_sheet: { total: 4 },
visualization: { total: 65 }
});
});
});
});
7 changes: 7 additions & 0 deletions src/server/saved_objects/service/saved_objects_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,11 @@ export class SavedObjectsClient {
async update(type, id, attributes, options = {}) {
return this._repository.update(type, id, attributes, options);
}

/*
* Summarize the objects
*/
async summarize() {
return this._repository.summarize();
}
}
3 changes: 2 additions & 1 deletion src/server/status/collectors/get_ops_stats_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) {
kibana: getKibanaInfoForStats(server, kbnServer),
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
};
}
},
internalIgnore: true, // Ignore this one from internal uploader. A different stats collector is used there.
});
}
7 changes: 4 additions & 3 deletions src/server/status/routes/api/register_stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ export function registerStatsApi(kbnServer, server, config) {
return uuid;
};

const getUsage = async callCluster => {
const usage = await collectorSet.bulkFetchUsage(callCluster);
const getUsage = async (callCluster, savedObjectsClient) => {
const usage = await collectorSet.bulkFetchUsage({ callCluster, savedObjectsClient });
return collectorSet.toObject(usage);
};

Expand All @@ -66,9 +66,10 @@ export function registerStatsApi(kbnServer, server, config) {
if (isExtended) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('admin');
const callCluster = (...args) => callWithRequest(req, ...args);
const savedObjectsClient = req.getSavedObjectsClient();
try {
const [ usage, clusterUuid ] = await Promise.all([
getUsage(callCluster),
getUsage(callCluster, savedObjectsClient),
getClusterUuid(callCluster),
]);

Expand Down
8 changes: 6 additions & 2 deletions src/server/usage/classes/__tests__/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,17 @@ describe('CollectorSet', () => {

it('should log debug status of fetching from the collector', async () => {
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const mockSavedObjectsClient = { summarize: () => ({}) };
const collectors = new CollectorSet(server);
collectors.register(new Collector(server, {
type: 'MY_TEST_COLLECTOR',
fetch: caller => caller()
fetch: ({ callCluster: caller }) => caller()
}));

const result = await collectors.bulkFetch(mockCallCluster);
const result = await collectors.bulkFetch({
callCluster: mockCallCluster,
savedObjectsClient: mockSavedObjectsClient
});
const calls = server.log.getCalls();
expect(calls.length).to.be(1);
expect(calls[0].args).to.eql([
Expand Down
32 changes: 26 additions & 6 deletions src/server/usage/classes/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,47 @@ export class Collector {
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(server, { type, init, fetch, formatForBulkUpload = null } = {}) {
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
}
if (typeof init !== 'undefined' && typeof init !== 'function') {
throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property');
}
if (typeof fetch !== 'function') {
throw new Error('Collector must be instantiated with a options.fetch function property');
}

Object.assign(this, options); // spread in other properties and mutate "this"

this.type = type;
this.init = init;
this.fetch = fetch;
this.formatForBulkUpload = formatForBulkUpload;

const defaultFormatterForBulkUpload = result => [ { type, payload: result } ];
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;

this.log = getCollectorLogger(server);
}

fetchInternal(callCluster) {
if (typeof callCluster !== 'function') {
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
/*
* @param {Object} fetchMechanisms - an object with a callCluster function and a savedObjectsClient object
*/
fetchInternal(fetchMechanisms) {
const { callCluster, savedObjectsClient } = fetchMechanisms;
if (typeof callCluster !== 'function' || typeof savedObjectsClient !== 'object') {
throw new Error(
'An object must be passed to the fetch methods of collectors having ' +
'properties of a callCluster function and savedObjectsClient object'
);
}
return this.fetch(callCluster);
return this.fetch(fetchMechanisms);
}

formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}
}
67 changes: 34 additions & 33 deletions src/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';

function defaultFormatterForBulkUpload(collector, result) {
return [
{ type: collector.type, payload: result }
];
}

/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
Expand All @@ -38,20 +32,20 @@ export class CollectorSet {

/*
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
*/
constructor(server) {
constructor(server, collectors = []) {
this._log = getCollectorLogger(server);
this._collectors = [];
this._collectors = collectors;

/*
* Helper Factory methods
* Define as instance properties to allow enclosing the server object
*/
this.makeStatsCollector = options => new Collector(server, options);
this.makeUsageCollector = options => new UsageCollector(server, options);

this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
}

/*
Expand All @@ -77,51 +71,54 @@ export class CollectorSet {

/*
* Call a bunch of fetch methods and then do them in bulk
* @param {Object} fetchMechanisms - an object with a callCluster function and a savedObjectsClient object
* @param {Array} collectors - an array of collectors, default to all registered collectors
*/
bulkFetch(callCluster, collectors = this._collectors) {
if (!Array.isArray(collectors)) {
bulkFetch(fetchMechanisms, collectors = this) {
if (!(collectors instanceof CollectorSet)) {
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
}

return Promise.map(collectors, collector => {
const fetchPromises = collectors.map(collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
return Promise.props({
type: collectorType,
result: collector.fetchInternal(callCluster) // use the wrapper for fetch, kicks in error checking
result: collector.fetchInternal(fetchMechanisms) // use the wrapper for fetch, kicks in error checking
})
.catch(err => {
this._log.warn(err);
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
});
});
return Promise.all(fetchPromises);
}

getCollectorByType(type, collectors = this._collectors) {
return collectors.find(collector => collector.type === type);
}

bulkFormat(data, collectors = this._collectors) {
if (!Array.isArray(collectors)) {
throw new Error(`bulkFormat method given bad collectors parameter: ` + typeof collectors);
}

return data.reduce((accum, collectedData) => {
if (isEmpty(collectedData)) {
bulkFormat(data) {
return data.reduce((accum, { type, result }) => {
if (isEmpty(result)) {
return accum;
}

const collector = this.getCollectorByType(collectedData.type);
const formatter = collector.formatForBulkUpload || defaultFormatterForBulkUpload.bind(null, collector);
accum.push(formatter(collectedData.result));
return accum;
const payload = this.getCollectorByType(type).formatForBulkUpload(result);
return [
...accum,
payload // TODO flatten it here
];
}, []);
}

async bulkFetchUsage(callCluster) {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
return this.bulkFetch(callCluster, usageCollectors);
/*
* @return {new CollectorSet}
*/
getFilteredCollectorSet(filter) {
const filtered = this._collectors.filter(filter);
return this._makeCollectorSetFromArray(filtered);
}

async bulkFetchUsage(fetchMechanisms) {
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector);
return this.bulkFetch(fetchMechanisms, usageCollectors);
}

// convert an array of fetched stats results into key/object
Expand Down Expand Up @@ -164,4 +161,8 @@ export class CollectorSet {
};
}, {});
}

map(mapFn) {
return this._collectors.map(mapFn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ describe('BulkUploader', () => {
},
},
usage: {},
savedObjects: {
getUnscopedSavedObjectsClient: sinon.stub()
}
};
});

Expand Down
Loading

0 comments on commit 316bd67

Please sign in to comment.