-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathS3Poller.js
149 lines (131 loc) · 4.24 KB
/
S3Poller.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
var AWS = require('aws-sdk');
var moment = require('moment');
var Lazy=require('lazy');
var logger = require('winston');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
/**
 * Polls an S3 bucket/prefix and emits the lines of the objects it finds
 * (see S3Poller.prototype.poll for the emitted events).
 *
 * @constructor
 * @param {Object} options
 * @param {Object} options.s3Params - S3 parameters. `Bucket` and `Prefix` are
 *   mandatory; an optional `Marker` is used as the initial listing marker.
 * @param {*} [options.from] - Any value accepted by `moment()`. Objects last
 *   modified before this instant are skipped. Defaults to one hour before
 *   construction.
 * @throws {Error} If `options`, `options.s3Params`, `s3Params.Bucket` or
 *   `s3Params.Prefix` is missing.
 */
function S3Poller (options) {
  EventEmitter.call(this);
  // Tolerate a missing options object so callers get the descriptive error
  // below instead of a TypeError on `undefined.s3Params`.
  var opts = options || {};
  var s3Params = opts.s3Params;
  if (!(s3Params && s3Params.Bucket && s3Params.Prefix)) {
    throw new Error('s3Params not properly defined. Bucket & Prefix are mandatory.');
  }
  this._s3Params = s3Params;
  this._from = opts.from ? moment(opts.from) : moment().subtract(1, 'hours');
}
// Make S3Poller an EventEmitter (prototype chain plus S3Poller.super_).
util.inherits(S3Poller, EventEmitter);
/**
 * Lists the objects under `s3Params.Bucket` / `s3Params.Prefix` page by page
 * and streams each one, emitting its content line by line.
 *
 * Events:
 *  - 'start'                : emitted synchronously before the first listing.
 *  - 'data'  (obj)          : one per line read; obj is
 *                             `{s3bucket, s3key, message}`.
 *  - 'end'   (state)        : listing exhausted; `state` is the cursor to hand
 *                             back to the next poll() call.
 *  - 'error' (err, state)   : listObjects failed; this poll stops.
 *
 * @param {{Key: string, Size: number}} [state] - Cursor from a previous 'end'
 *   event. The object is used (and mutated) directly as this poll's cursor.
 *   If the object named by `state.Key` has grown beyond `state.Size`, only the
 *   appended bytes are read.
 */
S3Poller.prototype.poll = function (state) {
  // The caller's state object is reused as-is, not copied.
  this._last = state ? state : {};
  var s3 = new AWS.S3();
  var self = this;
  // Objects returned by listObjects that have not been processed yet.
  var filesBuffer = [];
  // Set once a listing page is shorter than MaxKeys: no further page follows.
  var interruptPagination = false;

  // Processes the next buffered object, fetching the next listing page when
  // the buffer is empty. Every handled object calls back into this function.
  var readNextFile = function () {
    var nextFile = filesBuffer.shift();
    logger.debug('S3Poller.readNextFile filesBuffer size', filesBuffer.length, 'nextFile', !nextFile || (nextFile.Key +' Size ' + nextFile.Size));
    if (nextFile != null) {
      handleS3Content(nextFile, readNextFile);
      return;
    }
    if (interruptPagination) {
      self.emit('end', self._last);
      return;
    }
    var listParams = {
      Bucket: self._s3Params.Bucket,
      Prefix: self._s3Params.Prefix,
    };
    if (self._last && self._last.Key) {
      // listObjects returns keys strictly after Marker. Dropping the last
      // character of the cursor key makes that key itself be listed again, so
      // an object that grew since the previous read is noticed. (Any key that
      // sorts between the trimmed marker and the cursor key is re-listed too.)
      listParams.Marker = self._last.Key.substring(0, self._last.Key.length - 1);
    } else if (self._s3Params.Marker) {
      listParams.Marker = self._s3Params.Marker;
    }
    logger.info('### S3 listParams', listParams);
    s3.listObjects(listParams, function (err, data) {
      if (err) {
        self.emit('error', err, self._last);
        return;
      }
      if (!data.Contents || data.Contents.length === 0) {
        // Nothing (more) to read.
        self.emit('end', self._last);
        return;
      }
      if (data.Contents.length < data.MaxKeys) {
        interruptPagination = true;
      }
      filesBuffer = filesBuffer.concat(data.Contents);
      logger.info('filesBuffer size', filesBuffer.length);
      readNextFile();
    });
  };

  // Reads one listed object, emits a 'data' event per line, then invokes
  // `callback` exactly once to move on to the next object.
  var handleS3Content = function (content, callback) {
    logger.debug('### Handle S3 Content', content);
    var objectParams = {
      Bucket: self._s3Params.Bucket,
      Key: content.Key
    };
    // Same object as the cursor: skip it unless it grew, and in that case read
    // only the bytes appended since the previous read.
    if (self._last && self._last.Key === content.Key) {
      if (content.Size > self._last.Size) {
        objectParams.Range = 'bytes=' + self._last.Size + '-';
      } else {
        logger.debug('File ', content.Key , ' did not changed since last polling cycle...');
        callback();
        return;
      }
    }
    // Advance the cursor. NOTE(review): this happens before the object is
    // read, so an object whose download fails is still treated as consumed.
    self._last.Key = content.Key;
    self._last.Size = content.Size;
    // Skip objects last modified before the `from` limit.
    var lastModified = moment(content.LastModified);
    if (self._from > lastModified) {
      logger.debug('File', content.Key, ' not recent enough and under the `from` limit', self._from.toString());
      callback();
      return;
    }
    logger.debug('### S3 getObject', objectParams);
    var rstream = s3.getObject(objectParams).createReadStream();
    // Defensive: never advance twice for the same object, even if the stream
    // reports both an error and an end.
    var done = false;
    var finish = function () {
      if (!done) {
        done = true;
        callback();
      }
    };
    // Attach the line splitter before our own 'end' listener. Listeners run in
    // registration order, so Lazy handles the stream's end first (presumably
    // flushing a final line that has no trailing newline). Only then do we move
    // on, and possibly emit the poller's 'end'.
    new Lazy(rstream).lines.forEach(function (line) {
      self.emit('data', {
        s3bucket: self._s3Params.Bucket,
        s3key: content.Key,
        message: line.toString()
      });
    });
    rstream.on('error', function (err) {
      // Best effort: log and continue with the next object.
      logger.error('Error while calling getObject on AWS S3', err);
      finish();
    });
    rstream.on('end', function () {
      logger.debug('END object', content.Key);
      finish();
    });
  };

  self.emit('start');
  readNextFile();
};
// Export the constructor: callers create `new S3Poller(options)`, subscribe to
// its events, and call poll().
module.exports = S3Poller;