Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add multiple FEWS ID support #7

Merged
merged 5 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions waterbalans/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .timeseries import get_series, update_series
from .utils import *
from .water import Water
from.run_utils import run_eag_by_name, get_dataframes_by_name, get_dataframes_from_files
88 changes: 88 additions & 0 deletions waterbalans/run_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os

import pandas as pd

from .create import create_eag
from .utils import create_csvfile_table, add_timeseries_to_obj


def run_eag_by_name(name, csvdir="./data/input_csv", tmin="1996", tmax="2019"):
file_df = create_csvfile_table(csvdir)
fbuckets, fparams, freeks, fseries, _, _ = file_df.loc[name]

dfdict = get_dataframes_from_files(fbuckets=fbuckets, fparams=fparams, freeks=freeks,
fseries=fseries, csvdir=csvdir)

deelgebieden = dfdict["deelgebieden"]
tijdreeksen = dfdict["tijdreeksen"]
parameters = dfdict["parameters"]
series = dfdict["series"]

# %% Simulation settings based on parameters
# ------------------------------------------
if parameters.loc[parameters.ParamCode == "hTargetMin", "Waarde"].iloc[0] != -9999.:
use_wl = True
else:
use_wl = False

# %% Model
# --------
# Maak bakjes model
e = create_eag(name.split("-")[-1], name, deelgebieden,
use_waterlevel_series=use_wl,
logfile="waterbalans.log")

# Voeg tijdreeksen toe
e.add_series_from_database(tijdreeksen, tmin=tmin, tmax=tmax)

# Voeg overige tijdreeksen toe (overschrijf FEWS met Excel)
if series is not None:
add_timeseries_to_obj(e, series, overwrite=True)

# Force MengRiool to use external timeseries
mengriool = e.get_buckets(buckettype="MengRiool")
if len(mengriool) > 0:
for b in mengriool:
b.use_eag_cso_series = False
b.path_to_cso_series = r"./data/cso_series/240_cso_timeseries.pklz"

# Simuleer waterbalans met parameters
e.simulate(parameters, tmin=tmin, tmax=tmax)

return e


def get_dataframes_from_files(fbuckets=None, freeks=None, fparams=None,
fseries=None, csvdir="./data/input_csv"):
dflist = {}
if fbuckets is not None:
# bestand met deelgebieden en oppervlaktes:
deelgebieden = pd.read_csv(
os.path.join(csvdir, fbuckets), delimiter=";")
dflist["deelgebieden"] = deelgebieden
if freeks is not None:
# bestand met tijdreeksen, b.v. neerslag/verdamping:
tijdreeksen = pd.read_csv(os.path.join(csvdir, freeks), delimiter=";")
dflist["tijdreeksen"] = tijdreeksen
if fparams is not None:
# bestand met parameters per deelgebied
parameters = pd.read_csv(os.path.join(csvdir, fparams), delimiter=";")
dflist["parameters"] = parameters
if fseries is not None:
# bestand met overige tijdreeksen
if not isinstance(fseries, float):
series = pd.read_csv(os.path.join(csvdir, fseries),
delimiter=";", index_col=[0], parse_dates=True)
else:
series = None
dflist["series"] = series
return dflist


def get_dataframes_by_name(name, csvdir="./data/input_csv"):
file_df = create_csvfile_table(csvdir)
fbuckets, fparams, freeks, fseries, _, _ = file_df.loc[name]

dfdict = get_dataframes_from_files(fbuckets=fbuckets, fparams=fparams, freeks=freeks,
fseries=fseries, csvdir=csvdir)
return dfdict
155 changes: 106 additions & 49 deletions waterbalans/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
import numpy as np
from hkvfewspy import Pi
from pandas import DataFrame, Series, Timedelta, Timestamp, date_range
from pandas import DataFrame, Series, Timedelta, Timestamp, date_range, concat


