Skip to content

Commit

Permalink
[Telemetry] server fetcher check all collectors ready before sending (#…
Browse files Browse the repository at this point in the history
…79398)

Co-authored-by: Alejandro Fernández Haro <afharo@gmail.com>
  • Loading branch information
Bamieh and afharo committed Oct 5, 2020
1 parent 519d490 commit f960e89
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 22 deletions.
84 changes: 79 additions & 5 deletions src/plugins/telemetry/server/fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,93 @@ import { coreMock } from '../../../core/server/mocks';

describe('FetcherTask', () => {
describe('sendIfDue', () => {
it('returns undefined and warns when it fails to get telemetry configs', async () => {
it('stops when it fails to get telemetry configs', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const mockError = new Error('Some message.');
fetcherTask['getCurrentConfigs'] = async () => {
throw mockError;
};
const getCurrentConfigs = jest.fn().mockRejectedValue(mockError);
const fetchTelemetry = jest.fn();
const sendTelemetry = jest.fn();
Object.assign(fetcherTask, {
getCurrentConfigs,
fetchTelemetry,
sendTelemetry,
});
const result = await fetcherTask['sendIfDue']();
expect(result).toBe(undefined);
expect(getCurrentConfigs).toBeCalledTimes(1);
expect(fetchTelemetry).toBeCalledTimes(0);
expect(sendTelemetry).toBeCalledTimes(0);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error fetching telemetry configs: ${mockError}`
`Error getting telemetry configs. (${mockError})`
);
});

it('stops when all collectors are not ready', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const getCurrentConfigs = jest.fn().mockResolvedValue({});
const areAllCollectorsReady = jest.fn().mockResolvedValue(false);
const shouldSendReport = jest.fn().mockReturnValue(true);
const fetchTelemetry = jest.fn();
const sendTelemetry = jest.fn();
const updateReportFailure = jest.fn();

Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
sendTelemetry,
});

await fetcherTask['sendIfDue']();

expect(fetchTelemetry).toBeCalledTimes(0);
expect(sendTelemetry).toBeCalledTimes(0);

expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(updateReportFailure).toBeCalledTimes(0);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error fetching usage. (Error: Not all collectors are ready.)`
);
});

it('fetches usage and send telemetry', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const mockTelemetryUrl = 'mock_telemetry_url';
const mockClusters = ['cluster_1', 'cluster_2'];
const getCurrentConfigs = jest.fn().mockResolvedValue({
telemetryUrl: mockTelemetryUrl,
});
const areAllCollectorsReady = jest.fn().mockResolvedValue(true);
const shouldSendReport = jest.fn().mockReturnValue(true);

const fetchTelemetry = jest.fn().mockResolvedValue(mockClusters);
const sendTelemetry = jest.fn();
const updateReportFailure = jest.fn();

Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
sendTelemetry,
});

await fetcherTask['sendIfDue']();

expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(fetchTelemetry).toBeCalledTimes(1);
expect(sendTelemetry).toBeCalledTimes(2);
expect(sendTelemetry).toHaveBeenNthCalledWith(1, mockTelemetryUrl, mockClusters[0]);
expect(sendTelemetry).toHaveBeenNthCalledWith(2, mockTelemetryUrl, mockClusters[1]);
expect(updateReportFailure).toBeCalledTimes(0);
});
});
});
30 changes: 25 additions & 5 deletions src/plugins/telemetry/server/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
// @ts-ignore
import fetch from 'node-fetch';
import { TelemetryCollectionManagerPluginStart } from 'src/plugins/telemetry_collection_manager/server';
import {
TelemetryCollectionManagerPluginStart,
UsageStatsPayload,
} from 'src/plugins/telemetry_collection_manager/server';
import {
PluginInitializerContext,
Logger,
Expand Down Expand Up @@ -94,6 +97,10 @@ export class FetcherTask {
}
}

private async areAllCollectorsReady() {
return (await this.telemetryCollectionManager?.areAllCollectorsReady()) ?? false;
}

