-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathvideo.py
168 lines (136 loc) · 4.92 KB
/
video.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
"""
BlueRov video capture class
"""
import cv2
import gi
import numpy as np
gi.require_version("Gst", "1.0")
from gi.repository import Gst
class Video:
    """BlueRov video capture class.

    Receives an H.264 RTP/UDP video stream via a GStreamer pipeline and
    exposes the most recently decoded image as an OpenCV-compatible BGR
    numpy array.

    Attributes:
        port (int): Video UDP port
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
        latest_frame (np.ndarray): Latest retrieved video frame
    """

    def __init__(self, port=5600):
        """Initialise GStreamer and start the capture pipeline.

        Args:
            port (int, optional): UDP port to listen on. Defaults to 5600.
        """
        Gst.init(None)

        self.port = port
        # _new_frame buffers a frame not yet handed to the caller;
        # latest_frame keeps the last frame that was handed out.
        self.latest_frame = self._new_frame = None

        # [Software component diagram](https://www.ardusub.com/software/components.html)
        # UDP video stream (:5600)
        self.video_source = "udpsrc port={}".format(self.port)
        # [Rasp raw image](http://picamera.readthedocs.io/en/release-0.7/recipes2.html#raw-image-capture-yuv-format)
        # Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)
        self.video_codec = (
            "! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264"
        )
        # Python don't have nibble, convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
        self.video_decode = (
            "! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert"
        )
        # Create a sink to get data; drop=true keeps only the freshest buffers
        self.video_sink_conf = (
            "! appsink emit-signals=true sync=false max-buffers=2 drop=true"
        )

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """Start the GStreamer pipeline and grab a handle to the sink.

        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin', \
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): GStreamer pipeline description list.
                Falls back to a test-pattern source when not supplied.
        """
        if not config:
            config = [
                "videotestsrc ! decodebin",
                "! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert",
                "! appsink",
            ]

        command = " ".join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        # First (unnamed) appsink is auto-named "appsink0" by GStreamer.
        self.video_sink = self.video_pipe.get_by_name("appsink0")

    @staticmethod
    def gst_to_opencv(sample):
        """Transform a GStreamer sample into a numpy BGR image array.

        Args:
            sample (Gst.Sample): sample pulled from the appsink.

        Returns:
            np.ndarray: (height, width, 3) uint8 BGR image.
        """
        buf = sample.get_buffer()
        caps_structure = sample.get_caps().get_structure(0)
        array = np.ndarray(
            (caps_structure.get_value("height"), caps_structure.get_value("width"), 3),
            buffer=buf.extract_dup(0, buf.get_size()),
            dtype=np.uint8,
        )
        return array

    def frame(self):
        """Get the latest frame.

        Returns:
            np.ndarray: latest retrieved image frame, or None if no frame
                has been received yet.
        """
        # BUG FIX: was `if self.frame_available:` — the bound method is always
        # truthy, so a second call after consuming a frame clobbered
        # latest_frame with None. Call the method instead.
        if self.frame_available():
            self.latest_frame = self._new_frame
            # reset to indicate latest frame has been 'consumed'
            self._new_frame = None
        return self.latest_frame

    def frame_available(self):
        """Check if a new frame is available

        Returns:
            bool: true if a new frame is available
        """
        return self._new_frame is not None

    def run(self):
        """Start the pipeline and register the new-sample callback."""
        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf,
            ]
        )
        self.video_sink.connect("new-sample", self.callback)

    def callback(self, sink):
        """Appsink 'new-sample' handler: stash the decoded frame.

        Args:
            sink: appsink element that emitted the signal.

        Returns:
            Gst.FlowReturn.OK to keep the stream flowing.
        """
        sample = sink.emit("pull-sample")
        # pull-sample can return None (e.g. at end-of-stream); guard to
        # avoid crashing inside gst_to_opencv.
        if sample is not None:
            self._new_frame = self.gst_to_opencv(sample)
        return Gst.FlowReturn.OK
if __name__ == "__main__":
    # Create the video object
    # Add port= if is necessary to use a different one
    video = Video()

    print("Initialising stream...")
    waited = 0
    # Block until the first frame arrives; cv2.waitKey also services the
    # GUI event loop so the process stays responsive while polling.
    while not video.frame_available():
        waited += 1
        print("\r Frame not available (x{})".format(waited), end="")
        cv2.waitKey(30)
    print('\nSuccess!\nStarting streaming - press "q" to quit.')

    while True:
        # Wait for the next frame to become available
        if video.frame_available():
            # Only retrieve and display a frame if it's new
            frame = video.frame()
            cv2.imshow("frame", frame)

        # Allow frame to display, and check if user wants to quit
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # BUG FIX: release the HighGUI window on exit (was leaked on quit).
    cv2.destroyAllWindows()