Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logs UI] Logs overview queries for the observability dashboard #70413

Merged
merged 16 commits into from
Jul 3, 2020
235 changes: 169 additions & 66 deletions x-pack/plugins/infra/public/utils/logs_overview_fetchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,90 +4,193 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { InfraClientCoreSetup } from '../types';
import { LogsFetchDataResponse } from '../../../observability/public';
import { InfraClientCoreSetup, InfraClientStartDeps } from '../types';
import {
FetchData,
LogsFetchDataResponse,
HasData,
FetchDataParams,
} from '../../../observability/public';

export function getLogsHasDataFetcher(getStartServices: InfraClientCoreSetup['getStartServices']) {
return async () => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
interface StatsAggregation {
buckets: Array<{ key: string; doc_count: number }>;
}

// if you need a core dep, we need to pass in more than just getStartServices
interface SeriesAggregation {
buckets: Array<{
key_as_string: string;
key: number;
doc_count: number;
dataset: StatsAggregation;
}>;
}

interface LogParams {
index: string;
timestampField: string;
}

type StatsAndSeries = Pick<LogsFetchDataResponse, 'stats' | 'series'>;

export function getLogsHasDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
): HasData {
return async () => {
const [, startPlugins] = await getStartServices();
const { data } = startPlugins;

// perform query
return true;
return await hasLogsOverview('filebeat-*', data);
};
}

export function getLogsOverviewDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
) {
return async (): Promise<LogsFetchDataResponse> => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
): FetchData<LogsFetchDataResponse> {
return async (params) => {
const [, startPlugins] = await getStartServices();
const { data } = startPlugins;

// if you need a core dep, we need to pass in more than just getStartServices
// FIXME figure out how to get these from the sourceConfiguration
const { stats, series } = await fetchLogsOverview(
{ index: 'filebeat-*', timestampField: '@timestamp' },
params,
data
);

// perform query
return {
title: 'Log rate',
afgomez marked this conversation as resolved.
Show resolved Hide resolved
appLink: 'TBD', // TODO: what format should this be in, relative I assume?
afgomez marked this conversation as resolved.
Show resolved Hide resolved
stats: {
nginx: {
type: 'number',
label: 'nginx',
value: 345341,
},
'elasticsearch.audit': {
type: 'number',
label: 'elasticsearch.audit',
value: 164929,
stats,
series,
};
};
}

async function hasLogsOverview(
index: string,
dataPlugin: InfraClientStartDeps['data']
): Promise<boolean> {
const esSearcher = dataPlugin.search.getSearchStrategy('es');
return new Promise((resolve, reject) => {
esSearcher
.search({
params: {
index,
body: {
size: 0,
afgomez marked this conversation as resolved.
Show resolved Hide resolved
},
},
'haproxy.log': {
type: 'number',
label: 'haproxy.log',
value: 51101,
})
.subscribe(
(response) => {
resolve(response.rawResponse.hits.total > 0);
},
},
// Note: My understanding is that these series coordinates will be
// combined into objects that look like:
// { x: timestamp, y: value, g: label (e.g. nginx) }
// so they fit the stacked bar chart API
// https://elastic.github.io/elastic-charts/?path=/story/bar-chart--stacked-with-axis-and-legend
series: {
nginx: {
label: 'nginx',
coordinates: [
{ x: 1593000000000, y: 10014 },
{ x: 1593000900000, y: 12827 },
{ x: 1593001800000, y: 2946 },
{ x: 1593002700000, y: 14298 },
{ x: 1593003600000, y: 4096 },
],
(error) => reject(error)
);
});
}

async function fetchLogsOverview(
logParams: LogParams,
params: FetchDataParams,
dataPlugin: InfraClientStartDeps['data']
): Promise<StatsAndSeries> {
const esSearcher = dataPlugin.search.getSearchStrategy('es');
return new Promise((resolve, reject) => {
esSearcher
.search({
params: {
index: logParams.index,
body: {
size: 0,
query: buildLogOverviewQuery(logParams, params),
aggs: buildLogOverviewAggregations(logParams, params),
},
},
'elasticsearch.audit': {
label: 'elasticsearch.audit',
coordinates: [
{ x: 1593000000000, y: 5676 },
{ x: 1593000900000, y: 6783 },
{ x: 1593001800000, y: 2394 },
{ x: 1593002700000, y: 4554 },
{ x: 1593003600000, y: 5659 },
],
})
.subscribe(
(response) => {
if (response.rawResponse.aggregations) {
resolve(processLogsOverviewAggregations(response.rawResponse.aggregations));
} else {
resolve({ stats: {}, series: {} });
}
},
'haproxy.log': {
label: 'haproxy.log',
coordinates: [
{ x: 1593000000000, y: 9085 },
{ x: 1593000900000, y: 9002 },
{ x: 1593001800000, y: 3940 },
{ x: 1593002700000, y: 5451 },
{ x: 1593003600000, y: 9133 },
],
(error) => reject(error)
);
});
}

function buildLogOverviewQuery(logParams: LogParams, params: FetchDataParams) {
return {
range: {
[logParams.timestampField]: {
gt: params.startTime,
lte: params.endTime,
format: 'strict_date_optional_time',
},
},
};
}

function buildLogOverviewAggregations(logParams: LogParams, params: FetchDataParams) {
return {
stats: {
terms: {
field: 'event.dataset',
size: 4,
},
},
series: {
date_histogram: {
field: logParams.timestampField,
fixed_interval: params.bucketSize,
},
aggs: {
dataset: {
terms: {
field: 'event.dataset',
size: 4,
},
},
},
};
},
};
}

function processLogsOverviewAggregations(aggregations: {
stats: StatsAggregation;
series: SeriesAggregation;
}): StatsAndSeries {
const processedStats = aggregations.stats.buckets.reduce<StatsAndSeries['stats']>(
(result, bucket) => {
result[bucket.key] = {
type: 'number',
label: bucket.key,
value: bucket.doc_count,
};

return result;
},
{}
);

const processedSeries = aggregations.series.buckets.reduce<StatsAndSeries['series']>(
(result, bucket) => {
const x = bucket.key; // the timestamp of the bucket
bucket.dataset.buckets.forEach((b) => {
const label = b.key;
result[label] = result[label] || { label, coordinates: [] };
result[label].coordinates.push({ x, y: b.doc_count });
});

return result;
},
{}
);

return {
stats: processedStats,
series: processedSeries,
};
}