private async sendIfDue() {
if (this.isSending) {
return;
Expand All @@ -103,17 +110,30 @@ export class FetcherTask {
try {
telemetryConfig = await this.getCurrentConfigs();
} catch (err) {
this.logger.warn(`Error fetching telemetry configs: ${err}`);
this.logger.warn(`Error getting telemetry configs. (${err})`);
return;
}

if (!telemetryConfig || !this.shouldSendReport(telemetryConfig)) {
return;
}

let clusters: Array<UsageStatsPayload | string> = [];
this.isSending = true;

try {
const allCollectorsReady = await this.areAllCollectorsReady();
if (!allCollectorsReady) {
throw new Error('Not all collectors are ready.');
}
clusters = await this.fetchTelemetry();
} catch (err) {
this.logger.warn(`Error fetching usage. (${err})`);
this.isSending = false;
return;
}

try {
this.isSending = true;
const clusters = await this.fetchTelemetry();
const { telemetryUrl } = telemetryConfig;
for (const cluster of clusters) {
await this.sendTelemetry(telemetryUrl, cluster);
Expand All @@ -123,7 +143,7 @@ export class FetcherTask {
} catch (err) {
await this.updateReportFailure(telemetryConfig);

this.logger.warn(`Error sending telemetry usage data: ${err}`);
this.logger.warn(`Error sending telemetry usage data. (${err})`);
}
this.isSending = false;
}
Expand Down
1 change: 1 addition & 0 deletions src/plugins/telemetry_collection_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ export {
ClusterDetails,
ClusterDetailsGetter,
LicenseGetter,
UsageStatsPayload,
} from './types';
6 changes: 6 additions & 0 deletions src/plugins/telemetry_collection_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export class TelemetryCollectionManagerPlugin
setCollection: this.setCollection.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand All @@ -75,6 +76,7 @@ export class TelemetryCollectionManagerPlugin
setCollection: this.setCollection.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand Down Expand Up @@ -185,6 +187,10 @@ export class TelemetryCollectionManagerPlugin
return [];
}

private areAllCollectorsReady = async () => {
return await this.usageCollection?.areAllCollectorsReady();
};

private getOptInStatsForCollection = async (
collection: Collection,
optInStatus: boolean,
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/telemetry_collection_manager/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface TelemetryCollectionManagerPluginSetup {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryCollectionManagerPluginStart {
Expand All @@ -42,6 +43,7 @@ export interface TelemetryCollectionManagerPluginStart {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryOptInStats {
Expand Down
29 changes: 17 additions & 12 deletions src/plugins/usage_collection/server/collector/collector_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,27 @@ export class CollectorSet {
};

public areAllCollectorsReady = async (collectorSet: CollectorSet = this) => {
// Kept this for runtime validation in JS code.
if (!(collectorSet instanceof CollectorSet)) {
throw new Error(
`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet
);
}

const collectorTypesNotReady = (
await Promise.all(
[...collectorSet.collectors.values()].map(async (collector) => {
if (!(await collector.isReady())) {
return collector.type;
}
})
)
).filter((collectorType): collectorType is string => !!collectorType);
const allReady = collectorTypesNotReady.length === 0;
const collectors = [...collectorSet.collectors.values()];
const collectorsWithStatus = await Promise.all(
collectors.map(async (collector) => {
return {
isReady: await collector.isReady(),
collector,
};
})
);

const collectorsTypesNotReady = collectorsWithStatus
.filter((collectorWithStatus) => collectorWithStatus.isReady === false)
.map((collectorWithStatus) => collectorWithStatus.collector.type);

const allReady = collectorsTypesNotReady.length === 0;

if (!allReady && this.maximumWaitTimeForAllCollectorsInS >= 0) {
const nowTimestamp = +new Date();
Expand All @@ -102,10 +106,11 @@ export class CollectorSet {
const timeLeftInMS = this.maximumWaitTimeForAllCollectorsInS * 1000 - timeWaitedInMS;
if (timeLeftInMS <= 0) {
this.logger.debug(
`All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) ` +
`All collectors are not ready (waiting for ${collectorsTypesNotReady.join(',')}) ` +
`but we have waited the required ` +
`${this.maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`
);

return true;
} else {
this.logger.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);
Expand Down

0 comments on commit f960e89

Please sign in to comment.