-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathreporter.py
303 lines (241 loc) · 9.18 KB
/
reporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""
The reporter is the service responsible for handling NGSI notifications,
validating them, and feeding the corresponding updates to the translator.
The reporter needs to know the form of the entity (i.e., the names and types
of its attributes). There are two approaches:
1 - Clients tell the reporter which entities they care about and the reporter
goes to find the metadata in the Context Broker
2 - The reporter only consumes the Context Broker notifications and builds
little by little the whole entity.
In this case, the notifications must come with some minimum amount of
required data (e.g., entity_type, entity_id, a time index and the
updated value[s]). Ideally, in the first notification the reporter
would be notified of all the entity attributes so that it can tell the
translator how to create the complete corresponding table[s] in the
database.
For now, we have adopted approach 2.
TODO:
- Validate entity and attribute names against valid NGSI names and valid
[Crate names](https://crate.io/docs/crate/reference/en/latest/sql/ddl/basics.html#naming-restrictions)
- Raise warning and act accordingly when receiving entity with equal lowercased
attributes.
- Consider offering an API endpoint to receive just the user's entities of
interest and make QL actually perform the corresponding subscription to orion.
I.e, QL must be told where orion is.
"""
from flask import request
from geocoding.geocache import GeoCodingCache
from requests import RequestException
from translators.crate import CrateTranslator, CrateTranslatorInstance, \
NGSI_TO_CRATE, NGSI_TEXT, NGSI_DATETIME, NGSI_ISO8601
from utils.common import iter_entity_attrs
import json
import logging
import os
import requests
from reporter.subscription_builder import build_subscription
from reporter.timex import select_time_index_value_as_iso
from geocoding.location import normalize_location
def log():
    """
    Return the logger named after this module.

    ``logging.basicConfig`` is invoked with level INFO on every call before
    the logger is looked up.

    :return: logging.Logger
    """
    logging.basicConfig(level=logging.INFO)
    module_logger = logging.getLogger(__name__)
    return module_logger
def is_text(attr_type):
    """
    Tell whether an attribute type is to be handled as text.

    :param attr_type:
        The type declared for the attribute.
    :return: bool
        True when the type equals NGSI_TEXT or is not found in NGSI_TO_CRATE.
    """
    # TODO: same logic in two different places!
    # This kinda reproduces the tests done by the translator, we should
    # factor this logic out and keep it in just one place!
    if attr_type == NGSI_TEXT:
        return True
    return attr_type not in NGSI_TO_CRATE
def has_value(entity, attr_name):
    """
    Check whether the named attribute of the entity carries a usable value.

    :param entity:
        Mapping from attribute names to attribute dicts ('value', 'type').
    :param attr_name:
        Name of the attribute to inspect.
    :return: bool
        False when the attribute is missing, is None, or has no value.
        True for any non-None value of a text-like type (see is_text).
        For other types, False only when the value is a blank string.
    """
    attribute = entity.get(attr_name, {})
    attribute = {} if attribute is None else attribute

    value = attribute.get('value', None)
    declared_type = attribute.get('type', None)

    if value is None:
        return False
    if is_text(declared_type):
        return True

    # If type != Text and value == '', make value = null
    candidate = value.strip() if isinstance(value, str) else value
    return candidate != ''
def _validate_payload(payload):
"""
:param payload:
The received json data in the notification.
:return: str | None
Error message, if any.
Note that some attributes are actually verified by Connexion framework (
e.g type and id). We leave the checks as double-checking.
"""
# The entity must be uniquely identifiable
if 'type' not in payload:
return 'Entity type is required in notifications'
if 'id' not in payload:
return 'Entity id is required in notifications'
# There should be at least one attribute other than id and type
# (i.e, the changed value)
attrs = list(iter_entity_attrs(payload))
if len(attrs) == 0:
log().warning("Received notification without attributes " +
"other than 'type' and 'id'")
# Attributes should have a value and the modification time
for attr in attrs:
if not has_value(payload, attr):
payload[attr].update({'value': None})
log().warning(
'Payload is missing value for attribute {}'.format(attr))
def notify():
    """
    Handle an incoming notification and hand its entity to the translator.

    The JSON body of the current flask request must hold a 'data' list with
    exactly one entity. The entity is validated, given a time index value
    under CrateTranslator.TIME_INDEX_NAME, passed through add_geodata and
    normalize_location, and then inserted via CrateTranslatorInstance.

    :return: str | tuple(str, int)
        On rejection, an error message together with status 400.
        On success, the confirmation message alone.
    """
    body = request.json
    if body is None:
        return 'Discarding notification due to lack of request body. ' \
               'Lost in a redirect maybe?', 400

    if 'data' not in body:
        return 'Discarding notification due to lack of request body ' \
               'content.', 400

    payload = body['data']
    # An empty (or null) 'data' element carries no entity at all; reject it
    # here rather than failing on payload[0] below.
    if not payload:
        return 'Discarding notification due to lack of request body ' \
               'content.', 400
    if len(payload) > 1:
        return 'Multiple data elements in notifications not supported yet', 400
    payload = payload[0]

    log().info('Received payload: {}'.format(payload))

    # Validate notification
    error = _validate_payload(payload)
    if error:
        return error, 400

    # Add TIME_INDEX attribute
    payload[CrateTranslator.TIME_INDEX_NAME] = \
        select_time_index_value_as_iso(request.headers, payload)

    # Add GEO-DATE if enabled
    add_geodata(payload)

    # Always normalize location if there's one
    normalize_location(payload)

    # Define FIWARE tenant
    fiware_s = request.headers.get('fiware-service', None)
    # It seems orion always sends a 'Fiware-Servicepath' header with value '/'
    # But this is not correctly documented in the API, so in order not to
    # depend on this, QL will not treat servicepath if there's no service
    # specified.
    if fiware_s:
        fiware_sp = request.headers.get('fiware-servicepath', None)
    else:
        fiware_sp = None

    # Send valid entities to translator
    with CrateTranslatorInstance() as trans:
        trans.insert([payload], fiware_s, fiware_sp)

    msg = 'Notification successfully processed'
    log().info(msg)
    return msg
def add_geodata(entity):
    """
    Add geocoded location data to the entity when geocoding is enabled.

    Geocoding only runs when the USE_GEOCODING environment variable is set
    to something other than an explicit "off" value AND REDIS_HOST is set.
    Environment values are always strings, so a bare truthiness test would
    treat 'False' or '0' as enabled; those spellings are recognised as
    disabled here. Any other non-empty value still enables geocoding.

    :param entity:
        The entity passed on to geocoding.add_location.
    :return: None
    """
    # TODO: Move this setting to configuration (See GH issue #10)
    disabled_values = ('', '0', 'false', 'no', 'off', 'n', 'f')
    raw_flag = os.environ.get('USE_GEOCODING', '')
    use_geocoding = str(raw_flag).strip().lower() not in disabled_values
    redis_host = os.environ.get('REDIS_HOST', None)

    # No cache -> no geocoding by default
    if not (use_geocoding and redis_host):
        return

    # NOTE(review): REDIS_PORT read from the environment is a str while the
    # default is an int - confirm GeoCodingCache accepts both.
    redis_port = os.environ.get('REDIS_PORT', 6379)
    cache = GeoCodingCache(redis_host, redis_port)

    # Imported here so the geocoding module is only loaded when needed.
    from geocoding import geocoding
    geocoding.add_location(entity, cache=cache)
def query_1TNENA():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def query_1TNENA_value():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def query_NTNE1A():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def query_NTNE1A_value():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def query_NTNENA():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def query_NTNENA_value():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def config():
    """
    Placeholder handler: this API method is not implemented yet.

    :return: tuple(dict, int)
        An error body with 'error' and 'description' keys, and status 501.
    """
    return {
        "error": "Not Implemented",
        "description": "This API method is not yet implemented."
    }, 501
def subscribe(orion_url,
              quantumleap_url,
              entity_type=None,
              entity_id=None,
              id_pattern=None,
              attributes=None,
              observed_attributes=None,
              notified_attributes=None,
              throttling=None,
              time_index_attribute=None):
    """
    Build a subscription and post it to Orion.

    Orion is first probed with a GET on orion_url. If that succeeds, the
    subscription produced by build_subscription is POSTed as JSON to
    '<orion_url>/subscriptions'. The 'fiware-service' and
    'fiware-servicepath' headers of the current request are forwarded when
    present.

    :param orion_url:
        Base URL of Orion; probed first and used to build the endpoint.
    :param quantumleap_url:
        Passed to build_subscription as its first argument.
    :param entity_type, entity_id, id_pattern, attributes,
           observed_attributes, notified_attributes, throttling,
           time_index_attribute:
        Passed through unchanged to build_subscription.
    :return: tuple
        (error dict, 400) when Orion cannot be reached or the probe is not
        ok; otherwise (response text, response status code) of the POST.
    """
    # Validate Orion
    try:
        r = requests.get(orion_url)
    except RequestException:
        r = None
    if r is None or not r.ok:
        msg = {
            "error": "Bad Request",
            "description": "Orion is not reachable by QuantumLeap at {}"
            .format(orion_url)
        }
        return msg, 400

    # Prepare subscription
    subscription = build_subscription(
        quantumleap_url,
        entity_type, entity_id, id_pattern,
        attributes, observed_attributes, notified_attributes,
        throttling, time_index_attribute)

    # Send subscription
    endpoint = '{}/subscriptions'.format(orion_url)
    data = json.dumps(subscription)

    headers = {'Content-Type': 'application/json'}
    fiware_s = request.headers.get('fiware-service', None)
    if fiware_s:
        headers['fiware-service'] = fiware_s

        fiware_sp = request.headers.get('fiware-servicepath', None)
        if fiware_sp:
            headers['fiware-servicepath'] = fiware_sp

    r = requests.post(endpoint, data=data, headers=headers)
    if not r.ok:
        # Fill in the placeholders so the failed request can be diagnosed;
        # previously the raw template was logged.
        log().debug("subscribing to {} with headers: {} and data: {}"
                    .format(endpoint, headers, data))

    return r.text, r.status_code
def _validate_query_params(attr_names, aggr_period, aggr_method,
aggr_scope=None, options=None):
if aggr_period and not aggr_method:
r = {
"error": "Bad parameters use",
"description": "aggrMethod is compulsory when using aggrPeriod."
}
return r, 400
if options or aggr_scope not in (None, 'entity'):
r = {
"error": "Not implemented option",
"description": "aggrScope and options are not yet implemented."
}
return r, 501
if aggr_method and not attr_names:
msg = "Specified aggrMethod = {} but missing attrs parameter."
r = {
"error": "Bad parameters use",
"description": msg.format(aggr_method)
}
return r, 400
return "OK", 200