#!/usr/bin/env python3
"""cv-tbox Diversity Check / Split Maker - Delta Upgrade"""
###########################################################################
# merge_delta.py
#
# Get data from the current delta and the previous full version, combine them,
# put the results into experiment directories, and run the s1 algorithm.
#
# Use:
# python merge_delta.py
#
# This script is part of the Common Voice ToolBox package.
#
# github: https://github.com/HarikalarKutusu/cv-tbox-split-maker
# Copyright: (c) Bülent Özden, License: AGPL v3.0
###########################################################################
# Standard Lib
import os
import sys
import glob
import shutil
import multiprocessing as mp
import threading
from datetime import datetime
from typing import TypedDict
# External Dependencies
import psutil
import pandas as pd
from tqdm import tqdm
# Module
import conf
from lib import dec3, df_read, df_write
# Globals
HERE: str = os.path.dirname(os.path.realpath(__file__))
if HERE not in sys.path:
    sys.path.append(HERE)
PROC_LIMIT: int = 4
PROC_COUNT: int = min(PROC_LIMIT, psutil.cpu_count(logical=True) or 1)
output_lock = threading.Lock()

class Params(TypedDict):
    """MultiProcessing parameters"""

    lc: str
    old_dir: str
    delta_dir: str
    new_dir: str

def merge_delta_process(params: Params) -> Params:
    """Multiprocessing handler for a single language delta merge"""
    # create destination full dir
    os.makedirs(params["new_dir"], exist_ok=True)
    # first copy files of non-delta nature to the full dir (if they exist - introduced with v17.0)
    for f in ["validated_sentences", "unvalidated_sentences"]:
        _fpath: str = os.path.join(params["delta_dir"], f"{f}.tsv")
        if os.path.isfile(_fpath):
            shutil.copy2(_fpath, params["new_dir"])
    #
    # Merge data in files with delta nature
    #
    decided_set: set[str] = set()
    for f in ["validated", "invalidated", "clip_durations", "reported"]:
        df_prev: pd.DataFrame = df_read(os.path.join(params["old_dir"], f"{f}.tsv"))
        df_delta: pd.DataFrame = df_read(os.path.join(params["delta_dir"], f"{f}.tsv"))
        # handle any new columns - align the previous schema to the delta's
        if set(df_prev.columns) != set(df_delta.columns):
            df_prev = df_prev.reindex(columns=df_delta.columns)
        # merge & sort & save
        df_final: pd.DataFrame = pd.concat([df_prev, df_delta])
        # we don't have duplicates - except in "reported.tsv"
        if f != "reported":
            df_final.drop_duplicates(inplace=True)
        # sort by a field - the field depends on the file
        _cols: list[str] = list(df_final.columns)
        if "path" in _cols:
            df_final.sort_values(["path"], inplace=True)
        elif "sentence_id" in _cols:
            df_final.sort_values(["sentence_id"], inplace=True)
        elif "clip" in _cols:
            df_final.sort_values(["clip"], inplace=True)
        # write out new data
        df_write(df_final, os.path.join(params["new_dir"], f"{f}.tsv"))
        # keep a record of items in validated & invalidated for the "other.tsv" calculation
        if f in ["validated", "invalidated"]:
            decided_set |= set(df_final["path"].to_list())
    #
    # Handle "other"
    #
    df_prev = df_read(os.path.join(params["old_dir"], "other.tsv"))
    df_delta = df_read(os.path.join(params["delta_dir"], "other.tsv"))
    # handle any new columns - align the previous schema to the delta's
    if set(df_prev.columns) != set(df_delta.columns):
        df_prev = df_prev.reindex(columns=df_delta.columns)
    # merge & dedup & sort => still contains recs which moved to val/inval in the new version
    df_final = pd.concat([df_prev, df_delta]).drop_duplicates().sort_values(["path"])
    # only allow those not in new val & inval
    df_final = df_final[~df_final["path"].isin(decided_set)]
    df_write(df_final, os.path.join(params["new_dir"], "other.tsv"))
    # return params so the progress loop can report the finished locale
    return params
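
# Illustrative sketch (hypothetical data, never called by the pipeline): the
# delta merge above boils down to concat + drop_duplicates for the decided
# files, then an anti-join against the decided set for "other.tsv".
def _demo_merge_semantics() -> pd.DataFrame:
    """Tiny self-contained demo of the concat / dedup / anti-join pattern."""
    prev = pd.DataFrame({"path": ["a.mp3", "b.mp3", "c.mp3"]})
    delta = pd.DataFrame({"path": ["c.mp3", "d.mp3"]})
    decided = {"b.mp3"}  # e.g. "b.mp3" moved into validated.tsv in the new version
    merged = pd.concat([prev, delta]).drop_duplicates().sort_values(["path"])
    # anti-join: keep only rows whose "path" was not decided elsewhere
    return merged[~merged["path"].isin(decided)]  # -> a.mp3, c.mp3, d.mp3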

def main(base_prev_dir: str, base_delta_dir: str) -> None:
    """Main process"""
    start_time: datetime = datetime.now()
    #
    # Checks
    #
    # Check incoming directories
    if not os.path.isdir(base_prev_dir):
        print(
            f"FATAL: Source expanded previous version directory could not be located ({base_prev_dir})"
        )
        return
    if not os.path.isdir(base_delta_dir):
        print(
            f"FATAL: Source expanded delta directory could not be located ({base_delta_dir})"
        )
        return
    delta_release_dirname: str = os.path.split(base_delta_dir)[-1]
    if "delta" not in delta_release_dirname:
        print(f"FATAL: This directory is not for a delta release ({base_delta_dir})")
        return
    # Check if the delta dir has language data
    delta_lc_dirs: list[str] = glob.glob(os.path.join(base_delta_dir, "*"))
    total_cnt: int = len(delta_lc_dirs)
    if total_cnt == 0:
        print("FATAL: Delta directory does not contain expanded language directories.")
        return
    # Build the parameter list - also with some checks
    params_list: list[Params] = []
    existing_list: list[str] = []
    no_prev_list: list[str] = []
    for delta_lc_dir in delta_lc_dirs:
        _lc: str = os.path.split(delta_lc_dir)[-1]
        _old_lc_dir: str = os.path.join(
            conf.CV_EXTRACTED_BASE_DIR, conf.CV_FULL_PREV_VERSION, _lc
        )
        _new_lc_dir: str = delta_lc_dir.replace("-delta-", "-")
        # skip extra processing if the prev-lc-dir does not exist
        if not os.path.isdir(_old_lc_dir):
            no_prev_list.append(_lc)
            continue
        # skip if the destination exists
        if os.path.isdir(_new_lc_dir) and not conf.FORCE_CREATE:
            existing_list.append(_lc)
            continue
        # now we are OK
        params_list.append(
            Params(
                lc=_lc,
                old_dir=_old_lc_dir,
                delta_dir=delta_lc_dir,
                new_dir=_new_lc_dir,
            )
        )
    if existing_list:
        print(f"SKIPPED [{len(existing_list)}] - EXISTING: {existing_list}")
    if no_prev_list:
        print(f"SKIPPED [{len(no_prev_list)}] - NO OLD VERSION: {no_prev_list}")
    actual_cnt: int = len(params_list)
    if actual_cnt == 0:
        print("No delta datasets to merge!")
        return
    #
    # Get delta .tsv from the expanded delta directory (validated, invalidated, other)
    # Get the related .tsv from the previous version directory
    # Concat them and save into the base experiments/s1 directory
    #
    num_procs: int = max(1, min(PROC_COUNT, actual_cnt))
    chunk_size: int = max(1, min(actual_cnt // 100, actual_cnt // num_procs))
    print(f"Delta-Merge {actual_cnt} locales out of {total_cnt} PROCS={num_procs}")
    _cnt: int = 0
    _par: Params
    with mp.Pool(processes=num_procs) as pool:
        with tqdm(total=actual_cnt, unit=" Dataset") as pbar:
            for _par in pool.imap_unordered(
                merge_delta_process, params_list, chunksize=chunk_size
            ):
                _cnt += 1
                pbar.write(f"Finished: {_par['lc']}\t[{_cnt}/{actual_cnt}]")
                pbar.update()
    # finalize
    seconds: int = (datetime.now() - start_time).seconds
    print(f"Finished in {seconds} sec - Avg={dec3(seconds/actual_cnt)} sec/locale")

if __name__ == "__main__":
    # [TODO] : use command line args
    # args_src_dir: str = "m:\\DATASETS\\cv\\cv-corpus-16.0-2023-12-06"
    args_prev_dir: str = os.path.join(
        conf.CV_METADATA_BASE_DIR, conf.CV_FULL_PREV_VERSION
    )
    args_delta_dir: str = os.path.join(conf.CV_METADATA_BASE_DIR, conf.CV_DELTA_VERSION)
    main(args_prev_dir, args_delta_dir)
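
# Expected on-disk layout (version names are hypothetical, for illustration only):
#   <base>/cv-corpus-16.1-.../<lc>/validated.tsv ...     previous full version
#   <base>/cv-corpus-17.0-delta-.../<lc>/...             expanded delta release
# Output written by this script:
#   <base>/cv-corpus-17.0-.../<lc>/...                   merged full version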