-
Notifications
You must be signed in to change notification settings - Fork 362
/
frame.pyx
149 lines (120 loc) · 4.68 KB
/
frame.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from av.audio.format cimport get_audio_format
from av.audio.layout cimport get_audio_layout
from av.audio.plane cimport AudioPlane
from av.utils cimport err_check
cdef object _cinit_bypass_sentinel
cdef AudioFrame alloc_audio_frame():
"""Get a mostly uninitialized AudioFrame.
You MUST call AudioFrame._init(...) or AudioFrame._init_user_attributes()
before exposing to the user.
"""
return AudioFrame.__new__(AudioFrame, _cinit_bypass_sentinel)
cdef class AudioFrame(Frame):
"""A frame of audio."""
def __cinit__(self, format='s16', layout='stereo', samples=0, align=True):
if format is _cinit_bypass_sentinel:
return
cdef AudioFormat cy_format = AudioFormat(format)
cdef AudioLayout cy_layout = AudioLayout(layout)
self._init(cy_format.sample_fmt, cy_layout.layout, samples, align)
cdef _init(self, lib.AVSampleFormat format, uint64_t layout, unsigned int nb_samples, bint align):
self.align = align
self.ptr.nb_samples = nb_samples
self.ptr.format = <int>format
self.ptr.channel_layout = layout
# HACK: It really sucks to do this twice.
self._init_user_attributes()
cdef size_t buffer_size
if self.layout.channels and nb_samples:
# Cleanup the old buffer.
lib.av_freep(&self._buffer)
# Get a new one.
self._buffer_size = err_check(lib.av_samples_get_buffer_size(
NULL,
len(self.layout.channels),
nb_samples,
format,
align
))
self._buffer = <uint8_t *>lib.av_malloc(self._buffer_size)
if not self._buffer:
raise MemoryError("cannot allocate AudioFrame buffer")
# Connect the data pointers to the buffer.
err_check(lib.avcodec_fill_audio_frame(
self.ptr,
len(self.layout.channels),
<lib.AVSampleFormat>self.ptr.format,
self._buffer,
self._buffer_size,
self.align
))
self._init_planes(AudioPlane)
def __dealloc__(self):
lib.av_freep(&self._buffer)
cdef _recalc_linesize(self):
lib.av_samples_get_buffer_size(
self.ptr.linesize,
len(self.layout.channels),
self.ptr.nb_samples,
<lib.AVSampleFormat>self.ptr.format,
self.align
)
# We need to reset the buffer_size on the AudioPlane/s. This is
# an easy, if inefficient way.
self._init_planes(AudioPlane)
cdef _init_user_attributes(self):
self.layout = get_audio_layout(0, self.ptr.channel_layout)
self.format = get_audio_format(<lib.AVSampleFormat>self.ptr.format)
self.nb_channels = lib.av_get_channel_layout_nb_channels(self.ptr.channel_layout)
self.ptr.channels = self.nb_channels
self.nb_planes = self.nb_channels if lib.av_sample_fmt_is_planar(<lib.AVSampleFormat>self.ptr.format) else 1
self._init_planes(AudioPlane)
def __repr__(self):
return '<av.%s %d, pts=%s, %d samples at %dHz, %s, %s at 0x%x>' % (
self.__class__.__name__,
self.index,
self.pts,
self.samples,
self.rate,
self.layout.name,
self.format.name,
id(self),
)
property samples:
"""Number of audio samples (per channel) """
def __get__(self):
return self.ptr.nb_samples
property sample_rate:
"""Sample rate of the audio data. """
def __get__(self):
return self.ptr.sample_rate
def __set__(self, value):
self.ptr.sample_rate = value
property rate:
def __get__(self):
return self.ptr.sample_rate
def __set__(self, value):
self.ptr.sample_rate = value
def to_nd_array(self, **kwargs):
"""Get a numpy array of this frame.
Any ``**kwargs`` are passed to :meth:`AudioFrame.reformat`.
"""
import numpy as np
# map avcodec type to numpy type
try:
dtype = np.dtype({
'u8' : 'u1',
'u8p' : 'u1',
's16' :'<i2',
's16p':'<i2',
's32' :'<i4',
's32p':'<i4',
'flt' :'<f4',
'fltp':'<f4',
'dbl' :'<f8',
'dblp':'<f8'
}[self.format.name])
except:
raise AssertionError("Don't know how to convert data type.", self.format.name)
# convert and return data
return np.vstack(map(lambda x: np.frombuffer(x, dtype), self.planes))