Skip to content

Commit

Permalink
Fix performance issue of temporal matching, add new matching func
Browse files Browse the repository at this point in the history
  • Loading branch information
sebhahn committed Jul 15, 2019
1 parent d4233cd commit 3f3bc86
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 25 deletions.
134 changes: 122 additions & 12 deletions pytesmo/temporal_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,103 @@
"""

import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

import pandas as pd

def _closest_row_per_match(matched_index, dist):
    """
    Find, for every distinct matched timestamp, the closest reference row.

    Parameters
    ----------
    matched_index : pandas.Series
        Timestamp of the other time series matched to each reference row
        (NaT where no match was found).
    dist : numpy.ndarray
        Distance between matched and reference timestamp for each row.

    Returns
    -------
    keep : numpy.ndarray
        Positional indices of the rows with the smallest absolute distance,
        one per distinct matched timestamp. Ties resolve to the first row.
    """
    # work on positions, the reference index may not be unique
    keys = matched_index.reset_index(drop=True)
    abs_dist = pd.Series(np.abs(dist))

    # rows without a match (NaT) can never be the nearest neighbor
    valid = keys.notna().to_numpy()
    if not valid.any():
        return np.empty(0, dtype=np.intp)

    closest = abs_dist[valid].groupby(keys[valid]).idxmin()

    return closest.to_numpy(dtype=np.intp)


def df_temp_merge(df_reference, df_other, return_index=False,
                  return_distance=False, tolerance=None, direction='nearest',
                  duplicate_nan=False):
    """
    Merge nearest neighbor between reference and other time series into
    common dataframe.

    Parameters
    ----------
    df_reference : pandas.DataFrame or pandas.Series
        Reference time series. An unnamed Series is stored in a column
        called 'reference'.
    df_other : tuple/list of pandas.DataFrame or pandas.DataFrame
        Time series matched against reference time series. Series are
        accepted as well, an unnamed Series at position i is stored in a
        column called 'series_<i>'. The passed objects are not modified.
    return_index : bool, optional
        Include index of other time series in matched dataframe as column
        'ind_other_<i>' (default: False).
    return_distance : bool, optional
        Include distance information between reference and other time series
        in matched dataframe as column 'dist_other_<i>' (default: False).
    tolerance : pd.Timedelta, optional
        Select nearest neighbor tolerance (default: None).
    direction : str, optional
        Whether to search 'backward', 'forward', or 'nearest' matches
        (default: 'nearest').
    duplicate_nan : bool, optional
        If an observation of the other time series is matched to several
        reference time stamps, keep it only at the closest one and set
        the remaining (duplicate) matches to NaN (default: False).

    Returns
    -------
    df_tm : pandas.DataFrame
        Reference time series matched with all other time series.
    """
    if not isinstance(df_other, (tuple, list)):
        df_other = (df_other, )

    if isinstance(df_reference, pd.Series):
        if df_reference.name is None:
            name = 'reference'
        else:
            name = df_reference.name

        df_reference = df_reference.to_frame(name)

    # every other time series is merged into this frame one after another,
    # so that none of the matched results is lost
    df_tm = df_reference.copy()

    for i, other in enumerate(df_other):

        if isinstance(other, pd.Series):
            if other.name is None:
                name = 'series_{}'.format(i)
            else:
                name = other.name

            other = other.to_frame(name)

        dist_str = 'dist_other_{}'.format(i)
        ind_str = 'ind_other_{}'.format(i)

        # assign() returns a new frame, the caller's data stays untouched
        other = other.assign(**{ind_str: other.index})
        col_other = other.columns

        df_tm = pd.merge_asof(df_tm, other, left_index=True,
                              right_index=True, direction=direction,
                              tolerance=tolerance)

        df_tm[dist_str] = df_tm[ind_str] - df_tm.index

        if duplicate_nan:
            # distance expressed in days
            dist_days = df_tm[dist_str].values / np.timedelta64(1, 'D')

            keep = _closest_row_per_match(df_tm[ind_str], dist_days)
            duplicates = np.ones(len(df_tm), dtype=bool)
            duplicates[keep] = False

            # mask() upcasts integer columns explicitly instead of relying
            # on implicit upcasting during item assignment
            for col in col_other:
                df_tm[col] = df_tm[col].mask(duplicates)

        fields = []

        if not return_index:
            fields.append(ind_str)

        if not return_distance:
            fields.append(dist_str)

        if fields:
            df_tm = df_tm.drop(columns=fields)

    return df_tm


def df_match(reference, *args, **kwds):
Expand All @@ -30,8 +124,10 @@ def df_match(reference, *args, **kwds):
dropduplicates : boolean
Drop duplicated temporal matched (default: False)
asym_window: string, optional
``<=`` stands for using a smaller and equal only for the left/smaller side of the window comparison
``>=`` stands for using a larger and equal only for the right/larger side of the window comparison
``<=`` stands for using a smaller and equal only for the left/smaller
side of the window comparison
``>=`` stands for using a larger and equal only for the right/larger
side of the window comparison
The default is to use <= and >= for both sides of the search window
Returns
Expand Down Expand Up @@ -93,14 +189,18 @@ def df_match(reference, *args, **kwds):
if asym_window == "<=":
# this means that only distance in the interval [distance[ are
# taken
valid_dist = ((arg_matched['distance'] >= 0.0) & (arg_matched['distance'] <= window)) | (
(arg_matched['distance'] <= 0.0) & (arg_matched['distance'] > -window))
valid_dist = ((arg_matched['distance'] >= 0.0) & (
arg_matched['distance'] <= window)) | (
(arg_matched['distance'] <= 0.0) & (
arg_matched['distance'] > -window))
invalid_dist = ~valid_dist
if asym_window == ">=":
# this means that only distance in the interval ]distance] are
# taken
valid_dist = ((arg_matched['distance'] >= 0.0) & (arg_matched['distance'] < window)) | (
(arg_matched['distance'] <= 0.0) & (arg_matched['distance'] >= -window))
valid_dist = ((arg_matched['distance'] >= 0.0) & (
arg_matched['distance'] < window)) | (
(arg_matched['distance'] <= 0.0) & (
arg_matched['distance'] >= -window))
invalid_dist = ~valid_dist
arg_matched.loc[invalid_dist] = np.nan

Expand All @@ -109,9 +209,19 @@ def df_match(reference, *args, **kwds):

if "dropduplicates" in kwds and kwds['dropduplicates']:
arg_matched = arg_matched.dropna(how='all')
g = arg_matched.groupby('merge_key')
min_dists = g.distance.apply(lambda x: x.abs().idxmin())
arg_matched = arg_matched.loc[min_dists]

unq, unq_idx = np.unique(arg_matched['index'].values,
return_index=True)
unq_idx = np.concatenate([unq_idx, np.array([len(arg_matched)])])

dist = arg_matched['distance'].values
no_dup = []
for j in np.arange(unq_idx.size-1):
m = np.argmin(np.abs(
dist[unq_idx[j]:unq_idx[j+1]])) + unq_idx[j]
no_dup.append(m)

arg_matched = arg_matched.iloc[no_dup]

temporal_matched_args.append(
arg_matched.drop(['merge_key', 'ref_index'], axis=1))
Expand Down Expand Up @@ -141,8 +251,8 @@ def matching(reference, *args, **kwargs):
Returns
-------
temporal_match : pandas.DataFrame
containing the index of the reference Series and a column for each of the
other input Series
containing the index of the reference Series and a column for each
of the other input Series
"""
matched_datasets = df_match(reference, *args, dropna=True,
dropduplicates=True, **kwargs)
Expand Down
75 changes: 62 additions & 13 deletions tests/test_temporal_matching.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright (c) 2015,Vienna University of Technology,
# Department of Geodesy and Geoinformation
# Copyright (c) 2019, TU Wien, Department of Geodesy and Geoinformation
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -45,34 +44,47 @@ def test_df_match_borders():
See issue #51
"""
ref_df = pd.DataFrame(
{"data": np.arange(5)}, index=pd.date_range(
datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))

ref_df = pd.DataFrame({"data": np.arange(5)}, index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))
match_df = pd.DataFrame({"matched_data": np.arange(5)},
index=[datetime(2007, 1, 1, 9),
datetime(2007, 1, 2, 9),
datetime(2007, 1, 3, 9),
datetime(2007, 1, 4, 9),
datetime(2007, 1, 5, 9)])

matched = tmatching.df_match(ref_df, match_df)

nptest.assert_allclose(
np.array([0.375, 0.375, 0.375, 0.375, 0.375]), matched.distance.values)
nptest.assert_allclose(np.arange(5), matched.matched_data)

matched = tmatching.df_temp_merge(ref_df, match_df, return_distance=True)
dist = matched['dist_other_0'].values / np.timedelta64(1, 'D')

nptest.assert_allclose(
np.array([0.375, 0.375, 0.375, 0.375, 0.375]), dist)
nptest.assert_allclose(np.arange(5), matched.matched_data)


def test_df_match_match_on_window_border():
"""
test matching if a value lies exactly on the window border.
"""

ref_df = pd.DataFrame({"data": np.arange(5)}, index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))
ref_df = pd.DataFrame(
{"data": np.arange(5)}, index=pd.date_range(
datetime(2007, 1, 1, 0), "2007-01-05", freq="D"))

match_df = pd.DataFrame({"matched_data": np.arange(4)},
index=[datetime(2007, 1, 1, 9),
datetime(2007, 1, 2, 9),
datetime(2007, 1, 3, 12),
datetime(2007, 1, 5, 9)])

matched = tmatching.df_match(ref_df, match_df, window=0.5)

nptest.assert_allclose(
Expand Down Expand Up @@ -102,29 +114,40 @@ def test_df_match_borders_unequal_query_points():
See issue #51
"""

