-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathconvert-lora-to-ggml.py
197 lines (164 loc) · 6.68 KB
/
convert-lora-to-ggml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from io import BufferedWriter
import json
import os
import re
import struct
import sys
from typing import Any, List, Mapping, MutableMapping, Sequence, Tuple
import argparse
import torch
from convert import DATA_TYPE_TO_FTYPE, NUMPY_TYPE_TO_DATA_TYPE, DataType
# Lookup table used by translate_tensor_name(): keys are the HuggingFace
# sub-layer names found inside a tensor key, values are the names written to
# the converted file.
HF_SUBLAYER_TO_GGML: Mapping[str, str] = {
    # attention projections
    "self_attn.q_proj": "attention.wq",
    "self_attn.k_proj": "attention.wk",
    "self_attn.v_proj": "attention.wv",
    "self_attn.o_proj": "attention.wo",
    # feed-forward projections
    "mlp.gate_proj": "feed_forward.w1",
    "mlp.down_proj": "feed_forward.w2",
    "mlp.up_proj": "feed_forward.w3",
    # normalization layers
    "input_layernorm": "attention_norm",
    "post_attention_layernorm": "ffn_norm",
    # currently disabled entries:
    # "norm": "norm",
    # "embed_tokens": "tok_embeddings",
    # "lm_head": "output",
}
def translate_tensor_name(t: str) -> Tuple[str, str]:
    """Translate a HuggingFace LoRA tensor key into its converted name.

    The key must match ``...layers.<n>.<part>.<part>.lora_A.weight`` (or
    ``lora_B``). On success the result is the pair
    ``("layers.<n>.<mapped sub-layer>.weight.lora", "A" or "B")``, where the
    sub-layer is mapped through ``HF_SUBLAYER_TO_GGML``.

    If the key does not match the pattern, or the sub-layer has no mapping,
    an error message is printed and the process exits with status 1.
    """
    # NOTE(review): the sub-layer group is ``\w+\.\w+``, i.e. it always has
    # exactly two dot-separated parts, so single-word mapping keys can never
    # be produced by this pattern.
    parsed = re.match(r".*layers\.(\d+)\.(\w+\.\w+)\.lora_(A|B)\.weight", t)
    if parsed is None:
        print(f"Error: unrecognized tensor {t}")
        sys.exit(1)

    layer_index, hf_sub_layer, lora_kind = parsed.groups()

    ggml_sub_layer = HF_SUBLAYER_TO_GGML.get(hf_sub_layer)
    if ggml_sub_layer is None:
        print(f"Error: unrecognized sub-layer {hf_sub_layer} in tensor {t}")
        sys.exit(1)

    return (f"layers.{layer_index}.{ggml_sub_layer}.weight.lora", lora_kind)
def write_file_header(fout: BufferedWriter, params: Mapping[str, Any], no_cache: bool = False) -> None:
    """Write the file header to *fout*.

    Fields, in order:
      * magic: ``b"ggla"`` reversed
      * file version: ``1`` as an unsigned int
      * cache flag: one bool byte, true when ``no_cache`` is false
      * ``params["r"]`` and ``params["lora_alpha"]`` as unsigned ints

    All integers use ``struct``'s native byte order.
    """
    # Each field is packed with its own struct.pack call, exactly as many
    # calls as fields groups, so no alignment padding is inserted between them.
    header_fields = (
        b"ggla"[::-1],  # magic (ggml lora)
        struct.pack("I", 1),  # file version
        struct.pack("?", not no_cache),  # cache is enabled or not
        struct.pack("II", params["r"], params["lora_alpha"]),
    )
    fout.write(b"".join(header_fields))
