-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstreamtry.py
127 lines (106 loc) · 3.52 KB
/
streamtry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import argparse
import json
import shutil
import subprocess
import sys
import time
from typing import Iterator
import requests
def is_installed(lib_name: str) -> bool:
    """Report whether an executable called ``lib_name`` can be located.

    Args:
        lib_name: Name (or path) of the program to look up.

    Returns:
        ``True`` when ``shutil.which`` resolves the name, ``False`` otherwise.
    """
    return shutil.which(lib_name) is not None
def save(audio: bytes, filename: str) -> None:
    """Write the raw ``audio`` bytes to ``filename`` in binary mode.

    Args:
        audio: Bytes to store.
        filename: Destination path; it is opened with mode ``"wb"``, so an
            existing file is truncated.
    """
    with open(filename, mode="wb") as sink:
        sink.write(audio)
def stream_ffplay(audio_stream, output_file, save=True):
    """Pipe an audio stream into ``ffplay`` (playback) or ``ffmpeg`` (save).

    Args:
        audio_stream: Iterable of byte chunks; ``None`` items are skipped.
        output_file: Path handed to ``ffmpeg`` as the output when ``save`` is
            true. Unused for playback.
        save: When true, run ``ffmpeg`` and write ``output_file``; otherwise
            run ``ffplay`` and play the audio. Note that inside this function
            the name shadows the module-level ``save`` helper.

    Raises:
        FileNotFoundError: Propagated from ``subprocess.Popen`` when the
            selected program cannot be started.
        Exception: Anything raised while reading ``audio_stream`` or writing
            to the pipe is propagated after the child has been cleaned up.
    """
    if save:
        print("Saving to ", output_file)
        command = ["ffmpeg", "-probesize", "1024", "-i", "-", output_file]
    else:
        command = ["ffplay", "-nodisp", "-probesize", "1024", "-autoexit", "-"]

    proc = subprocess.Popen(command, stdin=subprocess.PIPE)
    try:
        for chunk in audio_stream:
            if chunk is not None:
                proc.stdin.write(chunk)
    finally:
        # Always signal end-of-input and reap the child, even when the stream
        # or the pipe fails part-way, so no process is left behind.
        try:
            proc.stdin.close()
        finally:
            proc.wait()
def tts(text, speaker, language, server_url, stream_chunk_size) -> Iterator[bytes]:
    """Stream synthesized audio for ``text`` from the ``/tts_stream`` endpoint.

    This is a generator, so the request is only sent once the first chunk is
    requested.

    Args:
        text: Text to synthesize.
        speaker: Mapping of speaker fields sent along with the request. It is
            copied into the request payload and left unmodified.
        language: Language code forwarded to the server.
        server_url: Base URL of the server; ``/tts_stream`` is appended to it.
        stream_chunk_size: Forwarded to the server unchanged. It can be
            reduced to get a faster response, at the cost of quality.

    Yields:
        Non-empty chunks of the response body, read 512 bytes at a time.

    Raises:
        SystemExit: With status 1 when the server does not answer HTTP 200.
    """
    start = time.perf_counter()
    # Build a fresh payload instead of writing the request fields into the
    # caller's dict.
    payload = {
        **speaker,
        "text": text,
        "language": language,
        "stream_chunk_size": stream_chunk_size,
    }
    # The context manager releases the streamed connection even when the
    # consumer stops early or an error is raised.
    with requests.post(
        f"{server_url}/tts_stream",
        json=payload,
        stream=True,
    ) as res:
        end = time.perf_counter()
        print(f"Time to make POST: {end-start}s", file=sys.stderr)
        if res.status_code != 200:
            print("Error:", res.text, file=sys.stderr)
            sys.exit(1)
        first = True
        for chunk in res.iter_content(chunk_size=512):
            if first:
                end = time.perf_counter()
                print(f"Time to first chunk: {end-start}s", file=sys.stderr)
                first = False
            if chunk:
                yield chunk
        print("⏱️ response.elapsed:", res.elapsed)
def get_speaker(ref_audio, server_url):
    """Upload a reference recording to ``/clone_speaker`` and return the reply.

    Args:
        ref_audio: Path of the reference audio file to upload. It is sent
            under the form field ``wav_file`` with the name ``reference.wav``.
        server_url: Base URL of the server; ``/clone_speaker`` is appended.

    Returns:
        The JSON-decoded body of the server response.

    Raises:
        OSError: If ``ref_audio`` cannot be opened.
        requests.HTTPError: If the server answers with an error status.
    """
    # Keep the file open only for the duration of the upload.
    with open(ref_audio, "rb") as wav:
        files = {"wav_file": ("reference.wav", wav)}
        response = requests.post(f"{server_url}/clone_speaker", files=files)
    # Surface HTTP failures instead of returning an error body as speaker data.
    response.raise_for_status()
    return response.json()
def main():
    """Parse the command line, pick a speaker, and play or save the TTS audio.

    With ``--output_file`` the stream is written through ``ffmpeg``; without
    it the stream is played through ``ffplay``. The speaker comes from
    ``--ref_file`` (via the server) when given, otherwise from
    ``./default_speaker.json``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--text",
        default="It took me quite a long time to develop a voice and now that I have it I am not going to be silent.",
        help="text input for TTS"
    )
    parser.add_argument(
        "--language",
        default="en",
        help="Language to use default is 'en' (English)"
    )
    parser.add_argument(
        "--output_file",
        default=None,
        help="Save TTS output to given filename"
    )
    parser.add_argument(
        "--ref_file",
        default=None,
        help="Reference audio file to use, when not given will use default"
    )
    parser.add_argument(
        "--server_url",
        default="http://localhost:8000",
        help="Server url http://localhost:8000 default, change to your server location "
    )
    parser.add_argument(
        "--stream_chunk_size",
        default="20",
        help="Stream chunk size , 20 default, reducing will get faster latency but may degrade quality"
    )
    args = parser.parse_args()

    # Fail early, before any network work, if the program that will consume
    # the audio stream is missing.
    tool = "ffmpeg" if args.output_file else "ffplay"
    if not is_installed(tool):
        sys.exit(f"{tool} was not found on PATH; it is required to handle the audio stream")

    if args.ref_file is not None:
        print("Computing the latents for a new reference...")
        speaker = get_speaker(args.ref_file, args.server_url)
    else:
        # The default speaker file is only needed when no reference is given.
        with open("./default_speaker.json", "r") as file:
            speaker = json.load(file)

    stream_ffplay(
        tts(
            args.text,
            speaker,
            args.language,
            args.server_url,
            args.stream_chunk_size
        ),
        args.output_file,
        save=bool(args.output_file)
    )


if __name__ == "__main__":
    main()