-
Notifications
You must be signed in to change notification settings - Fork 0
/
preprocessing.py
74 lines (61 loc) · 2.74 KB
/
preprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from typing import List, Tuple, Dict
import copy
import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla
def gen_seeds(size: int = None) -> np.ndarray:
max_uint32 = np.iinfo(np.uint32).max
return np.random.randint(
max_uint32 + 1, size=size, dtype=np.uint32)
def exclude_idx(idx: np.ndarray, idx_exclude_list: List[np.ndarray]) -> np.ndarray:
idx_exclude = np.concatenate(idx_exclude_list)
return np.array([i for i in idx if i not in idx_exclude])
def known_unknown_split(
idx: np.ndarray, nknown: int = 1500, seed: int = 4143496719) -> Tuple[np.ndarray, np.ndarray]:
rnd_state = np.random.RandomState(seed)
known_idx = rnd_state.choice(idx, nknown, replace=False)
unknown_idx = exclude_idx(idx, [known_idx])
return known_idx, unknown_idx
def train_stopping_split(
idx: np.ndarray, labels: np.ndarray, ntrain_per_class: int = 20,
nstopping: int = 500, seed: int = 2413340114) -> Tuple[np.ndarray, np.ndarray]:
rnd_state = np.random.RandomState(seed)
train_idx_split = []
for i in range(max(labels) + 1):
train_idx_split.append(rnd_state.choice(
idx[labels == i], ntrain_per_class, replace=False))
train_idx = np.concatenate(train_idx_split)
stopping_idx = rnd_state.choice(
exclude_idx(idx, [train_idx]),
nstopping, replace=False)
return train_idx, stopping_idx
def gen_splits(
labels: np.ndarray, idx_split_args: Dict[str, int],
test: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
all_idx = np.arange(len(labels))
if -1 in labels:
print('deleting the nodes without label.')
all_idx = all_idx[labels != -1]
known_idx, unknown_idx = known_unknown_split(
all_idx, idx_split_args['nknown'])
_, cnts = np.unique(labels[known_idx], return_counts=True)
stopping_split_args = copy.copy(idx_split_args)
del stopping_split_args['nknown']
train_idx, stopping_idx = train_stopping_split(
known_idx, labels[known_idx], **stopping_split_args)
if test:
val_idx = unknown_idx
else:
val_idx = exclude_idx(known_idx, [train_idx, stopping_idx])
return train_idx, stopping_idx, val_idx
def normalize_attributes(attr_matrix):
epsilon = 1e-12
if isinstance(attr_matrix, sp.csr_matrix):
attr_norms = spla.norm(attr_matrix, ord=1, axis=1)
attr_invnorms = 1 / np.maximum(attr_norms, epsilon)
attr_mat_norm = attr_matrix.multiply(attr_invnorms[:, np.newaxis])
else:
attr_norms = np.linalg.norm(attr_matrix, ord=1, axis=1)
attr_invnorms = 1 / np.maximum(attr_norms, epsilon)
attr_mat_norm = attr_matrix * attr_invnorms[:, np.newaxis]
return attr_mat_norm