-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvideocapture.py
72 lines (64 loc) · 1.99 KB
/
videocapture.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""
Video capture module: runs frame capture from a video source URI on a background thread.
Disabled pylint checks:
E1101: Module 'cv2' has no 'VideoCapture' member [E:no-member]
"""
# pylint: disable=C0103,E1101,R0902
from threading import Thread
import cv2
class VideoCapture:
    """
    Threaded frame grabber mimicking the EmguCV VideoCapture wrapper.

    A background thread keeps reading frames from the source and stores the
    most recent one. If currentFrame() returns grabbed == False, the capture
    has stopped (or has not produced a frame yet). Callers can either check
    isRunning() before calling currentFrame() or check the grabbed value
    returned by currentFrame().
    """

    def __init__(self, vsid, uri, logger):
        """
        Open the video source; no frames are read until start() is called.

        vsid: video source id that every returned frame is labelled with
        uri: video source uri, passed to cv2.VideoCapture
        logger: object with an info(message) method used for status messages
        """
        self._logger = logger
        self.vsid = vsid
        self.uri = uri
        self._capture = cv2.VideoCapture(uri)
        self._grabbed = False
        self._frame = None
        self._started = False
        self._stopping = False
        self._thread = None

    def isRunning(self):
        """
        Return True while the capture is started and has not been stopped.
        """
        return self._started

    def start(self):
        """
        Start the capture thread and return self (allows call chaining).

        Calling start() on a running capture is a no-op. Calling it after
        stop() reopens the source from the stored uri and starts a fresh
        capture thread.
        """
        if self._started:
            return self
        if self._capture is None:
            # stop() released and dropped the previous handle; reopen the
            # source so the instance can be restarted.
            self._capture = cv2.VideoCapture(self.uri)
        # Flags are set BEFORE the thread starts: if the very first read
        # fails, the stop() issued by the capture thread must not be ignored
        # because _started is still False.
        self._stopping = False
        self._started = True
        self._thread = Thread(target=self._captureLoop, args=(self._capture,))
        self._thread.start()
        self._logger.info(f'VideoCapture @{self.vsid}:{self.uri} started')
        return self

    def stop(self):
        """
        Signal the capture thread to finish and release the video source.

        Safe to call repeatedly; does nothing when the capture is not started.
        """
        if self._started:
            self._logger.info('VideoCapture stopping...')
            self._stopping = True
            self._started = False
            self._grabbed = False
            # Detach the handle first, then release it: a concurrent stop()
            # (e.g. from the capture thread on read failure) then sees None
            # instead of calling release() on a missing object.
            capture, self._capture = self._capture, None
            if capture is not None:
                capture.release()

    def _captureLoop(self, capture=None):
        """
        Thread body: read frames until stopped or until a read fails.

        capture: the capture object this loop is bound to; defaults to the
        current one. The loop ends as soon as the instance no longer uses
        that object, so a loop left over from before a stop()/start() cycle
        cannot interfere with the new run.
        """
        if capture is None:
            capture = self._capture
        while (capture is not None
               and not self._stopping
               and self._capture is capture):
            grabbed, frame = capture.read()
            if self._stopping or self._capture is not capture:
                # stop() ran while we were reading: do not publish a frame
                # after stop() has already reset the grabbed flag.
                break
            self._grabbed, self._frame = grabbed, frame
            if not grabbed:
                # End of stream or read error: shut the capture down.
                self.stop()
        self._logger.info(f'VideoCapture @{self.vsid}:{self.uri} stopped')

    def currentFrame(self):
        """
        Return the tuple (grabbed, frame, vsid) for the most recent read.

        grabbed is False before the first successful read and after the
        capture has stopped.
        """
        return (self._grabbed, self._frame, self.vsid)