Skip to content

Commit

Permalink
Merge pull request #21 from statful/create-plugins-mechanism
Browse files Browse the repository at this point in the history
Create plugins mechanism
  • Loading branch information
bcamarneiro authored Oct 25, 2018
2 parents 85e08cb + 60ac29f commit 38d8894
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 233 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ Read the methods options reference bellow to get more information about the defa
| **_tags_** (`object`) - Defines the tags of the metric. These tags are merged with the ones configured globally, including method defaults. | `{}` | `{}` | `{ unit: 'ms' }` | `{}` | **YES** |
| **_timestamp_** (`string`) - Defines the timestamp of the metric. This timestamp is a **POSIX/Epoch** time in **seconds**. | `current timestamp` | `current timestamp` | `current timestamp` | `current timestamp` | **YES** |

## Plugins
It is possible to use plugin with the client.
```javascript
var SystemStatsPlugin = require('./plugins/system-stats.js');
var statful = new Statful(config, log);
statful.use(new SystemStatsPlugin());
```
### System Stats Plugin
This plugin allows the client to send system-related metrics and/or enrich the user metrics with system tags.

#### System Stats Plugin Configuration

The custom options that can be set on config param are detailed below.

| Option | Description | Type | Default | Required |
|:---|:---|:---|:---|:---|
| _bufferFlushLength_ | Defines the application global name. If specified sets a global tag `app=setValue`. | `metric` | true | **NO** |
| _timerEventLoop_ | Object to set methods options. | `metric` | true | **NO** |
| _processUptime_ | Uptime of the process in **miliseconds**. | `metric` | true | **NO** |
| _processMemoryUsage_ | Process memory usage in **bytes**. | `metric` | true | **NO** |
| _processMemoryUsagePerc_ | Process memory usage **percentage**. (compared to total OS memory) | `metric` | true | **NO** |
| _osUptime_ | OS uptime in **miliseconds**. | `metric` | true | **NO** |
| _osTotalMemory_ | OS total memory in **bytes**. | `metric` | true | **NO** |
| _osFreeMemory_ | OS free memory in **bytes**. | `metric` | true | **NO** |
| _tagHostname_ | Hostname. | `tag` | true | **NO** |
| _tagPlatform_ | Platform. | `tag` | true | **NO** |
| _tagArchitecture_ | Architecture. | `tag` | true | **NO** |
| _tagNodeVersion_ | NodeJS Version | `tag` | true | **NO** |

## Authors

[Mindera - Software Craft](https://github.com/Mindera)
Expand Down
221 changes: 52 additions & 169 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

var dgram = require('dgram');
var blocked = require('blocked');

var merge = require('merge');
var configHelper = require('./config-helper');
var transport = require('./transport');
Expand Down Expand Up @@ -54,60 +54,6 @@ function putMetric (self, metricTypeConf, name, value, aggregation, aggregationF
}
}

/**
* Puts a system stats metric into the system stats buffer ready to be sent.
*
* @param self A self statful client.
* @param metricTypeConf A configuration for each metric type (counter, gauge, timer). Can be null if it a custom metric.
* @param name A metric name.
* @param value A metric value.
* @param aggregation The aggregation with which metric was aggregated.
* @param aggregationFreq The aggregation frequency with which metric was aggregated.
* @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp.
*/
function putSystemStatsMetrics (self, name, value, parameters) {
var metricParams = parameters || {};
var tags = metricParams.tags,
agg = metricParams.agg,
aggFreq = metricParams.aggFreq,
namespace = metricParams.namespace,
timestamp = metricParams.timestamp,
sampleRate = metricParams.sampleRate;

putSystemStats(self, name, value, {
tags: tags,
agg: agg,
aggFreq: aggFreq,
namespace: namespace,
timestamp: timestamp,
sampleRate: sampleRate
});
}

function sendFlushStats (self) {
if (self.systemStats) {
var aggregations = ['avg', 'sum'];
if (self.aggregatedBuffer.bufferSize > 0 && self.transport === 'api') {
putSystemStatsMetrics(self, 'buffer.flush_length', self.aggregatedBuffer.bufferSize, {
agg: aggregations,
tags: { buffer_type: 'aggregated' }
});
}
if (self.nonAggregatedBuffer.bufferSize > 0) {
putSystemStatsMetrics(self, 'buffer.flush_length', self.nonAggregatedBuffer.bufferSize, {
agg: aggregations,
tags: { buffer_type: 'non-aggregated' }
});
}
if (self.systemStatsBuffer.bufferSize > 0) {
putSystemStatsMetrics(self, 'buffer.flush_length', self.systemStatsBuffer.bufferSize, {
agg: aggregations,
tags: { buffer_type: 'system-stats' }
});
}
}
}

