-
Notifications
You must be signed in to change notification settings - Fork 1
/
pyaudiowave.py
385 lines (290 loc) · 13.3 KB
/
pyaudiowave.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# -*- coding: utf-8 -*-
"""
A module containing a main class (PyAudioWave) that takes in Wave objects from
the wavemaker module and outputs a variety of signals from the given wave(s). It
also contains a helper class (SignalMaker) to pass to fwp_pyaudiowave playback
functions to properly set up parameters.
"""
import numpy as np
#%% The class
class PyAudioWave:
    """Format wave objects as signals that the pyaudio module can play.

    The class takes wave objects (anything exposing a ``frequency``
    attribute and an ``evaluate(time)`` method) and produces either a
    generator of byte chunks ready to be written to an audio stream or
    plain arrays ready to be plotted.

    Parameters
    ----------
    samplingrate : int
        Sampling (or playback) rate.
    buffersize : int
        Writing buffer size, in samples per channel.
    nchannels : {1, 2}
        Number of channels.
    debugmode : bool
        If True, activates prints along the code to find bugs.

    Attributes
    ----------
    sampling_rate : int or float
        Sampling rate for data acquisition.
    buffer_size : int or float
        Buffer size for data acquisition.
    nchannels : {1, 2}
        Number of channels used for recording and playing.
    debugmode : bool
        If True, activates prints along the code to find bugs.

    Methods (public)
    ----------------
    write_generator :
        Creates a generator to yield chunks of length buffer_size of the
        generated wave for a total time equal to duration.
    plot_signal :
        Returns time and signal arrays ready to plot. If a tuple of waves
        is given, output will be time and a list of the signal arrays.
    generator_setup :
        Wraps a write_generator call in a SignalMaker object.
    """

    def __init__(self, samplingrate=44100, buffersize=1024, nchannels=1,
                 debugmode=False):
        self.sampling_rate = samplingrate
        self.buffer_size = buffersize
        self.nchannels = nchannels
        self.debugmode = debugmode

    def debugprint(self, printable):
        """Print ``printable`` only when ``debugmode`` is enabled.

        Parameters
        ----------
        printable :
            A printable object to be printed along the code where desired.
        """
        if self.debugmode:
            print(printable)

    #% Some secondary methods

    def create_time(self, wave, periods_per_chunk=1):
        """Create a time array for other functions to use.

        Parameters
        ----------
        wave : Wave object
            Object created by wavemaker class with desired function. Only
            its ``frequency`` attribute is read here.
        periods_per_chunk : int or float, optional
            Amount of periods the time array should span. Default: 1.

        Returns
        -------
        numpy array
            Time array starting at 0 and excluding the end point.
        """
        period = 1 / wave.frequency
        span = period * periods_per_chunk
        # np.linspace requires an integer sample count; the product of
        # floats is rounded so representation error (e.g. 199.99999...)
        # does not drop a sample.
        samples = int(round(span * self.sampling_rate))
        return np.linspace(start=0, stop=span, num=samples, endpoint=False)

    def encode(self, signal):
        """Format the desired signal for a pyaudio stream.

        Deals with one- and two-channel signals.

        Parameters
        ----------
        signal : numpy array
            Signal to be played, with shape (channels, chunk_size), which
            is the layout produced by ``eval_wave``.

        Returns
        -------
        bytes
            float32 byte stream. For two channels the samples are
            interleaved (ch0[0], ch1[0], ch0[1], ch1[1], ...).
        """
        signal = np.asarray(signal)
        if self.nchannels == 2:
            # Transposing before flattening interleaves the two channels.
            signal = np.transpose(signal)
        interleaved = signal.flatten()
        # tobytes() replaces tostring(), which was removed in NumPy 2.0.
        return interleaved.astype(np.float32).tobytes()

    def resolve_nchannels(self, wave, display_warnings):
        """Resolve a wave or wave tuple for the configured channel amount.

        Parameters
        ----------
        wave : wave object or tuple of wave objects
            Object(s) created by wavemaker class with desired function.
        display_warnings : bool
            If True, displays warnings regarding number of channels and
            wave incompatibilities.

        Returns
        -------
        tuple
            Tuple containing wave object(s). A tuple given as input is
            returned unchanged.
        """
        is_tuple = isinstance(wave, tuple)

        if self.nchannels == 1:
            if not is_tuple:
                # Single wave for a single channel: nothing to warn about.
                return (wave,)
            # Only the first wave of the tuple is evaluated downstream
            # (see eval_wave), so warn when more than one was provided.
            if display_warnings and len(wave) > 1:
                print('Requested a one channel signal but provided more than one wave. Will precede using the first wave.')
            return wave

        # Right now there is a design problem with writing to two channels
        # while looping over a single array: what is written to the two
        # channels may not have compatible frequencies, and one of them
        # gets cut off on every iteration. Solving it would require
        # reworking play_callback so it calls something that hands it a
        # signal on each iteration. For now, the cut-off chunks are
        # returned.
        if not is_tuple:  # should rewrite as warning
            if display_warnings:
                print('''Requested two channel signal, but only provided one wave object. Will write same signal in both channels.''')
            return (wave, wave)

        if display_warnings:  # should rewrite as warning
            print('''Requested two channel signal. If frequencies are not compatible, second channel wave will be cut off.''')
        # No correction was needed: return as input.
        return wave

    def eval_wave(self, wave, time):
        """Evaluate the given wave(s) according to the number of channels.

        Parameters
        ----------
        wave : tuple of wave objects
            Objects created by wavemaker class with desired function.
        time : numpy array
            Time in which to evaluate the given waveform function.

        Returns
        -------
        numpy array
            Evaluated waves stacked along the first axis, i.e. one row
            per evaluated wave.
        """
        if self.nchannels == 1:
            # A one-element list gives an array of shape (1, N), not (N,).
            return np.array([np.transpose(wave[0].evaluate(time))])
        return np.array([np.transpose(w.evaluate(time)) for w in wave])

    def yield_a_bit(self, signal):
        """Yield encoded chunks of the given signal of length buffer_size.

        Only whole buffers are yielded; trailing samples that do not fill
        a buffer are left for the caller to handle.

        Parameters
        ----------
        signal : numpy array
            Signal to be played, with shape (nchannels, samples).

        Yields
        ------
        bytes
            Byte stream signal formatted as is needed by pyaudio to play.
        """
        self.debugprint('Entered yield_a_bit')
        size = self.buffer_size
        for i in range(signal.shape[1] // size):
            yield self.encode(signal[:, size * i:size * (i + 1)])

    #% The actual useful methods

    def write_generator(self, wave, duration=None, buffers_per_array=100,
                        display_warnings=False):
        """Create a generator yielding buffer_size chunks of the wave.

        A base signal spanning a whole number of periods of the first
        wave is evaluated once and then replayed. Samples left over after
        the last whole buffer are carried to the front of the next replay
        so the output stays continuous. If duration is None, samples are
        generated forever; otherwise exactly
        ``round(duration * sampling_rate)`` samples per channel are
        produced, the last chunk possibly being shorter than buffer_size.

        Parameters
        ----------
        wave : wave object or tuple of wave objects
            Object(s) created by wavemaker class with desired function.
        duration : int or float, optional
            Desired time length of signal in seconds. Default: None.
        buffers_per_array : int, optional
            How many buffer sized "blocks" should be fitted in the
            base array. Default: 100.
        display_warnings : bool
            If True, displays warnings regarding number of channels
            and wave incompatibilities. Default: False.

        Yields
        ------
        bytes
            Byte stream signal formatted as is needed by pyaudio to play.

        Raises
        ------
        ValueError
            If the base signal comes out empty (for instance with
            ``buffers_per_array=0``), since nothing could ever be yielded.
        """
        wave = self.resolve_nchannels(wave, display_warnings)
        self.debugprint('Wave tuple lentgh: {}'.format(len(wave)))

        # Whole number of periods lasting at least one buffer.
        required_periods = (self.buffer_size * wave[0].frequency
                            // self.sampling_rate + 1)
        # Time vector just longer than buffers_per_array * buffer_size.
        time = self.create_time(
            wave[0], periods_per_chunk=required_periods * buffers_per_array)
        self.debugprint('Time length: {}'.format(len(time)))

        signal = self.eval_wave(wave, time)
        self.debugprint('Signal = {}'.format(signal))
        self.debugprint('Signal shape: {}'.format(signal.shape))

        if signal.shape[1] == 0:
            raise ValueError('Base signal is empty; nothing to play.')

        size = self.buffer_size
        pending = signal

        if duration is None:
            self.debugprint('Mode engaged: Indefinitely')
            while True:
                yield from self.yield_a_bit(pending)
                # Index of the first sample NOT yet sent (a multiple of
                # the buffer size), not the number of buffers sent.
                consumed = (pending.shape[1] // size) * size
                pending = np.concatenate((pending[:, consumed:], signal),
                                         axis=1)

        if duration < signal.shape[1] / self.sampling_rate:
            self.debugprint('Mode engaged: Short')
        else:
            self.debugprint('Mode engaged: Long')

        remaining = int(round(duration * self.sampling_rate))
        self.debugprint('Total samples to yield: {}'.format(remaining))

        while remaining > 0:
            if remaining < pending.shape[1]:
                # Final stretch: whole buffers first, then the partial
                # chunk needed to reach the exact duration.
                final = pending[:, :remaining]
                yield from self.yield_a_bit(final)
                tail_start = (remaining // size) * size
                if tail_start < remaining:
                    yield self.encode(final[:, tail_start:])
                return

            yield from self.yield_a_bit(pending)
            consumed = (pending.shape[1] // size) * size
            remaining -= consumed
            pending = np.concatenate((pending[:, consumed:], signal), axis=1)

    def plot_signal(self, wave, periods_per_chunk=1):
        """Return time and signal arrays ready to plot.

        Parameters
        ----------
        wave : wave object or tuple of wave objects
            Object(s) created by wavemaker class with desired function.
        periods_per_chunk : int or float, optional
            Amount of periods (of the first wave) to evaluate. Default: 1.

        Returns
        -------
        time : numpy array
            Time array built from the first (or only) wave.
        signal : numpy array or list of numpy arrays
            The evaluated waveform if a single wave was given, or a list
            with one evaluated waveform per wave if a tuple was given.
        """
        if isinstance(wave, tuple):
            time = self.create_time(wave[0], periods_per_chunk)
            return time, [w.evaluate(time) for w in wave]

        time = self.create_time(wave, periods_per_chunk)
        return time, wave.evaluate(time)

    def generator_setup(self, wave, duration=None, buffers_per_array=100,
                        display_warnings=False):
        """Bundle a write_generator call into a SignalMaker object.

        Parameters
        ----------
        wave, duration, buffers_per_array, display_warnings :
            Forwarded unchanged to ``write_generator``.

        Returns
        -------
        SignalMaker
            Object built from wave, duration and this instance, whose
            ``generator`` attribute is the generator returned by
            ``write_generator``.
        """
        si = SignalMaker(wave, duration, self)
        si.generator = self.write_generator(
            wave, duration, buffers_per_array, display_warnings)
        return si
class SignalMaker(PyAudioWave):
    """A secondary class with no methods of its own, used to pass a signal
    generator to the player.

    Note that the base-class initializer is not called, so the
    ``sampling_rate``, ``buffer_size``, ``nchannels`` and ``debugmode``
    attributes are not set on the instance itself; the object handed in
    as ``parent`` is stored instead.

    Parameters
    ----------
    wave : wave object or tuple of wave objects
        Wave(s) the generator is built from.
    duration : int, float or None
        Requested duration of the signal.
    parent :
        Object that created this instance (``generator_setup`` passes the
        PyAudioWave instance it was called on).

    Attributes
    ----------
    wave, duration, parent :
        The constructor arguments, stored unchanged.
    generator :
        Signal generator. Starts as None and is assigned by
        ``PyAudioWave.generator_setup``.
    """

    def __init__(self, wave, duration, parent):
        self.wave = wave
        self.duration = duration
        self.parent = parent
        # Declared here so the attribute always exists, even before
        # generator_setup assigns the actual generator.
        self.generator = None
#%% example to try everything out
#import wavemaker
##%%
#seno1 = wavemaker.Wave('sine', frequency=4)
#seno2 = wavemaker.Wave('sine', frequency=3, amplitude=1.3)
#triang = wavemaker.Wave('triangular',frequency=3)
#
##Some parameters:
#samplerate = 44100
#buffersize = 1024
#
## An input maker with given parameters and one channel
#InputMaker = PyAudioWave(samplerate, buffersize, debugmode=True)
#
#
#InputMaker.nchannels = 1
#sound_gen = InputMaker.write_generator((seno1, triang), duration=10, buffers_per_array=20)
#
##%%
#k = 0
#for s in sound_gen:
# print(k)
# k +=1
## if k>20:
## break