def initialize_fews_pi(wsdl='http://localhost:8080/FewsWebServices/fewspiservice?wsdl'):
Expand Down Expand Up @@ -73,60 +73,85 @@ def get_series(name, kind, data, tmin=None, tmax=None, freq="D", loggername=None
# Download a timeseries from FEWS
if kind == "FEWS" and pi is not None: # pragma: no cover

# Note: this selects only the first entry if there are multiple
if isinstance(data, DataFrame):
data = data.loc[:, "WaardeAlfa"]
if len(data) > 1:
logger.warning("Multiple series found, selecting "
"first one ({}) and continuing".format(data.iloc[0]))
data = data.iloc[0]
if data.shape[0] > 1:
fews_waarde_alfa = "||".join(data["WaardeAlfa"])
else:
data = data.loc["WaardeAlfa"].values[0]
fews_waarde_alfa = data["WaardeAlfa"].iloc[0]
# split if multiple fews ids provided in one string:
fewsid_list = fews_waarde_alfa.split("||")
fews_series = []
for fewsid in fewsid_list:
# parse fewsid
try:
filterId, moduleInstanceId, locationId, parameterId = fewsid.split(
"|")
except ValueError as e:
logger.error(
"Cannot parse FEWS Id for timeseries '{0}'! Id is {1}.".format(name, fewsid))
continue

# TODO: clean this try/except stuff up
try:
_, moduleInstanceId, locationId, parameterId = data.split(
"|") # new FEWS Code
except ValueError as e:
# get data from FEWS
try:
moduleInstanceId, locationId, parameterId = data.split("|")
df = _get_fews_series(filterId=filterId, moduleInstanceId=moduleInstanceId,
parameterId=parameterId, locationId=locationId, tmin=tmin,
tmax=tmax + Timedelta(days=1), pi=pi)
except Exception as e:
logger.error(
"Cannot parse FEWS Id for timeseries '{}'!".format(name))
return
logger.error("FEWS Timeseries '{}': {}".format(name, e))
continue

df.reset_index(inplace=True)
series = df.loc[:, ["date", "value",
"parameterId"]].set_index("date")
# Remove timezone from FEWS series
series = series.tz_localize(None)
series["value"] = series["value"].astype(float)

# omdat neerslag tussen 1jan 9u en 2jan 9u op 1jan gezet moet worden.
if np.any(series.index.hour != 9) or np.any(series.index.hour != 8):
series.index = series.index.floor(freq="D") - Timedelta(days=1)
series = series.squeeze()

query = pi.setQueryParameters(prefill_defaults=True)
query.query["onlyManualEdits"] = False
query.moduleInstanceIds([moduleInstanceId])
query.locationIds([locationId])
query.parameterIds([parameterId])
query.startTime(tmin)
query.endTime(tmax + Timedelta(days=1)) # add 1 day for prec/evap
# necessary for precip data after 2016-11-30...
query.useDisplayUnits(False)
query.clientTimeZone('Europe/Amsterdam')
# Delete nan-values (-999) (could be moved to fewspy)
series.replace(-999.0, np.nan, inplace=True)

try:
df = pi.getTimeSeries(query, setFormat='df')
except Exception as e:
logger.error("FEWS Timeseries '{}': {}".format(name, e))
return
df.reset_index(inplace=True)
series = df.loc[:, ["date", "value"]].set_index("date")
series = series.tz_localize(None) # Remove timezone from FEWS series
series = series.astype(float)

# omdat neerslag tussen 1jan 9u en 2jan 9u op 1jan gezet moet worden.
if np.any(series.index.hour != 9):
series.index = series.index.floor(freq="D") - Timedelta(days=1)
series = series.squeeze()
# check units, TODO, check if others need to be fixed?
if name in ["Verdamping", "Neerslag"]:
series["value"] = series["value"].divide(1e3)

# Delete nan-values (-999) (could be moved to fewspy)
series.replace(-999.0, np.nan, inplace=True)
fews_series.append(series)
logger.info("Adding FEWS timeseries '{}': {}.".format(
name, fewsid))

# check units, TODO, check if others need to be fixed?
if name in ["Verdamping", "Neerslag"]:
series = series.divide(1e3)
# Logic to combine multiple FEWS series
if len(fews_series) > 1:
params = [i["parameterId"].iloc[0] for i in fews_series]
# check if all params are equal
if not np.all([ip == params[0] for ip in params]):
logger.error(
"Not all FEWSIDs have the same parameter! {}".format(params))
return
# water levels: mean
elif params[0] == "H.meting.gem":
series = concat([s.value for s in fews_series], axis=1)
series = series.mean(axis=1)
logger.info(
"Combined multiple FEWS Series with method 'mean'.")
# pump volumes: sum
elif params[0] == "Vol.berekend.dag":
series = concat([s.value for s in fews_series], axis=1)
series = series.sum(axis=1)
logger.info("Combined multiple FEWS Series with method 'sum'.")
else:
logger.error(
"No logic defined for combining FEWS series with parameter '{}'!".format(params[0]))
raise NotImplementedError()
# only one fews series
elif len(fews_series) == 1:
series = fews_series[0]["value"]
# no fews series obtained
else:
logger.error("No FEWS series returned.")
return

# if KNMI data is required:
elif kind == "KNMI":
Expand Down Expand Up @@ -181,8 +206,8 @@ def get_series(name, kind, data, tmin=None, tmax=None, freq="D", loggername=None

else:
# TODO: fix logging, commented out now, because too much noise.
logger.warning(
"Adding series '{0}' of kind '{1}' not supported.".format(name, kind))
# logger.warning(
# "Adding series '{0}' of kind '{1}' not supported.".format(name, kind))
return

series.name = name
Expand Down Expand Up @@ -263,3 +288,35 @@ def update_series(series_orig, series_new, method="append"):
raise NotImplementedError("Method {} not implemented!".format(method))

return updated_series


def _get_fews_series(filterId=None, moduleInstanceId=None,
locationId=None, parameterId=None, tmin=None,
tmax=None, pi=None):

if pi is None:
pi = initialize_fews_pi()

query = pi.setQueryParameters(prefill_defaults=True)
query.moduleInstanceIds([moduleInstanceId])
query.parameterIds([parameterId])
query.locationIds([locationId])
query.useDisplayUnits(False) # needed for precip after 2016-11-30
query.startTime(tmin)
query.endTime(tmax)
query.version("1.24")

df = pi.getTimeSeries(query, setFormat='df')

return df


def get_fews_series(fewsid_string, tmin="1996", tmax="2019"):
pi = initialize_fews_pi()
filterId, moduleInstanceId, locationId, parameterId = fewsid_string.split(
"|")
df = _get_fews_series(filterId=filterId, moduleInstanceId=moduleInstanceId,
locationId=locationId, parameterId=parameterId,
tmin=tmin, tmax=tmax, pi=pi)

return df
15 changes: 15 additions & 0 deletions waterbalans/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,18 @@ def create_csvfile_table(csvdir):
file_df.dropna(how="any", subset=[
"opp", "param", "reeks"], axis=0, inplace=True)
return file_df


def compare_to_excel_balance(e, pickle_dir="./data/excel_pklz", **kwargs):
# Read Excel Balance Data (see scrape_excelbalansen.py for details)
excelbalance = pd.read_pickle(os.path.join(pickle_dir, "{}_wbalance.pklz".format(e.name)),
compression="zip")
for icol in excelbalance.columns:
excelbalance.loc[:, icol] = pd.to_numeric(
excelbalance[icol], errors="coerce")

# Waterbalance comparison
fig = e.plot.compare_fluxes_to_excel_balance(
excelbalance, **kwargs)

return fig