/**
* Logs all the metrics to the logger
*
Expand Down Expand Up @@ -138,11 +84,21 @@ function logMetrics (self) {
}
self.logger.debug(stringToLogHeader + stringToLog);
}
if (self.systemStatsBuffer.bufferSize > 0) {
self.logger.debug('Flushing metrics (system stats): ' + self.systemStatsBuffer.buffer);
if (pluginsBuffersSize(self.pluginBuffers) > 0) {
self.logger.debug('Flushing plugins metrics');
}
}

function pluginsBuffersSize (buffers) {
var size = 0;

for (var i in buffers) {
size += buffers[i].bufferSize;
}

return size;
}

/**
* Sends the non aggregated and system stats metrics using UDP transport
*
Expand All @@ -164,8 +120,8 @@ function sendMetricsByUdpTransport (self) {
self.socket.send(buffer, 0, buffer.length, self.port, self.host);
}

if (self.systemStatsBuffer.bufferSize > 0) {
buffer = new Buffer(self.systemStatsBuffer.buffer);
for (var i in self.pluginBuffers) {
buffer = new Buffer(self.pluginBuffers[i].buffer);
self.socket.send(buffer, 0, buffer.length, self.port, self.host);
}
}
Expand Down Expand Up @@ -233,22 +189,25 @@ function sendMetricsByApiTransport (self) {
}
}

if (self.systemStatsBuffer.bufferSize > 0) {
var nonAggregatedStatsOptions = transport.buildRequestOptions(
self.protocol,
self.host,
self.port,
self.basePath,
self.token,
self.timeout
);

self.logger.debug('Flushing to ' + nonAggregatedStatsOptions.url + ' system stats metrics');

if (self.compression) {
transport.sendCompressedMessage(nonAggregatedStatsOptions, self.systemStatsBuffer.buffer, self.logger);
} else {
transport.sendUncompressedMessage(nonAggregatedStatsOptions, self.systemStatsBuffer.buffer, self.logger);
for (var i in self.pluginBuffers) {
var element = self.pluginBuffers[i];
if(element.bufferSize > 0) {
var nonAggregatedStatsOptions = transport.buildRequestOptions(
self.protocol,
self.host,
self.port,
self.basePath,
self.token,
self.timeout
);

self.logger.debug('Flushing to ' + nonAggregatedStatsOptions.url + ' system stats metrics');

if (self.compression) {
transport.sendCompressedMessage(nonAggregatedStatsOptions, element.buffer, self.logger);
} else {
transport.sendUncompressedMessage(nonAggregatedStatsOptions, element.buffer, self.logger);
}
}
}
}
Expand All @@ -259,9 +218,12 @@ function sendMetricsByApiTransport (self) {
* @param self A self client instance.
*/
function flush (self) {
sendFlushStats(self);
for (var index = 0; index < self.plugins.length; index++) {
self.plugins[index].onFlush(self);
}

var metricsCounter =
self.aggregatedBuffer.bufferSize + self.nonAggregatedBuffer.bufferSize + self.systemStatsBuffer.bufferSize;
self.aggregatedBuffer.bufferSize + self.nonAggregatedBuffer.bufferSize + pluginsBuffersSize(self.pluginBuffers);

if (metricsCounter > 0) {
if (self.dryRun) {
Expand All @@ -280,8 +242,11 @@ function flush (self) {
self.aggregatedBuffer.bufferSize = 0;
self.nonAggregatedBuffer.buffer = '';
self.nonAggregatedBuffer.bufferSize = 0;
self.systemStatsBuffer.buffer = '';
self.systemStatsBuffer.bufferSize = 0;

for (var i in self.pluginBuffers) {
self.pluginBuffers[i].buffer = '';
self.pluginBuffers[i].bufferSize = 0;
}
}
}

Expand Down Expand Up @@ -317,7 +282,7 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) {
if (
self.aggregatedBuffer.bufferSize +
self.nonAggregatedBuffer.bufferSize +
self.systemStatsBuffer.bufferSize >=
pluginsBuffersSize(self.pluginBuffers) >=
self.flushSize
) {
setTimeout(function () {
Expand All @@ -329,27 +294,6 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) {
}
}

/**
* Adds raw metrics directly into the flush buffer. Use this method with caution.
*
* @param self A self client instance.
* @param metricLines The metrics, in valid line protocol, to push to the buffer.
*/
function addToStatsBuffer (self, metricLines) {
if (typeof metricLines !== 'undefined') {
var targetBuffer = self.systemStatsBuffer;

if (targetBuffer.bufferSize > 0) {
targetBuffer.buffer += '\n';
}

targetBuffer.buffer += metricLines;
targetBuffer.bufferSize++;
} else {
self.logger.error('addToStatsBuffer: Invalid metric lines: ' + metricLines);
}
}

/**
* Adds a new metric to the in-memory buffer.
*
Expand Down Expand Up @@ -409,60 +353,6 @@ function putRaw (self, metric, value, parameters, isMetricAggregated) {
}
}

/**
* Adds a new system stats metric to the in-memory system stats buffer.
*
* @param self A self client instance.
* @param metric Name metric such as 'response_time'.
* @param value.
* @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp.
* - tags: Tags to associate this value with, for example {from: 'serviceA', to: 'serviceB', method: 'login'}.
* - agg: List of aggregations to be applied by Statful. Ex: ['avg', 'p90', 'min'].
* - aggFreq: Aggregation frequency in seconds. One of: 10, 30, 60 ,120, 180, 300. Default: 10.
* - namespace: Define the metric namespace. Default: application.
* - timestamp: Defines the metrics timestamp. Default: current timestamp.
*/
function putSystemStats (self, metric, value, parameters) {
var metricParams = parameters || {};

var tags = metricParams.tags,
agg = metricParams.agg,
aggFreq = metricParams.aggFreq,
namespace = metricParams.namespace,
timestamp = metricParams.timestamp,
sampleRate = parameters.sampleRate || self.sampleRate;

// Vars to Put
var putNamespace = namespace || self.namespace;
var putAggFreq = aggFreq || 10;
var putTags = merge(self.app ? merge({ app: self.app }, tags) : tags, self.tags);

var metricName = putNamespace + '.' + metric,
flushLine = metricName,
sampleRateNormalized = (sampleRate || 100) / 100;

if (Math.random() <= sampleRateNormalized) {
flushLine = Object.keys(putTags).reduce(function (previousValue, tag) {
return previousValue + ',' + tag + '=' + putTags[tag];
}, flushLine);

flushLine += ' ' + value + ' ' + (timestamp || Math.round(new Date().getTime() / 1000));

if (agg) {
agg.push(putAggFreq);
flushLine += ' ' + agg.join(',');

if (sampleRate && sampleRate < 100) {
flushLine += ' ' + sampleRate;
}
}

addToStatsBuffer(self, [flushLine]);
} else {
self.logger.debug('Metric was discarded due to sample rate.');
}
}

/**
* Calls put metric with an aggregated metric.
*
Expand Down Expand Up @@ -532,7 +422,6 @@ var Client = function (config, logger) {
this.namespace = config.namespace || 'application';
this.dryRun = config.dryRun;
this.tags = config.tags || {};
this.systemStats = config.systemStats !== undefined ? config.systemStats : true;
this.sampleRate = config.sampleRate || 100;
this.flushInterval = config.flushInterval || 3000;
this.flushSize = config.flushSize || 1000;
Expand All @@ -559,19 +448,8 @@ var Client = function (config, logger) {
bufferSize: 0
};

this.systemStatsBuffer = {
buffer: '',
bufferSize: 0
};

if (this.systemStats) {
blocked(function (ms) {
putSystemStatsMetrics(self, 'timer.event_loop', ms, {
agg: ['avg', 'p90', 'count'],
tags: { unit: 'ms' }
});
});
}
this.plugins = [];
this.pluginBuffers = {};

setInterval(
function (obj) {
Expand Down Expand Up @@ -739,4 +617,9 @@ Client.prototype.timer = function (name, value, parameters) {
putNonAggregatedMetric(this, this.default.timer, 'timer.' + name, value, parameters);
};

Client.prototype.use = function (plugin) {
plugin.onInit(this);
this.plugins.push(plugin);
};

module.exports = Client;
Loading

0 comments on commit 38d8894

Please sign in to comment.