-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdatabase.py
93 lines (62 loc) · 2.54 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import h5py
import logging
import time
from typing import *
import numpy as np
import pandas as pd
# Called without a name, getLogger() returns the root logger.
logger = logging.getLogger()


class Hdf5Client:
    """Store and query per-symbol rows in a single HDF5 file.

    Every symbol is kept as a resizable ``(n, 8)`` float64 dataset. The
    first column is the timestamp (interpreted as epoch milliseconds by
    :meth:`get_data`); the remaining columns are, in order: open, high,
    low, close, volume, bidPrice, askPrice.

    The client can be used as a context manager so the file is closed
    deterministically::

        with Hdf5Client("some_exchange") as client:
            client.create_dataset("SYMBOL")
    """

    def __init__(self, exchange: str):
        """Open ``data/<exchange>.h5`` in append mode (``"a"``)."""
        self.hf = h5py.File(f"data/{exchange}.h5", "a")
        self.hf.flush()

    def close(self) -> None:
        """Close the underlying HDF5 file."""
        self.hf.close()

    def __enter__(self) -> "Hdf5Client":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def create_dataset(self, symbol: str):
        """Create an empty, row-resizable dataset for *symbol* if missing."""
        if symbol not in self.hf:
            self.hf.create_dataset(
                symbol, (0, 8), maxshape=(None, 8), dtype="float64")
            self.hf.flush()

    def write_data(self, symbol: str, data: List[Tuple]):
        """Append the rows of *data* that fall outside the stored time range.

        A row is kept only when its timestamp (``row[0]``) is strictly
        older than the earliest stored timestamp or strictly newer than
        the latest one; everything else is dropped. When the dataset is
        still empty every row is kept.

        The dataset for *symbol* is looked up directly, so it has to exist
        already (see :meth:`create_dataset`).
        """
        min_ts, max_ts = self.get_first_last_timestamp(symbol)
        if min_ts is None:
            # Empty dataset: these bounds make every timestamp pass the
            # "older than min" test below.
            min_ts = float("inf")
            max_ts = 0

        filtered_data = [d for d in data if d[0] < min_ts or d[0] > max_ts]
        if not filtered_data:
            logger.warning("%s: No data to insert", symbol)
            return

        data_array = np.array(filtered_data)
        new_rows = data_array.shape[0]

        dataset = self.hf[symbol]
        # Grow along the row axis, then fill the freshly added tail.
        dataset.resize(dataset.shape[0] + new_rows, axis=0)
        dataset[-new_rows:] = data_array
        self.hf.flush()

    def get_data(self, symbol: str, from_time: int, to_time: int) -> Union[None, pd.DataFrame]:
        """Return the rows of *symbol* with ``from_time <= timestamp <= to_time``.

        The result is sorted by timestamp and indexed by a datetime
        ``timestamp`` index built from the stored values as epoch
        milliseconds. Returns ``None`` when the dataset holds no rows at
        all; returns an empty DataFrame when rows exist but none fall in
        the requested range.
        """
        start_query = time.time()

        existing_data = self.hf[symbol][:]
        if len(existing_data) == 0:
            return None

        # Filter and sort in NumPy before building the frame: this avoids a
        # Python-level sort over rows and avoids assigning into a filtered
        # DataFrame slice (SettingWithCopyWarning).
        timestamps = existing_data[:, 0]
        in_range = (timestamps >= from_time) & (timestamps <= to_time)
        selected = existing_data[in_range]
        # Stable sort keeps the stored order of rows with equal timestamps.
        selected = selected[np.argsort(selected[:, 0], kind="stable")]

        df = pd.DataFrame(
            selected,
            columns=["timestamp", "open", "high", "low", "close", "volume", "bidPrice", "askPrice"])
        df["timestamp"] = pd.to_datetime(
            df["timestamp"].values.astype(np.int64), unit="ms")
        df.set_index("timestamp", drop=True, inplace=True)

        query_time = round((time.time() - start_query), 2)
        logger.info("Retrieved %s %s data from database in %s seconds",
                    len(df.index),
                    symbol,
                    query_time)
        return df

    def get_first_last_timestamp(self, symbol: str) -> Union[Tuple[None, None], Tuple[float, float]]:
        """Return ``(earliest, latest)`` stored timestamp for *symbol*.

        Returns ``(None, None)`` when the dataset holds no rows.
        """
        # Only the timestamp column is needed, so read just that column.
        timestamps = self.hf[symbol][:, 0]
        if len(timestamps) == 0:
            return None, None
        return timestamps.min(), timestamps.max()