ref_df = pd.DataFrame({"data": np.arange(5)}, index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))
ref_df = pd.DataFrame({"data": np.arange(5)},
index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))

match_df = pd.DataFrame({"matched_data": np.arange(4)},
index=[datetime(2007, 1, 1, 9),
datetime(2007, 1, 2, 9),
datetime(2007, 1, 4, 9),
datetime(2007, 1, 5, 9)])

matched = tmatching.df_match(ref_df, match_df)

nptest.assert_allclose(
np.array([0.375, 0.375, -0.625, 0.375, 0.375]), matched.distance.values)
nptest.assert_allclose(np.array([0, 1, 1, 2, 3]), matched.matched_data)

matched = tmatching.df_temp_merge(ref_df, match_df, return_distance=True)
dist = matched['dist_other_0'].values / np.timedelta64(1, 'D')

nptest.assert_allclose(
np.array([0.375, 0.375, -0.625, 0.375, 0.375]), dist)
nptest.assert_allclose(np.array([0, 1, 1, 2, 3]), matched.matched_data)


def test_matching():
"""
test matching function
"""
data = np.arange(5.0)
data = np.arange(10.0)
data[3] = np.nan

ref_df = pd.DataFrame({"data": data}, index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))
ref_df = pd.DataFrame({"data": data}, index=pd.date_range(
datetime(2007, 1, 1, 0), "2007-01-10", freq="D"))

