-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
132 lines (123 loc) · 4.04 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
var elasticsearch = require('elasticsearch');
var step = require('step')
// Shared Elasticsearch client, constructed once when the module is loaded.
// NOTE(review): `<ip>` appears to be a placeholder and is not valid JavaScript as
// written; it must be replaced with the Elasticsearch host (a string expression,
// to which ':9200' is appended) before this file can be parsed.
var client = new elasticsearch.Client({
host: <ip>+':9200'
});
// Module-level Elasticsearch index name; starts out as an empty string.
var es_index = ""
// Elasticsearch document type name used by the requests in this file.
var es_type = "iot_data"
/**
 * Render a Date as a "YYYY-MM-DD" string built from its local-time fields.
 *
 * @param {Date} date - The date to format.
 * @returns {string} Year, month and day joined by "-"; month and day are
 *   zero-padded to two digits, the year is emitted as-is.
 */
function dateToYMD(date) {
  // Prefix a "0" onto values up to 9; anything else is simply stringified.
  function twoDigits(value) {
    if (value <= 9) {
      return '0' + value;
    }
    return String(value);
  }

  var dayText = twoDigits(date.getDate());
  // getMonth() is zero-based, so shift it into the 1..12 range.
  var monthText = twoDigits(date.getMonth() + 1);
  var yearText = String(date.getFullYear());

  return [yearText, monthText, dayText].join('-');
}
exports.handler = function(event, context) {
// console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
var d = JSON.parse(payload);
var s = d['time'].toString();
var year = s.substring(0, 4);
var month = s.substring(4, 6);
var day = s.substring(6, 8);
var hour = s.substring(8, 10);
var minute = s.substring(10, 12);
var second = s.substring(12, 14);
var r = {
'device_id': d['device_id'],
'time': year+'-'+month+'-'+day+'T'+hour+':'+minute+':'+second+'Z',
'location': d['lat']+', '+d['lon'],
'uv': d['data'][0],
'light': d['data'][1],
'temp': d['data'][2],
'humidity': d['data'][3],
'sound': d['data'][4],
'dust': d['data'][5],
'pressure': d['data'][6],
'altitude': d['data'][7]
}
es_index = "iot-" + year + "." + month + "." + day
console.log('Decoded data');
console.log(r);
step(
function (){
client.indices.exists({
index: es_index
}, this)
},
function create_index_if_missing(err, stat){
if (err){
console.log(err);
context.done()
}
if (!stat){
console.log('Create index')
client.indices.create({
index: es_index
}, this)
}
else{
return true;
}
},
function create_mapping(err, stat){
if (err){
console.log(err);
context.done()
}
if (typeof(stat)=='object'){
console.log('add mapping');
var body = {
'iot_data':{
'properties':{
'device_id' : {"type" : "string", "index" : "not_analyzed"},
'time' : {"type" : "date", "format" : "yyyy-MM-dd'T'HH:mm:ss'Z'"},
'location' : {"type" : "geo_point"},
'uv' : {"type" : "float"},
'light' : {"type" : "float"},
'temp' : {"type" : "float"},
'humidity' : {"type" : "float"},
'sound' : {"type" : "float"},
'dust' : {"type" : "float"},
'pressure' : {"type" : "float"},
'altitude' : {"type" : "float"}
}
}
}
client.indices.putMapping({index:es_index, type:es_type, body:body}, this)
}
else{
return true;
}
},
function insert_data(err, stat) {
if (err){
console.log(err);
context.done()
}
client.create({
index: es_index,
type: es_type,
body: r
}, this)
},
function(err, rep){
if (err){
console.log(err);
}
/* reply shape
{
_index: 'iot_test_oreh',
_type: 'iot_data',
_id: 'AU1cwMPsrnJqSn-3ayB2',
_version: 1,
created: true
}
*/
// console.log(rep._index+" "+rep._type+" "+rep._id)
context.done();
}
);
});
};