Skip to content

Commit

Permalink
Merge pull request pandas-dev#3 in DATA/arctic from MDP-448_tickstore…
Browse files Browse the repository at this point in the history
…_timezone_inconsistency to master

* commit 'fc8eafdf5528a6fe52d1f471a9cdf9454e81f6e6':
  MDP-448 to_dt should respect default_tz for ms-since-epoch datetimes. Add unit tests
  MDP-448 Make ms_to_datetime always return a non-naive datetime.datetime
  MDP-448 remove debug print
  MDP-448 Fix arctic_copy_data. If the VersionStore data has no TimeZone then don't slice the original_data using a timezone during --splicing
  MDP-448 to_pandas_closed_closed now does an implicit to_dt
  MDP-448 Ensure we use non-naive datetimes for the Mongo query in the read-path
  MDP-448 Ensure we set a TimeZone on the returned DataFrame on tickstore.read. We store time as ms since epoch, and this will prevent confusion on interpretation with naive DateTimes on read.
  • Loading branch information
jamesblackburn committed Aug 19, 2015
2 parents c7a5c8d + fc8eafd commit 0a4c3c1
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 62 deletions.
2 changes: 1 addition & 1 deletion arctic/date/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ._daterange import DateRange
from ._generalslice import OPEN_CLOSED, CLOSED_OPEN, OPEN_OPEN, CLOSED_CLOSED
from ._util import datetime_to_ms, ms_to_datetime
from ._util import string_to_daterange, to_pandas_closed_closed
from ._util import string_to_daterange, to_pandas_closed_closed, to_dt
from ._mktz import mktz, TimezoneError
36 changes: 34 additions & 2 deletions arctic/date/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,34 @@ def string_to_daterange(str_range, delimiter='-', as_dates=False, interval=CLOSE
return DateRange(d[0], d[1], oc)


def to_dt(date, default_tz=None):
    """
    Returns a non-naive datetime.datetime.

    Interprets numbers as ms-since-epoch.

    Parameters
    ----------
    date : `int` or `datetime.datetime`
        The datetime to convert
    default_tz : tzinfo
        The TimeZone to use if none is found. If not supplied, and the
        datetime doesn't have a timezone, then we raise ValueError

    Returns
    -------
    Non-naive datetime
    """
    # Numeric input is always milliseconds since the Unix epoch.
    if isinstance(date, (int, long)):
        return ms_to_datetime(date, default_tz)
    # Already timezone-aware: pass straight through.
    if date.tzinfo is not None:
        return date
    # Naive datetime: attach the caller-supplied default timezone, if any.
    if default_tz is None:
        raise ValueError("Must specify a TimeZone on incoming data")
    return date.replace(tzinfo=default_tz)


def to_pandas_closed_closed(date_range):
"""
Pandas DateRange slicing is CLOSED-CLOSED inclusive at both ends.
Expand All @@ -86,12 +114,16 @@ def to_pandas_closed_closed(date_range):
"""
if not date_range:
return None

start = date_range.start
end = date_range.end
if start:
start = to_dt(start, mktz()) # Ensure they have timezones
if date_range.startopen:
start += timedelta(milliseconds=1)

if end:
end = to_dt(end, mktz()) # Ensure they have timezones
if date_range.endopen:
end -= timedelta(milliseconds=1)
return DateRange(start, end)
Expand All @@ -102,8 +134,8 @@ def ms_to_datetime(ms, tzinfo=None):
if not isinstance(ms, (int, long)):
raise TypeError('expected integer, not %s' % type(ms))

if tzinfo in (None, mktz()):
return datetime.datetime.fromtimestamp(ms * 1e-3, mktz()).replace(tzinfo=None)
if tzinfo is None:
tzinfo = mktz()

return datetime.datetime.fromtimestamp(ms * 1e-3, tzinfo)

Expand Down
19 changes: 12 additions & 7 deletions arctic/scripts/arctic_copy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from arctic.store.audit import ArcticTransaction

from ..hosts import get_arctic_lib
from ..date import DateRange, to_pandas_closed_closed, CLOSED_OPEN, OPEN_CLOSED
from ..date import DateRange, to_pandas_closed_closed, CLOSED_OPEN, OPEN_CLOSED, mktz
from .utils import setup_logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -38,12 +38,17 @@ def _copy_symbol(symbols):

if existing_data and splice:
original_data = dest.read(symbol).data
before = original_data.ix[:to_pandas_closed_closed(DateRange(None,
new_data.index[0].to_pydatetime(),
interval=CLOSED_OPEN)).end]
after = original_data.ix[to_pandas_closed_closed(DateRange(new_data.index[-1].to_pydatetime(),
None,
interval=OPEN_CLOSED)).start:]
preserve_start = to_pandas_closed_closed(DateRange(None, new_data.index[0].to_pydatetime(),
interval=CLOSED_OPEN)).end
preserve_end = to_pandas_closed_closed(DateRange(new_data.index[-1].to_pydatetime(),
None,
interval=OPEN_CLOSED)).start
if not original_data.index.tz:
# No timezone on the original, should we even allow this?
preserve_start = preserve_start.replace(tzinfo=None)
preserve_end = preserve_end.replace(tzinfo=None)
before = original_data.ix[:preserve_start]
after = original_data.ix[preserve_end:]
new_data = before.append(new_data).append(after)

mt.write(symbol, new_data, metadata=version.metadata)
Expand Down
2 changes: 1 addition & 1 deletion arctic/store/version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def list_versions(self, symbol=None, snapshot=None, latest_only=False):
continue
seen_symbols.add(version['symbol'])
versions.append({'symbol': version['symbol'], 'version': version['version'],
# We return naive datetimes in London Time.
# We return naive datetimes in Local Time.
'date': ms_to_datetime(datetime_to_ms(version['_id'].generation_time)),
'snapshots': self._find_snapshots(version.get('parent', []))})
return versions
Expand Down
39 changes: 16 additions & 23 deletions arctic/tickstore/tickstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import pymongo
from pymongo.errors import OperationFailure

from ..date import DateRange, to_pandas_closed_closed, mktz, datetime_to_ms, ms_to_datetime
from ..date import DateRange, to_pandas_closed_closed, mktz, datetime_to_ms, CLOSED_CLOSED, to_dt
from ..decorators import mongo_retry
from ..exceptions import OverlappingDataException, NoDataFoundException, UnhandledDtypeException, ArcticException
from .._util import indent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -127,12 +126,8 @@ def delete(self, symbol, date_range=None):
date_range = to_pandas_closed_closed(date_range)
if date_range is not None:
assert date_range.start and date_range.end
if date_range.start:
start = self._to_dt(date_range.start)
if date_range.end:
end = self._to_dt(date_range.end)
query[START] = {'$gte': start}
query[END] = {'$lte': end}
query[START] = {'$gte': date_range.start}
query[END] = {'$lte': date_range.end}
self._collection.delete_many(query)

def list_symbols(self, date_range=None):
Expand All @@ -143,10 +138,14 @@ def _mongo_date_range_query(self, symbol, date_range):
if not date_range:
date_range = DateRange()

# We're assuming CLOSED_CLOSED on these Mongo queries
assert date_range.interval == CLOSED_CLOSED

# Find the start bound
start_range = {}
first = last = None
if date_range.start:
assert date_range.start.tzinfo
start = date_range.start
startq = self._symbol_query(symbol)
startq.update({START: {'$lte': start}})
Expand All @@ -159,6 +158,7 @@ def _mongo_date_range_query(self, symbol, date_range):

# Find the end bound
if date_range.end:
assert date_range.end.tzinfo
end = date_range.end
endq = self._symbol_query(symbol)
endq.update({START: {'$gt': end}})
Expand Down Expand Up @@ -258,7 +258,7 @@ def read(self, symbol, date_range=None, columns=None, include_images=False, _tar
raise NoDataFoundException("No Data found for {} in range: {}".format(symbol, date_range))
rtn = self._pad_and_fix_dtypes(rtn, column_dtypes)

index = pd.to_datetime(np.concatenate(rtn[INDEX]), unit='ms')
index = pd.to_datetime(np.concatenate(rtn[INDEX]), utc=True, unit='ms')
if columns is None:
columns = [x for x in rtn.keys() if x not in (INDEX, 'SYMBOL')]
if multiple_symbols and 'SYMBOL' not in columns:
Expand All @@ -278,6 +278,8 @@ def read(self, symbol, date_range=None, columns=None, include_images=False, _tar
logger.info("Got data in %s secs, creating DataFrame..." % t)
mgr = _arrays_to_mgr(arrays, columns, index, columns, dtype=None)
rtn = pd.DataFrame(mgr)
# Present data in the user's default TimeZone
rtn.index.tz = mktz()

t = (dt.now() - perf_start).total_seconds()
ticks = len(rtn)
Expand Down Expand Up @@ -465,7 +467,7 @@ def write(self, symbol, data):
pandas = True
else:
raise UnhandledDtypeException("Can't persist type %s to tickstore" % type(data))
self._assert_nonoverlapping_data(symbol, self._to_dt(start), self._to_dt(end))
self._assert_nonoverlapping_data(symbol, to_dt(start), to_dt(end))

if pandas:
buckets = self._pandas_to_buckets(data, symbol)
Expand Down Expand Up @@ -498,15 +500,6 @@ def _to_ms(self, date):
return datetime_to_ms(date)
return date

def _to_dt(self, date, default_tz=None):
    """Coerce *date* to a non-naive datetime; numbers are treated as ms-since-epoch (UTC)."""
    # Millisecond timestamps are unambiguous, so they are always read as UTC.
    if isinstance(date, (int, long)):
        return ms_to_datetime(date, mktz('UTC'))
    # Aware datetimes need no adjustment.
    if date.tzinfo is not None:
        return date
    # Naive datetimes must be resolved against an explicit default zone.
    if default_tz is None:
        raise ValueError("Must specify a TimeZone on incoming data")
    return date.replace(tzinfo=default_tz)

def _str_dtype(self, dtype):
"""
Represent dtypes without byte order, as earlier Java tickstore code doesn't support explicit byte order.
Expand Down Expand Up @@ -540,8 +533,8 @@ def _ensure_supported_dtypes(self, array):
return array

def _pandas_to_bucket(self, df, symbol):
start = self._to_dt(df.index[0].to_datetime())
end = self._to_dt(df.index[0].to_datetime())
start = to_dt(df.index[0].to_datetime())
end = to_dt(df.index[0].to_datetime())
rtn = {START: start, END: end, SYMBOL: symbol}
rtn[VERSION] = CHUNK_VERSION_NUMBER
rtn[COUNT] = len(df)
Expand All @@ -566,8 +559,8 @@ def _pandas_to_bucket(self, df, symbol):
def _to_bucket(self, ticks, symbol):
data = {}
rowmask = {}
start = self._to_dt(ticks[0]['index'])
end = self._to_dt(ticks[-1]['index'])
start = to_dt(ticks[0]['index'])
end = to_dt(ticks[-1]['index'])
for i, t in enumerate(ticks):
for k, v in t.iteritems():
try:
Expand Down
70 changes: 53 additions & 17 deletions tests/integration/tickstore/test_ts_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from numpy.testing.utils import assert_array_equal
from pandas.util.testing import assert_frame_equal
import pandas as pd
from pandas.tseries.index import DatetimeIndex
import pytest
import pytz

Expand Down Expand Up @@ -123,11 +124,11 @@ def test_read_all_cols_all_dtypes(tickstore_lib, chunk_size):
# Treat missing strings as None
data[0]['ns'] = None
data[1]['os'] = None
# Strip TZ from the data for the moment
data[0]['index'] = dt(1970, 1, 1)
data[1]['index'] = dt(1970, 1, 1, 0, 0, 1)
expected = pd.DataFrame(data)
expected = expected.set_index('index')
index = DatetimeIndex([dt(1970, 1, 1, tzinfo=mktz('UTC')),
dt(1970, 1, 1, 0, 0, 1, tzinfo=mktz('UTC'))],
)
index.tz = mktz()
expected = pd.DataFrame(data, index=index)
expected = expected[df.columns]
assert_frame_equal(expected, df, check_names=False)

Expand Down Expand Up @@ -229,6 +230,41 @@ def test_date_range_end_not_in_range(tickstore_lib):
assert tickstore_lib._collection.find(f.call_args_list[-1][0][0]).count() == 1


@pytest.mark.parametrize('tz_name', ['UTC',
                                     'Europe/London',  # Sometimes ahead of UTC
                                     'America/New_York',  # Behind UTC
                                     ])
def test_date_range_default_timezone(tickstore_lib, tz_name):
    """
    We assume naive datetimes are user-local
    """
    tz = mktz(tz_name)
    # Two ticks half a year apart, so one falls in DST and one outside it.
    ticks = [
        {'a': 1.,
         'b': 2.,
         'index': dt(2013, 1, 1, tzinfo=tz)
         },
        # Half-way through the year
        {'b': 3.,
         'c': 4.,
         'index': dt(2013, 7, 1, tzinfo=tz)
         },
    ]

    with patch('arctic.date._mktz.DEFAULT_TIME_ZONE_NAME', tz_name):
        tickstore_lib.chunk_size = 1
        tickstore_lib.write('SYM', ticks)

        # Integer (yyyymmdd) bounds should be interpreted in the default zone.
        df = tickstore_lib.read('SYM', date_range=DateRange(20130101, 20130701), columns=None)
        assert len(df) == 2
        assert df.index[1] == dt(2013, 7, 1, tzinfo=tz)
        assert df.index.tz == tz

        df = tickstore_lib.read('SYM', date_range=DateRange(20130101, 20130101), columns=None)
        assert len(df) == 1

        df = tickstore_lib.read('SYM', date_range=DateRange(20130701, 20130701), columns=None)
        assert len(df) == 1


def test_date_range_no_bounds(tickstore_lib):
DUMMY_DATA = [
{'a': 1.,
Expand Down Expand Up @@ -387,31 +423,31 @@ def test_read_with_image(tickstore_lib):
assert_array_equal(df['a'].values, np.array([37, 1, np.nan]))
assert_array_equal(df['b'].values, np.array([np.nan, np.nan, 4]))
assert_array_equal(df['c'].values, np.array([2, np.nan, np.nan]))
assert df.index[0] == dt(2013, 1, 1, 10)
assert df.index[1] == dt(2013, 1, 1, 11)
assert df.index[2] == dt(2013, 1, 1, 12)
assert df.index[0] == dt(2013, 1, 1, 10, tzinfo=mktz('Europe/London'))
assert df.index[1] == dt(2013, 1, 1, 11, tzinfo=mktz('Europe/London'))
assert df.index[2] == dt(2013, 1, 1, 12, tzinfo=mktz('Europe/London'))

# Read just columns from the updates
df = tickstore_lib.read('SYM', columns=('a', 'b'), date_range=dr, include_images=True)
assert set(df.columns) == set(('a', 'b'))
assert_array_equal(df['a'].values, np.array([37, 1, np.nan]))
assert_array_equal(df['b'].values, np.array([np.nan, np.nan, 4]))
assert df.index[0] == dt(2013, 1, 1, 10)
assert df.index[1] == dt(2013, 1, 1, 11)
assert df.index[2] == dt(2013, 1, 1, 12)
assert df.index[0] == dt(2013, 1, 1, 10, tzinfo=mktz('Europe/London'))
assert df.index[1] == dt(2013, 1, 1, 11, tzinfo=mktz('Europe/London'))
assert df.index[2] == dt(2013, 1, 1, 12, tzinfo=mktz('Europe/London'))

# Read one column from the updates
df = tickstore_lib.read('SYM', columns=('a',), date_range=dr, include_images=True)
assert set(df.columns) == set(('a',))
assert_array_equal(df['a'].values, np.array([37, 1, np.nan]))
assert df.index[0] == dt(2013, 1, 1, 10)
assert df.index[1] == dt(2013, 1, 1, 11)
assert df.index[2] == dt(2013, 1, 1, 12)
assert df.index[0] == dt(2013, 1, 1, 10, tzinfo=mktz('Europe/London'))
assert df.index[1] == dt(2013, 1, 1, 11, tzinfo=mktz('Europe/London'))
assert df.index[2] == dt(2013, 1, 1, 12, tzinfo=mktz('Europe/London'))

# Read just the image column
df = tickstore_lib.read('SYM', columns=['c'], date_range=dr, include_images=True)
assert set(df.columns) == set(['c'])
assert_array_equal(df['c'].values, np.array([2, np.nan, np.nan]))
assert df.index[0] == dt(2013, 1, 1, 10)
assert df.index[1] == dt(2013, 1, 1, 11)
assert df.index[2] == dt(2013, 1, 1, 12)
assert df.index[0] == dt(2013, 1, 1, 10, tzinfo=mktz('Europe/London'))
assert df.index[1] == dt(2013, 1, 1, 11, tzinfo=mktz('Europe/London'))
assert df.index[2] == dt(2013, 1, 1, 12, tzinfo=mktz('Europe/London'))
8 changes: 5 additions & 3 deletions tests/integration/tickstore/test_ts_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
},
{'b': 9.,
'c': 10.,
'index': dt(2013, 1, 5, tzinfo=mktz('Europe/London'))
'index': dt(2013, 7, 5, tzinfo=mktz('Europe/London'))
},
]

Expand Down Expand Up @@ -69,9 +69,11 @@ def test_ts_write_pandas(tickstore_lib):
data = DUMMY_DATA
tickstore_lib.write('SYM', data)

data = tickstore_lib.read('SYM', columns=None).tz_localize(mktz('Europe/London'))
data = tickstore_lib.read('SYM', columns=None)
assert data.index[0] == dt(2013, 1, 1, tzinfo=mktz('Europe/London'))
assert data.a[0] == 1
tickstore_lib.delete('SYM')
tickstore_lib.write('SYM', data)

read = tickstore_lib.read('SYM', columns=None).tz_localize(mktz('Europe/London'))
read = tickstore_lib.read('SYM', columns=None)
assert_frame_equal(read, data, check_names=False)
9 changes: 4 additions & 5 deletions tests/unit/date/test_datetime_to_ms_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def assert_roundtrip(tz):

ts1 = ts.replace(tzinfo=tz)
ts2 = ms_to_datetime(datetime_to_ms(ts1.astimezone(mktz("UTC"))), tz)
ts1 = ts1.replace(tzinfo=None) if tz == mktz() else ts1
#logger.info(ts2.tzinfo)

assert(ts2.hour == ts1.hour)
Expand Down Expand Up @@ -53,22 +52,22 @@ def test_mktz_London():

def test_datetime_roundtrip_local_no_tz():
    """Naive local datetimes survive an ms-since-epoch round trip.

    Uses one summer and one winter date so both DST phases are covered.
    """
    for month in (6, 1):
        original = datetime.datetime(2012, month, 12, 12, 12, 12, 123000)
        # ms_to_datetime returns an aware datetime; strip the zone to compare.
        roundtripped = ms_to_datetime(datetime_to_ms(original)).replace(tzinfo=None)
        assert roundtripped == original


def test_datetime_roundtrip_local_tz():
    """Aware datetimes in the default zone round-trip through ms exactly.

    Uses one summer and one winter date so both DST phases are covered.
    """
    for month in (6, 1):
        original = datetime.datetime(2012, month, 12, 12, 12, 12, 123000,
                                     tzinfo=mktz(DEFAULT_TIME_ZONE_NAME))
        assert ms_to_datetime(datetime_to_ms(original)) == original


def test_datetime_roundtrip_est_tz():
Expand Down
Loading

0 comments on commit 0a4c3c1

Please sign in to comment.