env a22 nlc: Issue noi-techpark#27.
- Refactor the code to download only the data that is actually needed (both stations and data types)
- Load the calibration CSV directly instead of requiring a translation to JSON via a separate tool
- Add new calibration coefficients
- Modify the NO2 formula; keep the old formula for old records (new "calc_version" field in processorParameters.csv)
clezag committed Jun 14, 2023
1 parent c015b67 commit dacb5e1
Showing 7 changed files with 236 additions and 206 deletions.
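The behavioral heart of the commit is the versioned NO2/NO calibration shown in the processor diff below. As a minimal standalone sketch, with invented coefficient values (the real ones are read per station, type, and temperature range from processorParameters.csv, whose new calc_version column selects the formula variant):

import numpy as np

# Hypothetical coefficients for one station/type/temperature range.
params = {"a": 1.2, "b": 0.003, "c": 0.9, "d": -0.5, "e": 1e-7, "calc_version": "1"}

def calibrate(raw, o3, t_int, p):
    # calc_version 1 raises the O3 term to the power 0.1;
    # records calibrated before this commit keep the linear O3 term.
    o3_term = np.power(float(o3), 0.1) if int(p["calc_version"]) == 1 else float(o3)
    return (float(p["a"])
            + float(p["b"]) * np.power(float(raw), 2)
            + float(p["c"]) * float(raw)
            + float(p["d"]) * o3_term
            + float(p["e"]) * np.power(float(t_int), 4))

print(calibrate(raw=35.0, o3=60.0, t_int=22.5, p=params))  # ~35.65 with these toy values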
5 changes: 5 additions & 0 deletions environment-a22-non-linear-calibration/calls.http
@@ -20,6 +20,11 @@ Authorization: Bearer {{token}}
 ?limit=-1
 &where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600
 &select=mvalidtime,scode
 
+###
+GET {{host}}/flat/EnvironmentStation/O3_raw,NO2-Alphasense_raw,NO-Alphasense_raw,PM10_raw,PM2.5_raw/2016-01-01T00:00:00+00:00/2023-06-14T12:00:03+00:00
+?select=mvalidtime,mvalue,tname
+&where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600,scode.eq.AUGEG4_AIRQ13
+&limit=-1
 
 ### Get history of single station
49 changes: 32 additions & 17 deletions environment-a22-non-linear-calibration/src/ODHAPIClient.py
@@ -6,6 +6,7 @@
 import os
 import logging
 from ODHKeyCloakClient import KeycloakClient
+from functools import reduce
 
 log = logging.getLogger()
 logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
@@ -26,9 +27,18 @@ def fetch_data(self, endpoint):
         else:
             return r.json()
 
-    def get_newest_data_timestamps(self):
-        response = self.fetch_data(
-            "/tree/EnvironmentStation/*/latest?limit=-1&where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600&select=mvalidtime,scode&limit=-1")
+    def get_newest_data_timestamps(self, stations, types):
+        # each type is requested with _raw and _processed postfix, e.g. "NO2" becomes "NO2_raw,NO2_processed"
+        types_to_fetch = ",".join([tp + postfix for tp in types for postfix in ["_raw", "_processed"]])
+        station_filter = ",scode.in.(" + ",".join(stations) + ")"
+        response = self.fetch_data(f"/tree/EnvironmentStation/{types_to_fetch}/latest"
+                                   "?select=mvalidtime,scode"
+                                   "&where=sactive.eq.true"
+                                   ",sorigin.eq.a22-algorab"
+                                   ",mperiod.eq.3600"
+                                   f"{station_filter}"
+                                   "&limit=-1"
+                                   )
         if (response != None):
             log.debug("fetched newest data for raw and processed data:" + str(response))
             stations = response['data']['EnvironmentStation']['stations']
@@ -46,22 +56,27 @@ def get_newest_data_timestamps(self):
         log.debug("Generated station map: " + str(station_map))
         return station_map
 
-    def get_raw_history(self, station_id, start, end):
+    def get_raw_history(self, station_id, start, end, types):
         raw_data_map = {}
-        data = self.fetch_data("/flat/EnvironmentStation/*/"
-                               + str(start).replace(" ", "T") + "/" + str(end).replace(" ", "T")
-                               + "?limit=-1&where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600,scode.eq."
-                               + station_id + "&select=mvalidtime,mvalue,tname")['data']
-        if data != None:
+        types_str = ",".join(map(lambda x: x + "_raw", types))
+        start_str = str(start).replace(" ", "T")
+        end_str = str(end).replace(" ", "T")
+        data = self.fetch_data(f"/flat/EnvironmentStation/{types_str}/{start_str}/{end_str}"
+                               "?select=mvalidtime,mvalue,tname"
+                               "&where=sactive.eq.true"
+                               ",sorigin.eq.a22-algorab"
+                               ",mperiod.eq.3600"
+                               ",scode.eq." + station_id +
+                               "&limit=-1")['data']
+        if data:
             log.debug("fetched history data:" + str(data))
             for record in data:
-                type_arr = str(record['tname']).split("_")
-                if len(type_arr) > 1 and type_arr[1] == "raw":
-                    value = record['mvalue']
-                    type_id = type_arr[0]
-                    time = record['mvalidtime']
-                    typeMap = {}
-                    typeMap[type_id] = value
-                    raw_data_map.setdefault(time, {}).update(typeMap)
+                value = record['mvalue']
+                type_id = str(record['tname']).split("_")[0]
+                time = record['mvalidtime']
+                typeMap = {}
+                typeMap[type_id] = value
+                raw_data_map.setdefault(time, {}).update(typeMap)
         log.debug("Raw history: " + str(raw_data_map))
         return raw_data_map
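For reference, the filter string assembled by the refactored get_newest_data_timestamps can be previewed in isolation. A sketch with assumed inputs (AUGEG4_AIRQ13 is taken from calls.http above; the second station code and the type list are hypothetical):

stations = ["AUGEG4_AIRQ13", "AUGEG4_AIRQ01"]  # second code is invented
types = ["O3", "NO2-Alphasense"]

# Each type is requested in both its raw and processed variant.
types_to_fetch = ",".join(tp + postfix for tp in types for postfix in ["_raw", "_processed"])
station_filter = ",scode.in.(" + ",".join(stations) + ")"
url = (f"/tree/EnvironmentStation/{types_to_fetch}/latest"
       "?select=mvalidtime,scode"
       "&where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600"
       f"{station_filter}&limit=-1")
print(url)
# /tree/EnvironmentStation/O3_raw,O3_processed,NO2-Alphasense_raw,NO2-Alphasense_processed/latest?select=mvalidtime,scode&where=sactive.eq.true,sorigin.eq.a22-algorab,mperiod.eq.3600,scode.in.(AUGEG4_AIRQ13,AUGEG4_AIRQ01)&limit=-1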
3 changes: 2 additions & 1 deletion environment-a22-non-linear-calibration/src/ODHPusher.py
@@ -14,9 +14,10 @@ class DataPusher:
     def __init__(self):
         self.provenance_id = None
         self.token = KeycloakClient.getDefaultInstance().token("", "", "client_credentials")
-        self.upsert_provenance()
 
     def send_data(self, station_type, data_map):
+        if not self.provenance_id:
+            self.upsert_provenance()
         data_map["provenance"] = self.provenance_id
         endpoint = os.getenv("ODH_MOBILITY_API_WRITER") + "/json/pushRecords/" + station_type
         log.debug("Data send to writer: " + str(data_map))
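Dropping upsert_provenance() from the constructor makes provenance registration lazy: it now happens on the first push rather than at client construction. The same pattern in isolation, with the writer call stubbed out:

class LazyPusher:
    # Sketch of the lazy-initialization pattern now used by DataPusher.
    def __init__(self):
        self.provenance_id = None  # nothing registered at construction time

    def upsert_provenance(self):
        self.provenance_id = "prov-1"  # stub; the real class registers with the ODH writer

    def send_data(self, data_map):
        if not self.provenance_id:  # first send triggers the registration
            self.upsert_provenance()
        data_map["provenance"] = self.provenance_id
        return data_map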
65 changes: 33 additions & 32 deletions environment-a22-non-linear-calibration/src/ParameterImporter.py
@@ -4,36 +4,37 @@
 import csv
 import json
 
-f = open('processorParameters.csv',)
-data = csv.reader(f, delimiter=',')
-data_map = {}
-type_mapping = {
-    "8": "O3",
-    "12": "PM10",
-    "13": "PM2.5",
-    "14": "NO2-Alphasense",
-    "15": "NO-Alphasense"
-}
-next(data, None)
-for row in data:
-    type_id = type_mapping[row[1]]
-    type_map = data_map.get(row[0], {})
-    temp_map = type_map.get(type_id, {})
-    parameter_map = {
-        "a": row[5],
-        "b": row[6],
-        "c": row[7],
-        "d": row[8],
-        "e": row[9],
-        "f": row[10]
-    }
-    if len(str(row[2])) == 0:
-        temp_map['lowtemp'] = parameter_map
-    else:
-        temp_map['hightemp'] = parameter_map
-    type_map[type_id] = temp_map
-    data_map[row[0]] = type_map
-
-d = open("data.json", "a")
-d.write(json.dumps(data_map))
-d.close()
+
+def getParameters():
+    f = open('processorParameters.csv')
+    data = csv.reader(f, delimiter=',')
+    data_map = {}
+    type_mapping = {
+        "8": "O3",
+        "12": "PM10",
+        "13": "PM2.5",
+        "14": "NO2-Alphasense",
+        "15": "NO-Alphasense"
+    }
+    next(data, None)
+    for row in data:
+        station_id = 'AUGEG4_' + row[0]
+        type_id = type_mapping[row[1]]
+        type_map = data_map.get(station_id, {})
+        temp_map = type_map.get(type_id, {})
+        parameter_map = {
+            "a": row[5],
+            "b": row[6],
+            "c": row[7],
+            "d": row[8],
+            "e": row[9],
+            "f": row[10],
+            "calc_version": row[11]
+        }
+        if len(str(row[2])) == 0:
+            temp_map['lowtemp'] = parameter_map
+        else:
+            temp_map['hightemp'] = parameter_map
+        type_map[type_id] = temp_map
+        data_map[station_id] = type_map
+    return data_map
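To illustrate the structure, here is what getParameters() would build from one hypothetical CSV row, given the column meanings implied by the indices above (0 = station-code suffix, 1 = type id, 2 = high-temperature flag, 5-10 = coefficients a-f, 11 = calc version; columns 3-4 are unused by the importer; every value is invented):

# One parsed CSV row (list of strings, header already skipped):
row = ["AIRQ13", "14", "", "x", "x", "0.1", "0.2", "0.3", "0.4", "0.5", "0.6", "1"]

# Resulting entry in the nested map:
# {"AUGEG4_AIRQ13":                      # 'AUGEG4_' prefix + row[0]
#     {"NO2-Alphasense":                 # type_mapping["14"]
#         {"lowtemp":                    # row[2] empty -> low-temperature branch
#             {"a": "0.1", "b": "0.2", "c": "0.3",
#              "d": "0.4", "e": "0.5", "f": "0.6",
#              "calc_version": "1"}}}}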
1 change: 0 additions & 1 deletion environment-a22-non-linear-calibration/src/data.json

This file was deleted.

@@ -6,6 +6,7 @@
 from model.Dtos import DataPoint
 from ODHAPIClient import DataFetcher
 from ODHPusher import DataPusher
+from ParameterImporter import getParameters
 
 import json
 import numpy as np
@@ -20,8 +21,8 @@
 PM25 = "PM2.5"
 T_INT = "temperature-internal"
 TYPES_TO_ELABORATE = [O3, NO2, NO, PM10, PM25]
-F = open('data.json',)
-PARAMETER_MAP = json.load(F)
+PARAMETER_MAP = getParameters()
+STATIONS_TO_ELABORATE = list(PARAMETER_MAP.keys())
 log = logging.getLogger()
 
 fetcher = DataFetcher()
@@ -32,21 +33,21 @@ def parseODHTime(time: str) -> datetime:
 
 class Processor:
     def calc_by_station(self):
-        time_map = fetcher.get_newest_data_timestamps()
-        start = parseODHTime(DEFAULT_START_CALC)
+        time_map = fetcher.get_newest_data_timestamps(stations=STATIONS_TO_ELABORATE, types=TYPES_TO_ELABORATE)
         for s_id in time_map:
+            start = parseODHTime(DEFAULT_START_CALC)
+            end = start
             for t_id in time_map[s_id]:
                 state_map = time_map[s_id][t_id]
-                end_time = parseODHTime(state_map.get('raw'))
-                start_time = parseODHTime(state_map.get('processed', DEFAULT_START_CALC))
-                if (start_time and start < start_time):
-                    start = start_time
-            history = fetcher.get_raw_history(s_id, start, end_time + datetime.timedelta(0, 3))
-            elaborations = self.calc(history, s_id)
-            pusher.send_data("EnvironmentStation", elaborations)
-    def calc(self, history,station_id):
-        station_map ={"branch":{ station_id:{"branch":{},"data":[],"name":"default"}}}
-        for time in history:
+                end = max(parseODHTime(state_map.get('raw')), end)
+                start = min(parseODHTime(state_map.get('processed', DEFAULT_START_CALC)), start)
+            history = fetcher.get_raw_history(s_id, start, end + datetime.timedelta(0, 3), types=TYPES_TO_ELABORATE)
+            if history:
+                elaborations = self.calc(history, s_id)
+                pusher.send_data("EnvironmentStation", elaborations)
+    def calc(self, history, station_id):
+        station_map = {"branch": {station_id: {"branch": {}, "data": [], "name": "default"}}}
+        for time in history:
             elabs = self.process_single_dataset(history[time], station_id, time)
             if elabs != None:
                 for type_id in elabs:
@@ -65,16 +66,24 @@ def process_single_dataset(self, data, station_id, time):
         for type_id in (value for value in data if value in TYPES_TO_ELABORATE):  # intersection
             value = data[type_id]
             processed_value = None
-            station_id_short = str(station_id).split("_")[1]
-            parameters = PARAMETER_MAP[station_id_short][type_id][temparature_key]
+            parameters = PARAMETER_MAP[station_id][type_id][temparature_key]
             if ((type_id == NO2 or type_id == NO) and O3 in data and T_INT in data):
-                processed_value = (
-                    float(parameters["a"])
-                    + np.multiply(float(parameters["b"]), np.power(float(value), 2))
-                    + np.multiply(float(parameters["c"]), float(value))
-                    + np.multiply(float(parameters["d"]), float(data[O3]))
-                    + np.multiply(float(parameters["e"]), np.power(data[T_INT], 4))
-                )
+                if int(parameters["calc_version"]) == 1:
+                    processed_value = (
+                        float(parameters["a"])
+                        + np.multiply(float(parameters["b"]), np.power(float(value), 2))
+                        + np.multiply(float(parameters["c"]), float(value))
+                        + np.multiply(float(parameters["d"]), np.power(float(data[O3]), 0.1))
+                        + np.multiply(float(parameters["e"]), np.power(data[T_INT], 4))
+                    )
+                else:
+                    processed_value = (
+                        float(parameters["a"])
+                        + np.multiply(float(parameters["b"]), np.power(float(value), 2))
+                        + np.multiply(float(parameters["c"]), float(value))
+                        + np.multiply(float(parameters["d"]), float(data[O3]))
+                        + np.multiply(float(parameters["e"]), np.power(data[T_INT], 4))
+                    )
             elif ((type_id == PM10 or type_id == PM25) and all(tid in data for tid in (RH, PM10, T_INT))
                   and not (data[T_INT] >= 20 and data[PM10] > 100) and not (data[T_INT] < 20 and data[RH] > 97)):
                 processed_value = (
@@ -98,6 +107,6 @@ def process_single_dataset(self, data, station_id, time):
         if processed_value != None:
             if processed_value < 0:
                 processed_value = 0
-            data_point_map[type_id + "_processed"] = DataPoint(datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S.%f%z").timestamp() * 1000, processed_value, 3600)
+            data_point_map[type_id + "_processed"] = DataPoint(parseODHTime(time).timestamp() * 1000, processed_value, 3600)
             processed_value = None
     return data_point_map
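The reworked loop derives a single fetch window per station: end is pushed forward to the newest raw timestamp across all types, while start is initialized at DEFAULT_START_CALC and min() can only move it earlier. A standalone sketch of that fold, with two assumed types and invented timestamps:

import datetime

DEFAULT_START = datetime.datetime(2016, 1, 1, tzinfo=datetime.timezone.utc)

# Hypothetical per-type state: newest raw vs. newest processed timestamp.
state = {
    "O3":  {"raw": datetime.datetime(2023, 6, 14, 11, tzinfo=datetime.timezone.utc),
            "processed": datetime.datetime(2023, 6, 13, 11, tzinfo=datetime.timezone.utc)},
    "NO2": {"raw": datetime.datetime(2023, 6, 14, 12, tzinfo=datetime.timezone.utc),
            "processed": datetime.datetime(2023, 6, 14, 10, tzinfo=datetime.timezone.utc)},
}

start, end = DEFAULT_START, DEFAULT_START
for state_map in state.values():
    end = max(state_map["raw"], end)                               # newest raw wins
    start = min(state_map.get("processed", DEFAULT_START), start)  # earliest bound wins

print(start, end)  # 2016-01-01 ... 2023-06-14 12:00 — one window covers every lagging type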