def write_tensor_header(
    fout: BufferedWriter, name: str, shape: Sequence[int], data_type: DataType
) -> None:
    """Write one tensor's header to *fout* and pad to a 32-byte boundary.

    Layout: three unsigned ints (number of dimensions, byte length of the
    UTF-8 encoded name, file type code), then the dimensions in reverse
    order as unsigned ints, then the encoded name. Afterwards the file
    position is advanced to the next multiple of 32.

    The file type code is looked up as
    ``DATA_TYPE_TO_FTYPE[NUMPY_TYPE_TO_DATA_TYPE[data_type]]``.
    """
    encoded_name = name.encode("utf-8")
    n_dims = len(shape)
    ftype = DATA_TYPE_TO_FTYPE[NUMPY_TYPE_TO_DATA_TYPE[data_type]]

    fout.write(struct.pack("III", n_dims, len(encoded_name), ftype))
    fout.write(struct.pack("I" * n_dims, *reversed(shape)))
    fout.write(encoded_name)

    # align to 32 bytes: skip forward by the distance to the next multiple
    padding = (-fout.tell()) % 32
    fout.seek(padding, os.SEEK_CUR)
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments taken from ``sys.argv``.

    Returns:
        A namespace with three attributes:
          * ``path``: directory containing the PEFT adapter files
          * ``dtype``: ``"fp16"`` or ``"fp32"`` (default ``"fp32"``)
          * ``no_cache``: ``True`` when ``--no-cache`` was given
    """
    parser = argparse.ArgumentParser(
        description="Convert a HuggingFace PEFT LoRA adapter to a ggml adapter file."
    )
    parser.add_argument(
        "path",
        type=str,
        help="Path must contain HuggingFace PEFT LoRA files 'adapter_config.json' and 'adapter_model.bin'",
    )
    parser.add_argument(
        '-t',
        '--dtype',
        choices=['fp16', 'fp32'],
        default='fp32',
        help='Data type to use for the converted model. Default: %(default)s',
        dest='dtype',
    )
    parser.add_argument(
        '--no-cache',
        action='store_true',
        # The flag turns the pre-multiplication OFF; the old help text
        # described the opposite of what passing it does.
        help='Do not pre-multiply each LoRA A/B pair; write the A and B '
             'tensors separately. Default: %(default)s',
        dest='no_cache',
    )
    return parser.parse_args(sys.argv[1:])
def read_params(input_json: str) -> Mapping[str, Any]:
    """Load the adapter configuration and reject unsupported settings.

    Args:
        input_json: path to the adapter's JSON configuration file.

    Returns:
        The parsed JSON object, unchanged.

    The process exits with status 1 (after printing an error) when:
      * ``peft_type`` is not ``"LORA"``
      * ``fan_in_fan_out`` is set
      * ``bias`` is anything other than ``None`` / ``"none"``
      * ``modules_to_save`` is a non-empty list

    ``fan_in_fan_out``, ``bias`` and ``modules_to_save`` are treated as
    unset when the key is absent instead of raising ``KeyError``.
    """
    with open(input_json, "r", encoding="utf-8") as f:
        params: MutableMapping[str, Any] = json.load(f)

    peft_type = params.get("peft_type")
    if peft_type != "LORA":
        print(f"Error: unsupported adapter type {peft_type}, expected LORA")
        sys.exit(1)

    if params.get("fan_in_fan_out"):
        print("Error: param fan_in_fan_out is not supported")
        sys.exit(1)

    bias = params.get("bias")
    if bias is not None and bias != "none":
        print("Error: param bias is not supported")
        sys.exit(1)

    # TODO: these seem to be layers that have been trained but without lora.
    # doesn't seem widely used but eventually should be supported
    if params.get("modules_to_save"):
        print("Error: param modules_to_save is not supported")
        sys.exit(1)

    return params
def normalize_tensors(model: Any, params: Mapping[str, Any], no_cache: bool = False) -> Mapping[str, List[Tuple[torch.Tensor, str]]]:
    """Group the adapter tensors by target weight and apply the LoRA scale.

    The scale is ``params["lora_alpha"] / params["r"]``.

    Args:
        model: mapping from tensor key to tensor, as iterated by ``.items()``.
        params: adapter configuration; only ``r`` and ``lora_alpha`` are read.
        no_cache: selects the output form described below.

    Returns:
        A mapping from translated tensor name to a list of
        ``(tensor, suffix)`` pairs.

        * ``no_cache=True``: every input tensor is kept. ``lora_A`` tensors
          are transposed and multiplied by the scale and carry suffix
          ``"A"``; the other tensors are converted to float32 and carry the
          suffix returned by ``translate_tensor_name``.
        * ``no_cache=False``: the first tensor seen for a name is stored as
          is; when the second one arrives the two are multiplied (the
          non-A operand on the left), scaled, and stored as a single entry
          with an empty suffix.
    """
    r = float(params["r"])
    lora_alpha = float(params["lora_alpha"])
    scale = lora_alpha / r

    # pair up the tensors into a map of (tensor_name, [A, B]) or
    # (tensor_name, [product]) for the cached matrix.
    tensor_map: MutableMapping[str, List[Tuple[torch.Tensor, str]]] = {}
    for key, tensor in model.items():
        if key.endswith("lora_A.weight"):
            # A keeps float16/float32 as stored; anything else is widened.
            if tensor.dtype not in (torch.float16, torch.float32):
                tensor = tensor.float()
            if no_cache:
                tensor = tensor.T
        else:
            tensor = tensor.float()

        tensor_name, lora_type = translate_tensor_name(key)
        entries = tensor_map.setdefault(tensor_name, [])

        if no_cache:
            if lora_type == 'A':
                # Pre-compute the matrix and scale product to save time later
                tensor = tensor * scale
            entries.append((tensor, lora_type))
            continue

        # In cached mode a name never holds more than one entry: either the
        # first tensor of a pair or the already-multiplied product.
        assert len(entries) < 2
        if not entries:
            entries.append((tensor, lora_type))
            continue

        old_tensor, old_type = entries[0]
        # torch.matmul needs both operands in the same dtype. A may still be
        # float16 here while the other tensor was converted to float32, so
        # widen both (a no-op for tensors that are already float32).
        if old_type == 'A':
            product = torch.matmul(tensor.float(), old_tensor.float())
        else:
            product = torch.matmul(old_tensor.float(), tensor.float())
        tensor_map[tensor_name] = [(product * scale, "")]

    return tensor_map
def main() -> None:
    """Convert the adapter found in the directory given on the command line.

    Reads ``adapter_config.json`` and ``adapter_model.bin`` from that
    directory, normalizes the tensors, and writes
    ``ggml-adapter-model.bin`` into the same directory: the file header
    first, then for every tensor its header followed by its raw data.
    """
    args = parse_args()

    input_json = os.path.join(args.path, "adapter_config.json")
    input_model = os.path.join(args.path, "adapter_model.bin")
    # Use the parsed positional argument rather than sys.argv[1]: when an
    # option such as "-t fp16" comes first, sys.argv[1] is not the path.
    output_path = os.path.join(args.path, "ggml-adapter-model.bin")

    params = read_params(input_json)
    # NOTE(review): torch.load unpickles the file, which can execute
    # arbitrary code; only convert adapters from a trusted source.
    model = torch.load(input_model, map_location="cpu")

    print("Normalizing tensors...")
    tensor_map = normalize_tensors(model, params, no_cache=args.no_cache)
    print("Normalization completed.\nWriting output...")

    # Mode "wb" already truncates an existing file.
    with open(output_path, "wb") as fout:
        write_file_header(fout, params, args.no_cache)
        for tname, tensors in tensor_map.items():
            # In cached mode only names holding exactly one entry are written.
            if not args.no_cache and len(tensors) != 1:
                continue
            for v, lora_type in tensors:
                t = v.half().numpy() if args.dtype == 'fp16' else v.numpy()
                # The suffix is "A"/"B" for separate tensors and empty for a
                # pre-multiplied product.
                normalized_name = tname + lora_type
                print(f"{normalized_name} {t.shape} {t.dtype} {t.nbytes/1024/1024:.2f}MB")
                write_tensor_header(fout, normalized_name, t.shape, t.dtype)
                t.tofile(fout)

    print(f"Converted {input_json} and {input_model} to {output_path}")
# Run the conversion only when this file is executed as a script.
if __name__ == '__main__':
    main()