Skip to content

Commit

Permalink
v0.4.3
Browse files Browse the repository at this point in the history
  • Loading branch information
kaburia committed Oct 3, 2023
1 parent 7e54503 commit e8dfa89
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 24 deletions.
110 changes: 87 additions & 23 deletions filter_stations/filter_stations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import statsmodels.api as sm
from matplotlib.dates import DateFormatter
from tqdm.auto import tqdm
import multiprocessing as mp

import warnings
warnings. filterwarnings('ignore')
Expand Down Expand Up @@ -440,6 +441,13 @@ def get_measurements(self, station, startDate=None, endDate=None, variables=None
return df

# retrieve data from multiple stations at a time
def retrieve_data(self, station, startDate, endDate, variables, dataset, aggregate):
    """
    Fetch the measurements of one station without letting a failure propagate.

    All arguments are forwarded unchanged, in order, to ``self.get_measurements``.

    Returns:
    -----------
    - Whatever ``self.get_measurements`` returns when the call succeeds.
    - A ``(station, message)`` tuple when the call raises, where ``message`` is
      ``str()`` of the exception. Callers have to tell the two shapes apart.
    """
    request_args = (station, startDate, endDate, variables, dataset, aggregate)
    try:
        return self.get_measurements(*request_args)
    except Exception as exc:
        # Report the failure as plain values instead of raising.
        failure = (station, str(exc))
    return failure

def multiple_measurements(self, stations_list, csv_file, startDate, endDate, variables, dataset='controlled', aggregate=True):
"""
Retrieves measurements for multiple stations and saves the aggregated data to a CSV file.
Expand All @@ -461,33 +469,31 @@ def multiple_measurements(self, stations_list, csv_file, startDate, endDate, var
ValueError: If stations_list is not a list.
"""
error_dict = dict()
if isinstance(stations_list, list):
df_stats = []

for station in tqdm(stations_list,
desc='Retrieving data for stations',
total=len(stations_list),
leave=True):
# print(stations_list.index(station)+1,'/',len(stations_list))
# print(f'Retrieving data for station: {station}')
try:
data = self.get_measurements(station, startDate, endDate, variables, dataset, aggregate)
# agg_data = self.aggregate_variables(data)
df_stats.append(data)
except Exception as e:
error_dict[station] = f'{e}'

with open("Errors.json", "w") as outfile:
json.dump(error_dict, outfile, indent=4)

if not isinstance(stations_list, list):
raise ValueError('Pass in a list')

error_dict = {}
pool = mp.Pool(processes=mp.cpu_count()) # Use all available CPU cores

try:
results = []
with tqdm(total=len(stations_list), desc='Retrieving data for stations') as pbar:
for station in stations_list:
results.append(pool.apply_async(self.retrieve_data, args=(station, startDate, endDate, variables, dataset, aggregate), callback=lambda _: pbar.update(1)))

pool.close()
pool.join()

df_stats = [result.get() for result in results if isinstance(result.get(), pd.DataFrame)]

if len(df_stats) > 0:
df = pd.concat(df_stats, axis=1)
df.to_csv(f'{csv_file}.csv')
return df

else:
raise ValueError('Pass in a list')
except Exception as e:
print(f"An error occurred: {e}")
finally:
pool.terminate()

# multiple quality flags for multiple stations
def multiple_qualityflags(self, stations_list, startDate, endDate, csv_file=None):
Expand Down Expand Up @@ -532,7 +538,65 @@ def multiple_qualityflags(self, stations_list, startDate, endDate, csv_file=None
df = pd.concat(df_stats, axis=1)
df.to_csv(f'{csv_file}.csv')
return df.reindex(sorted(df.columns),axis=1) #sorted dataframe

# get the anomalies data report
def anomalies_report(self, start_date, end_date=None):
    """
    Retrieve level-2 SensorDX anomaly reports for a date or a date range.

    The full report list is downloaded from the reports endpoint (no date
    parameters are sent with the request) and the date filtering is done
    locally on the resulting DataFrame.

    Parameters:
    -----------
    - start_date (str): The start date for the report in 'yyyy-mm-dd' format.
    - end_date (str, optional): The end date for the report in 'yyyy-mm-dd' format.
                                If not provided, only data for the start_date is returned.

    Returns:
    -----------
    - pandas.DataFrame: Anomaly reports with columns 'station_sensor' and 'level',
                        indexed by 'startDate' (truncated to the day) and sorted by date.
                        Only rows with level == 2 and type == 'sensordx' are kept.
    - If the response status is not 200, the result of the class's API error
      handler is returned instead (presumably it raises; confirm against the handler).

    Raises:
    -----------
    - KeyError: expected from pandas label lookup when only start_date is given
                and no report exists for that date.

    Usage:
    -----------
    To retrieve anomaly reports for a specific date range:
    ```
    start_date = '2023-01-01'
    end_date = '2023-01-31'
    report_data = your_instance.anomalies_report(start_date, end_date)
    ```
    To retrieve anomaly reports for a specific date:
    ```
    start_date = '2023-01-01'
    report_data = your_instance.anomalies_report(start_date)
    ```
    """
    reqUrl = "https://datahub.tahmo.org/custom/sensordx/reports"  # endpoint
    print(f'API request: {reqUrl}')
    apiRequest = requests.get(
        reqUrl,
        params={},
        auth=requests.auth.HTTPBasicAuth(self.apiKey, self.apiSecret),
    )
    if apiRequest.status_code != 200:
        return self.__handleApiError(apiRequest)

    anomalies_data = pd.DataFrame(apiRequest.json()['qualityObjects'])
    is_level_2 = (anomalies_data['level'] == 2) & (anomalies_data['type'] == 'sensordx')
    # Work on an explicit copy: assigning columns on a boolean-filtered slice
    # is chained assignment (SettingWithCopyWarning) and may not stick.
    level_2 = anomalies_data.loc[is_level_2].copy()
    level_2['station_sensor'] = level_2['stationCode'] + '_' + level_2['sensorCode']
    level_2 = level_2[['startDate', 'station_sensor', 'level']].copy()
    # Keep only the calendar date of each report so day-level lookups match.
    level_2['startDate'] = pd.to_datetime(
        [dateutil.parser.parse(i).strftime('%Y-%m-%d') for i in level_2['startDate']]
    )
    # Label slicing needs a sorted index; mergesort is stable, so reports that
    # share a date keep the order the API returned them in.
    level_2 = level_2.set_index('startDate').sort_index(kind='mergesort')
    if end_date:
        return level_2.loc[start_date:end_date]
    return level_2.loc[start_date]
'''
A specific class to evaluate and validate the water level data using TAHMO Stations
To be used as it is to maintain flow
Expand Down
2 changes: 1 addition & 1 deletion filter_stations/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='filter_stations',
version='0.4.2',
version='0.4.3',
packages=find_packages(),
include_package_data=True,
description='Making it easier to navigate and clean station data',
Expand Down

0 comments on commit e8dfa89

Please sign in to comment.