Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Uptime] Use scripted metric for snapshot calculation (#58247) #58389

Merged
merged 1 commit into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 138 additions & 56 deletions x-pack/legacy/plugins/uptime/server/lib/requests/get_snapshot_counts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { UMElasticsearchQueryFn } from '../adapters';
import { Snapshot } from '../../../common/runtime_types';
import { QueryContext, MonitorGroupIterator } from './search';
import { QueryContext } from './search';
import { CONTEXT_DEFAULTS, INDEX_NAMES } from '../../../common/constants';

export interface GetSnapshotCountParams {
Expand All @@ -16,49 +16,6 @@ export interface GetSnapshotCountParams {
statusFilter?: string;
}

const fastStatusCount = async (context: QueryContext): Promise<Snapshot> => {
const params = {
index: INDEX_NAMES.HEARTBEAT,
body: {
size: 0,
query: { bool: { filter: await context.dateAndCustomFilters() } },
aggs: {
unique: {
// We set the precision threshold to 40k which is the max precision supported by cardinality
cardinality: { field: 'monitor.id', precision_threshold: 40000 },
},
down: {
filter: { range: { 'summary.down': { gt: 0 } } },
aggs: {
unique: { cardinality: { field: 'monitor.id', precision_threshold: 40000 } },
},
},
},
},
};

const statistics = await context.search(params);
const total = statistics.aggregations.unique.value;
const down = statistics.aggregations.down.unique.value;

return {
total,
down,
up: total - down,
};
};

const slowStatusCount = async (context: QueryContext, status: string): Promise<number> => {
const downContext = context.clone();
downContext.statusFilter = status;
const iterator = new MonitorGroupIterator(downContext);
let count = 0;
while (await iterator.next()) {
count++;
}
return count;
};

export const getSnapshotCount: UMElasticsearchQueryFn<GetSnapshotCountParams, Snapshot> = async ({
callES,
dateRangeStart,
Expand All @@ -81,22 +38,147 @@ export const getSnapshotCount: UMElasticsearchQueryFn<GetSnapshotCountParams, Sn
);

// Calculate the total, up, and down counts.
const counts = await fastStatusCount(context);

// Check if the last count was accurate, if not, we need to perform a slower count with the
// MonitorGroupsIterator.
if (!(await context.hasTimespan())) {
// Figure out whether 'up' or 'down' is more common. It's faster to count the lower cardinality
// one then use subtraction to figure out its opposite.
const [leastCommonStatus, mostCommonStatus]: Array<'up' | 'down'> =
counts.up > counts.down ? ['down', 'up'] : ['up', 'down'];
counts[leastCommonStatus] = await slowStatusCount(context, leastCommonStatus);
counts[mostCommonStatus] = counts.total - counts[leastCommonStatus];
}
const counts = await statusCount(context);

return {
total: statusFilter ? counts[statusFilter] : counts.total,
up: statusFilter === 'down' ? 0 : counts.up,
down: statusFilter === 'up' ? 0 : counts.down,
};
};

const statusCount = async (context: QueryContext): Promise<Snapshot> => {
const res = await context.search({
index: INDEX_NAMES.HEARTBEAT,
body: statusCountBody(await context.dateAndCustomFilters()),
});

return res.aggregations.counts.value;
};

const statusCountBody = (filters: any): any => {
return {
size: 0,
query: {
bool: {
filter: [
{
exists: {
field: 'summary',
},
},
filters,
],
},
},
aggs: {
counts: {
scripted_metric: {
init_script: 'state.locStatus = new HashMap(); state.totalDocs = 0;',
map_script: `
def loc = doc["observer.geo.name"].size() == 0 ? "" : doc["observer.geo.name"][0];

// One concern here is memory since we could build pretty gigantic maps. I've opted to
// stick to a simple <String,String> map to reduce memory overhead. This means we do
// a little string parsing to treat these strings as records that stay lexicographically
// sortable (which is important later).
// We encode the ID and location as $id.len:$id$loc
String id = doc["monitor.id"][0];
String idLenDelim = Integer.toHexString(id.length()) + ":" + id;
String idLoc = loc == null ? idLenDelim : idLenDelim + loc;

String status = doc["summary.down"][0] > 0 ? "d" : "u";
String timeAndStatus = doc["@timestamp"][0].toInstant().toEpochMilli().toString() + status;
state.locStatus[idLoc] = timeAndStatus;
state.totalDocs++;
`,
combine_script: `
return state;
`,
reduce_script: `
// Use a treemap since it's traversable in sorted order.
// This is important later.
TreeMap locStatus = new TreeMap();
long totalDocs = 0;
int uniqueIds = 0;
for (state in states) {
totalDocs += state.totalDocs;
for (entry in state.locStatus.entrySet()) {
// Update the value for the given key if we have a more recent check from this location.
locStatus.merge(entry.getKey(), entry.getValue(), (a,b) -> a.compareTo(b) > 0 ? a : b)
}
}

HashMap locTotals = new HashMap();
int total = 0;
int down = 0;
String curId = "";
boolean curIdDown = false;
// We now iterate through our tree map in order, which means records for a given ID
// always are encountered one after the other. This saves us having to make an intermediate
// map.
for (entry in locStatus.entrySet()) {
String idLoc = entry.getKey();
String timeStatus = entry.getValue();

// Parse the length delimited id/location strings described in the map section
int colonIndex = idLoc.indexOf(":");
int idEnd = Integer.parseInt(idLoc.substring(0, colonIndex), 16) + colonIndex + 1;
String id = idLoc.substring(colonIndex + 1, idEnd);
String loc = idLoc.substring(idEnd, idLoc.length());
String status = timeStatus.substring(timeStatus.length() - 1);

// Here we increment counters for the up/down key per location
// We also create a new hashmap in locTotals if we've never seen this location
// before.
locTotals.compute(loc, (k,v) -> {
HashMap res = v;
if (v == null) {
res = new HashMap();
res.put('up', 0);
res.put('down', 0);
}

if (status == 'u') {
res.up++;
} else {
res.down++;
}

return res;
});


// We've encountered a new ID
if (curId != id) {
total++;
curId = id;
if (status == "d") {
curIdDown = true;
down++;
} else {
curIdDown = false;
}
} else if (!curIdDown) {
if (status == "d") {
curIdDown = true;
down++;
} else {
curIdDown = false;
}
}
}

Map result = new HashMap();
result.total = total;
result.location_totals = locTotals;
result.up = total - down;
result.down = down;
result.totalDocs = totalDocs;
return result;
`,
},
},
},
};
};
105 changes: 54 additions & 51 deletions x-pack/test/api_integration/apis/uptime/rest/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,66 +34,69 @@ export default function({ getService }: FtrProviderContext) {
let dateRange: { start: string; end: string };

[true, false].forEach(async (includeTimespan: boolean) => {
describe(`with timespans ${includeTimespan ? 'included' : 'missing'}`, async () => {
before(async () => {
const promises: Array<Promise<any>> = [];

// When includeTimespan is false we have to remove the values there.
let mogrify = (d: any) => d;
if ((includeTimespan = false)) {
mogrify = (d: any): any => {
d.monitor.delete('timespan');
[true, false].forEach(async (includeObserver: boolean) => {
describe(`with timespans=${includeTimespan} and observer=${includeObserver}`, async () => {
before(async () => {
const promises: Array<Promise<any>> = [];

const mogrify = (d: any) => {
if (!includeTimespan) {
delete d.monitor.timespan;
}
if (!includeObserver) {
delete d.observer;
}
return d;
};
}

const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => {
return makeChecksWithStatus(
getService('legacyEs'),
monitorId,
checksPerMonitor,
numIps,
scheduleEvery,
{},
status,
mogrify
);
};

for (let i = 0; i < numUpMonitors; i++) {
promises.push(makeMonitorChecks(`up-${i}`, 'up'));
}
for (let i = 0; i < numDownMonitors; i++) {
promises.push(makeMonitorChecks(`down-${i}`, 'down'));
}
const makeMonitorChecks = async (monitorId: string, status: 'up' | 'down') => {
return makeChecksWithStatus(
getService('legacyEs'),
monitorId,
checksPerMonitor,
numIps,
scheduleEvery,
{},
status,
mogrify
);
};

const allResults = await Promise.all(promises);
dateRange = getChecksDateRange(allResults);
});
for (let i = 0; i < numUpMonitors; i++) {
promises.push(makeMonitorChecks(`up-${i}`, 'up'));
}
for (let i = 0; i < numDownMonitors; i++) {
promises.push(makeMonitorChecks(`down-${i}`, 'down'));
}

it('will count all statuses correctly', async () => {
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}`
);
const allResults = await Promise.all(promises);
dateRange = getChecksDateRange(allResults);
});

expectFixtureEql(apiResponse.body, 'snapshot');
});
it('will count all statuses correctly', async () => {
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}`
);

it('will fetch a monitor snapshot filtered by down status', async () => {
const statusFilter = 'down';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot');
});

expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down');
});
it('will fetch a monitor snapshot filtered by down status', async () => {
const statusFilter = 'down';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);

it('will fetch a monitor snapshot filtered by up status', async () => {
const statusFilter = 'up';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up');
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_down');
});

it('will fetch a monitor snapshot filtered by up status', async () => {
const statusFilter = 'up';
const apiResponse = await supertest.get(
`/api/uptime/snapshot/count?dateRangeStart=${dateRange.start}&dateRangeEnd=${dateRange.end}&statusFilter=${statusFilter}`
);
expectFixtureEql(apiResponse.body, 'snapshot_filtered_by_up');
});
});
});
});
Expand Down