match_df = pd.DataFrame({"matched_data": np.arange(5)},
index=[datetime(2007, 1, 1, 9),
datetime(2007, 1, 2, 9),
Expand All @@ -136,16 +159,29 @@ def test_matching():
nptest.assert_allclose(np.array([0, 1, 2, 4]), matched.matched_data)
assert len(matched) == 4

matched = tmatching.df_temp_merge(ref_df, match_df)

nptest.assert_allclose(np.array([0, 1, 2, 3, 4, 4, 4, 4, 4, 4]),
matched.matched_data)
assert len(matched) == 10

matched = tmatching.df_temp_merge(ref_df, match_df, duplicate_nan=True)

nptest.assert_allclose(np.array([0, 1, 2, 3, 4, np.nan, np.nan,
np.nan, np.nan, np.nan]),
matched.matched_data)
assert len(matched) == 10


def test_matching_series():
"""
test matching function with pd.Series as input
"""
data = np.arange(5.0)
data = np.arange(10.)
data[3] = np.nan

ref_ser = pd.Series(data, index=pd.date_range(datetime(2007, 1, 1, 0),
"2007-01-05", freq="D"))
"2007-01-10", freq="D"))
match_ser = pd.Series(np.arange(5),
index=[datetime(2007, 1, 1, 9),
datetime(2007, 1, 2, 9),
Expand All @@ -158,3 +194,16 @@ def test_matching_series():

nptest.assert_allclose(np.array([0, 1, 2, 4]), matched.matched_data)
assert len(matched) == 4

matched = tmatching.df_temp_merge(ref_ser, match_ser)

nptest.assert_allclose(np.array([0, 1, 2, 3, 4, 4, 4, 4, 4, 4]),
matched.matched_data)
assert len(matched) == 10

matched = tmatching.df_temp_merge(ref_ser, match_ser, duplicate_nan=True)

nptest.assert_allclose(np.array([0, 1, 2, 3, 4, np.nan, np.nan,
np.nan, np.nan, np.nan]),
matched.matched_data)
assert len(matched) == 10

0 comments on commit 3f3bc86

Please sign in to comment.