-
Notifications
You must be signed in to change notification settings - Fork 924
/
Copy pathflows.py
75 lines (61 loc) · 2.42 KB
/
flows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Copyright © 2023-2024 Apple Inc.
from typing import Optional, Tuple, Union
import mlx.core as mx
import mlx.nn as nn
from bijectors import AffineBijector, MaskedCoupling
from distributions import Normal
class MLP(nn.Module):
    """Fully connected network with GELU activations between linear layers.

    The network is a chain of ``n_layers + 1`` ``nn.Linear`` layers: the
    input of width ``d_in`` is mapped through ``n_layers`` hidden
    representations of width ``d_hidden`` and finally projected to ``d_out``
    features. GELU is applied after every layer except the last one.

    Args:
        n_layers: Number of hidden representations of width ``d_hidden``.
        d_in: Width of the input features.
        d_hidden: Width of every hidden representation.
        d_out: Width of the output features.
    """

    def __init__(self, n_layers: int, d_in: int, d_hidden: int, d_out: int):
        super().__init__()
        widths = [d_in, *([d_hidden] * n_layers), d_out]
        # One linear layer per consecutive pair of widths, created in order.
        stack = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
        self.layers = stack

    def __call__(self, x):
        """Apply the network to ``x`` and return the final linear output."""
        *hidden, head = self.layers
        for layer in hidden:
            x = nn.gelu(layer(x))
        # The output layer is left linear (no activation).
        return head(x)
class RealNVP(nn.Module):
    """Normalizing flow built from a stack of masked affine coupling layers.

    Each of the ``n_transforms`` layers pairs a boolean mask over the
    ``d_params`` dimensions with its own conditioner ``MLP``. The masks
    alternate with the parity of the layer index. The base distribution is
    a ``Normal`` constructed from a zero vector and a ones vector of length
    ``d_params``.

    Args:
        n_transforms: Number of coupling layers (one mask and one conditioner
            each).
        d_params: Dimensionality of the modelled variable.
        d_hidden: Hidden width handed to every conditioner ``MLP``.
        n_layers: Number of hidden layers handed to every conditioner ``MLP``.
    """

    def __init__(self, n_transforms: int, d_params: int, d_hidden: int, n_layers: int):
        super().__init__()

        # Alternating masks: layer ``t`` selects the indices whose parity
        # equals the parity of ``t``.
        parity = mx.arange(d_params) % 2
        self.mask_list = [
            (parity == t % 2).astype(mx.bool_) for t in range(n_transforms)
        ]
        # The masks are constants of the architecture, not trainable weights.
        self.freeze(keys=["mask_list"])

        # Conditioning MLPs. Each one emits 2 * d_params values per input
        # (presumably a shift and a log-scale for AffineBijector -- confirm
        # against bijectors.py).
        conditioners = []
        for _ in range(n_transforms):
            conditioners.append(MLP(n_layers, d_params, d_hidden, 2 * d_params))
        self.conditioner_list = conditioners

        self.base_dist = Normal(mx.zeros(d_params), mx.ones(d_params))

    def log_prob(self, x: mx.array):
        """
        Map ``x`` back to the base Gaussian and return its log-density.

        The coupling layers are inverted in reverse order; the log-determinant
        reported by each inverse is accumulated and added to the base
        log-density summed over the last axis. The accumulator has length
        ``x.shape[0]``, so ``x`` is expected to carry a leading batch axis.
        """
        total_ldj = mx.zeros(x.shape[0])
        layers = zip(reversed(self.mask_list), reversed(self.conditioner_list))
        for mask, conditioner in layers:
            coupling = MaskedCoupling(mask, conditioner, AffineBijector)
            x, ldj = coupling.inverse_and_log_det(x)
            total_ldj += ldj
        base_term = self.base_dist.log_prob(x).sum(-1)
        return total_ldj + base_term

    def sample(
        self,
        sample_shape: Union[int, Tuple[int, ...]],
        key: Optional[mx.array] = None,
        n_transforms: Optional[int] = None,
    ):
        """
        Draw from the base Gaussian and push the draw through the flow.

        Args:
            sample_shape: Shape forwarded to the base distribution's sampler.
            key: Optional random key forwarded to the base distribution's
                sampler.
            n_transforms: When given, only the leading ``[:n_transforms]``
                coupling layers are applied; ``None`` applies all of them.

        Returns:
            The transformed sample.
        """
        x = self.base_dist.sample(sample_shape, key=key)
        masks = self.mask_list[:n_transforms]
        conditioners = self.conditioner_list[:n_transforms]
        for mask, conditioner in zip(masks, conditioners):
            coupling = MaskedCoupling(mask, conditioner, AffineBijector)
            # The forward log-determinant is not needed for sampling.
            x, _ = coupling.forward_and_log_det(x)
        return x

    def __call__(self, x: mx.array):
        """Alias for :meth:`log_prob`."""
        return self.log_prob(x)