-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtest.py
144 lines (111 loc) · 4.36 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os, io, npy_append_array, threading
import numpy as np
from itertools import product
from pathlib import Path
from npy_append_array import NpyAppendArray
# Scratch file shared by every test in this script. Remove any leftover from
# a previous run first, then make sure its directory exists.
tmpfile = Path('./tmp/tmp.npy')
tmpfile.unlink(missing_ok=True)
tmpfile.parent.mkdir(exist_ok=True)

# 16 Mi + 17 elements: the count is odd and a multiple of 11 * 27, so it can
# be reshaped to (-1, 11, 27) while its halfway point falls inside an element.
total_array_size = 17 + 16 * 1024 * 1024
half_array_size = total_array_size // 2
for fortran_order in (False, True):
    # The open-ended (-1) dimension is placed on the append axis: the first
    # axis for C order, the last axis for Fortran order.
    order = 'F' if fortran_order else 'C'
    shape = (27, 11, -1) if fortran_order else (-1, 11, 27)
    dtype_base = np.uint64

    arr = np.arange(total_array_size, dtype=dtype_base).reshape(
        shape, order=order
    )

    # Number of items per step along the append axis, i.e. the product of
    # all other dimensions (the shape is read back to front for Fortran).
    shape_from_append_axis = arr.shape[::-1] if arr.flags.fnc else arr.shape
    append_axis_item_count = np.multiply.reduce(shape_from_append_axis[1:])

    def get_array_header(dtype=arr.dtype):
        """Return the serialized .npy header for ``arr`` using ``dtype``.

        The shape and memory order are taken from the current ``arr``; only
        the dtype description can be overridden by the caller.
        """
        header = {
            'shape': arr.shape,
            'fortran_order': arr.flags.fnc,
            'descr': np.lib.format.dtype_to_descr(dtype),
        }
        buffer = io.BytesIO()
        np.lib.format._write_array_header(buffer, header)
        return buffer.getvalue()

    # ensure_appendable
    # "fortran_order": True uses one byte less than "fortran_order": False
    field_name = '_' * (25 if fortran_order else 24)
    dtype_template = np.dtype([(field_name, dtype_base)])

    for inplace in (False, True):
        np.save(tmpfile, arr.astype(dtype=dtype_template))

        # overwrite with a non-appendable header: one padding space before
        # the newline is dropped and the field name grows by one underscore
        # (presumably keeping the header length unchanged - TODO confirm)
        tight_header = get_array_header(dtype_template)
        tight_header = tight_header.replace(b' \n', b'\n')
        tight_header = tight_header.replace(b'\'_', b'\'__')
        with open(tmpfile, 'rb+') as fp:
            fp.write(tight_header)

        assert not npy_append_array.is_appendable(tmpfile)
        npy_append_array.ensure_appendable(tmpfile, inplace=inplace)
        assert npy_append_array.is_appendable(tmpfile)

        # The payload must survive the header rewrite untouched.
        reloaded = np.load(tmpfile).astype(dtype_base)
        assert np.all(arr == reloaded)

    # recover
    for zerofill_incomplete in (False, True):
        np.save(tmpfile, arr)

        # (file size + header size) // 2 == header size + half of the data,
        # assuming the file's header is as long as get_array_header()'s.
        truncated_size = (
            tmpfile.stat().st_size + len(get_array_header())
        ) // 2
        os.truncate(tmpfile, truncated_size)

        npy_append_array.recover(tmpfile, zerofill_incomplete)

        # An incomplete step along the append axis is either padded up to a
        # full step (zerofill) or dropped.
        round_steps = np.ceil if zerofill_incomplete else np.floor
        data_length = int(round_steps(
            half_array_size / append_axis_item_count
        )) * append_axis_item_count

        recovered = np.load(tmpfile).flatten(order=order)
        assert data_length == recovered.shape[0]

        # Index half_array_size itself is skipped: that element was cut in
        # the middle, so its recovered value is not pinned down here.
        assert np.all(recovered[half_array_size + 1:] == 0)

        expected = arr.flatten(order=order)[:data_length]
        compared_count = (
            half_array_size if zerofill_incomplete else data_length
        )
        assert np.all((expected == recovered)[:compared_count])

# NOTE(review): the source lost its indentation; this cleanup is placed after
# the loop. Any nesting level behaves the same since np.save recreates the file.
tmpfile.unlink()
# test regular append for C order and Fortran arrays
for (
    use_np_save, rewrite_header_on_append, delete_if_exists,
    is_fortran_array1, is_fortran_array2
) in product((False, True), repeat=5):
    dtype1 = dtype2 = None
    order1 = 'F' if is_fortran_array1 else 'C'
    order2 = 'F' if is_fortran_array2 else 'C'

    # We need at least three shape entries, none being 1, especially to
    # test what happens if one appends a fortran to a non-fortran array
    # and vice versa.
    shape1 = (2, 3, 4)
    # The second array differs from the first only along the append axis,
    # which is decided by the memory order of the first array.
    shape2 = (2, 3, 5) if is_fortran_array1 else (5, 3, 4)
    product1 = np.multiply.reduce(shape1)
    product2 = np.multiply.reduce(shape2)

    # Consecutive value ranges, so a misplaced item shows up in the compare.
    arr1 = np.arange(product1, dtype=dtype1).reshape(shape1, order=order1)
    arr2 = np.arange(product1, product1 + product2, dtype=dtype2)
    arr2 = arr2.reshape(shape2, order=order2)
    arr2_append_count = 10

    if use_np_save:
        np.save(tmpfile, arr1)

    with NpyAppendArray(
        tmpfile,
        rewrite_header_on_append=rewrite_header_on_append,
        delete_if_exists=delete_if_exists,
    ) as npaa:
        # arr1 still has to be written unless np.save already stored it and
        # the file was kept (presumably delete_if_exists discards it).
        if delete_if_exists or not use_np_save:
            npaa.append(arr1)

        def task():
            npaa.append(arr2)

        # All threads append the same array, so their order is irrelevant.
        threads = [
            threading.Thread(target=task) for _ in range(arr2_append_count)
        ]
        for thread in threads:
            thread.start()

        # make sure to join threads within the "with NpyAppendArray ..."
        for thread in threads:
            thread.join()

    arr = np.load(tmpfile)
    arr_ref = np.concatenate(
        [arr1] + [arr2] * arr2_append_count,
        axis=-1 if is_fortran_array1 else 0,
        dtype=arr1.dtype,
    )
    assert np.all(arr == arr_ref)

    # Every combination starts without a file; some of them rely on that.
    tmpfile.unlink(missing_ok=True)
# Re-open the same file 40 times and append one zero-filled chunk per open,
# so that each append after the first extends a file created by an earlier
# open.
for _ in range(40):
    chunk = np.zeros((50000, 76, 3))
    with NpyAppendArray(tmpfile) as npaa:
        npaa.append(chunk)

# NOTE(review): the source lost its indentation; the cleanup is assumed to run
# once after the loop so that the appends accumulate in a single file.
tmpfile.unlink(missing_ok=True)