-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathindex.js
78 lines (60 loc) · 1.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"use strict";
var util = require('util'),
winston = require('winston'),
_ = require('lodash'),
kafka = require('kafka-node');
// Producer constructor exported by kafka-node.
var Producer = kafka.Producer;

// Connection state kept at module level: it is assigned by the KafkaLogger
// constructor and read by KafkaLogger.prototype.log, so it is shared by every
// transport instance created from this module.
var client;
var producer;

// Set to true by the producer's 'ready' event and to false by its 'error' event.
var _isConnected = false;
/**
 * Winston transport that publishes log entries to a Kafka topic.
 *
 * NOTE: the Kafka client, the producer and the connection flag are stored in
 * module-level variables, so constructing another KafkaLogger replaces the
 * connection used by every instance created from this module.
 *
 * @param {Object} [options] Transport settings.
 * @param {string} [options.name='KafkaLogger'] Transport name.
 * @param {string} [options.level='info'] Level stored on the transport.
 * @param {Object} [options.meta={}] Default metadata merged into every entry.
 * @param {string} [options.connectionString='localhost:2181'] Zookeeper
 *   connection string handed to kafka.Client.
 * @param {string} [options.clientId] Client identifier handed to kafka.Client.
 * @param {Object} [options.zkOptions] Zookeeper options handed to kafka.Client.
 * @param {string} [options.topic] Kafka topic that log() publishes to.
 */
var KafkaLogger = function (options) {
  // Tolerate `new KafkaLogger()`: every setting is optional or has a default.
  var settings = options || {};

  this.name = settings.name || 'KafkaLogger';
  this.level = settings.level || 'info';
  this.meta = settings.meta || {};

  /*
    KAFKA Options
  */

  // Zookeeper connection string, default localhost:2181
  this.connectionString = settings.connectionString || 'localhost:2181';

  // User supplied identifier for the client application (passed through as-is)
  this.clientId = settings.clientId;

  // Object, Zookeeper options, see node-zookeeper-client
  this.zkOptions = settings.zkOptions;

  this.topic = settings.topic;

  // Connect
  client = new kafka.Client(this.connectionString, this.clientId, this.zkOptions);
  var ownProducer = new Producer(client);
  producer = ownProducer;

  // The producer that was just created has not emitted 'ready' yet, so a flag
  // left over from a previously created producer must not be trusted.
  _isConnected = false;

  // Only the producer currently stored in the module-level variable may change
  // the shared flag; events from a replaced producer are stale.
  ownProducer.on('ready', function () {
    if (producer === ownProducer) {
      _isConnected = true;
    }
  });

  ownProducer.on('error', function (err) {
    if (producer === ownProducer) {
      _isConnected = false;
    }
    // Report instead of throwing so a Kafka outage cannot crash the application.
    console.error('winston-kafka-logger - Cannot connect to kafka server', err);
  });
};
// Put winston.Transport's prototype on KafkaLogger's prototype chain.
util.inherits(KafkaLogger, winston.Transport);
/**
 * Publishes one log entry to the configured Kafka topic as a JSON string.
 *
 * Delivery is best-effort: the entry is dropped when the producer is not
 * connected, and serialization or send failures are reported on stderr
 * instead of being thrown. `callback` is always invoked with (null, true).
 *
 * @param {string} level Log level of the entry.
 * @param {string} msg Log message.
 * @param {Object} [meta] Entry metadata; keys missing from it are filled in
 *   from the transport's default `meta`. The object itself is not modified.
 * @param {Function} callback Completion callback, called as callback(null, true).
 */
KafkaLogger.prototype.log = function (level, msg, meta, callback) {
  if (_isConnected) {
    // Building and serializing the payload happens inside the try as well:
    // JSON.stringify throws on circular metadata, and a logger must not
    // propagate that into the application that is logging.
    try {
      var payload = {
        msg: msg,
        level: level,
        // Merge into a fresh object so the caller's meta is never mutated.
        meta: _.defaults({}, meta, this.meta),
        timestamp: new Date()
      };

      var payloads = [
        { topic: this.topic, messages: [JSON.stringify(payload)] }
      ];

      producer.send(payloads, function () {
        // Delivery result is deliberately ignored (best-effort logging).
      });
    }
    catch (err) {
      console.error('Failed to send log to kafka!!', err);
    }
  }

  callback(null, true);
};
// The module's sole export is the KafkaLogger transport constructor.
module.exports = KafkaLogger;