-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
move framesource objects to separate file add jpeg mimetype
- Loading branch information
mvgorcum
committed
Oct 16, 2020
1 parent
c4e9d74
commit 5c8b2a8
Showing
3 changed files
with
247 additions
and
190 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Framesupply classes used for reading out various formats and cameras. | ||
""" | ||
|
||
import datetime
import threading
from time import sleep

import cv2
import imageio
import numpy as np
from PyQt5 import QtGui, QtWidgets
|
||
class FrameSupply:
    """
    Base class for objects that supply frames for further analysis.

    Concrete supplies (video file, image file, camera) override the
    methods below; the base class only defines the shared state.
    """

    def __init__(self):
        # No frame is available until a concrete supply starts producing.
        self.frameready = False
        self.is_running = False
        # True only for supplies that timestamp frames at capture time.
        self.gotcapturetime = False
        self.framebuffer = []
        self.nframes = 0
        self.framenumber = 0

    def run(self):
        """Start the frame supply. Required to get frames."""
        pass

    def getnextframe(self):
        """
        Get the last frame from the frame supply buffer.
        Only possible if frameready is true.
        """
        pass

    def getframesize(self):
        """Get the width and height of the frame."""
        pass
|
||
class OpencvReadVideo(FrameSupply):
    """
    Read videofile with OpenCV
    """

    def __init__(self, VideoFile):
        super().__init__()
        self.VideoFile = VideoFile
        self.is_running = False
        # Video files carry no wall-clock capture times.
        self.gotcapturetime = False

    def start(self):
        """Open the video file and read out the total frame count."""
        self.cap = cv2.VideoCapture(self.VideoFile)
        self.is_running = True
        self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def stop(self):
        """
        Stop the feed
        """
        self.cap.release()

    def getfirstframe(self):
        """Read frame 0 as RGB, rewind the capture, and return (frame, 0)."""
        # NOTE(review): the read success flag is not checked here; an
        # unreadable file would make cvtColor fail on None — confirm callers
        # only invoke this after a successful start().
        ret, bgr = self.cap.read()
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), 0

    def getnextframe(self):
        """Return the next RGB frame and its 0-based index, or (-1, -1) at the end."""
        self.framenumber = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
        ret, bgr = self.cap.read()
        if not ret:
            # End of file (or read error): shut the feed down.
            self.is_running = False
            self.stop()
            return -1, -1
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), self.framenumber

    def getframesize(self):
        """Return the (width, height) reported by the capture."""
        width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        return width, height
|
||
|
||
class ImageReader(FrameSupply):
    """
    Read an image (or multi-frame image) file with imageio.

    Note: the original docstring said "Read videofile with OpenCV", which was
    a copy-paste error — this class uses imageio, not OpenCV.
    """

    def __init__(self, ImageFile):
        super().__init__()
        self.ImageFile = ImageFile
        self.is_running = False
        # Image files carry no wall-clock capture times.
        self.gotcapturetime = False
        self.framenumber = 0

    def start(self):
        """Open the file with imageio and read out the number of frames."""
        self.is_running = True
        self.IOReader = imageio.get_reader(self.ImageFile)
        self.nframes = self.IOReader.get_length()

    def stop(self):
        """
        Stop the feed
        """
        self.is_running = False
        self.IOReader.close()

    def getfirstframe(self):
        """Return frame 0 and its index, without advancing the frame counter."""
        return self.IOReader.get_data(0), 0

    def getnextframe(self):
        """
        Return the next frame and its 0-based index, or (-1, -1) when the
        file is exhausted.

        BUG FIX: the original incremented framenumber *before* returning it,
        so the first frame was reported with index 1. Indices are now
        0-based, consistent with OpencvReadVideo.getnextframe.
        """
        if self.framenumber < self.nframes:
            current = self.framenumber
            frame = self.IOReader.get_data(current)
            self.framenumber += 1
            return frame, current
        return -1, -1

    def getframesize(self):
        """Return (width, height) of the first frame."""
        shape = self.IOReader.get_data(0).shape
        return shape[1], shape[0]
|
||
|
||
class OpencvCamera(FrameSupply):
    """
    Camera operated using OpenCV (device 0).

    Frames are captured on a background thread into framebuffer, together
    with a numpy datetime64 capture timestamp per frame in framecaptime.
    """

    def __init__(self):
        super().__init__()
        self.framecaptime = []
        self.imaging_thread = []
        self.keep_running = False
        self.is_running = False
        # Camera frames are timestamped as they are captured.
        self.gotcapturetime = True

    def start(self):
        """
        Start the camera
        """
        self.keep_running = True
        self.imaging_thread = threading.Thread(target=self._aquire)
        self.imaging_thread.start()

    def stop(self):
        """
        Stop the camera

        Only signals the acquisition thread; the thread itself releases
        the capture device when it exits its loop.
        """
        self.keep_running = False

    def getnextframe(self):
        """
        Pop and return the oldest buffered frame and its capture time.

        :return: (frame, capture time) or (-1, -1) when the buffer is empty.
        """
        self.nframes = len(self.framebuffer)
        if self.framebuffer:
            return self.framebuffer.pop(0), self.framecaptime.pop(0)
        return -1, -1

    def getframesize(self):
        """
        Return the camera frame (width, height), waiting briefly if the
        acquisition thread has not opened the capture device yet.
        """
        # BUG FIX: the original tested "'self.cap' in locals()", which is
        # always False (locals() only holds 'self'), so it slept on every
        # call and never actually checked for the capture object.
        if not hasattr(self, 'cap'):
            sleep(0.5)
        return self.cap.get(cv2.CAP_PROP_FRAME_WIDTH), self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)

    def _aquire(self):
        """
        Acquisition loop run on the imaging thread: open camera 0, append
        RGB frames and capture timestamps to the shared buffers until
        keep_running is cleared, then release the camera.
        """
        if self.is_running:
            print("already running")
            return
        self.is_running = True
        self.cap = cv2.VideoCapture(0)
        if not self.cap.isOpened():
            # BUG FIX: in PyQt5, QMessageBox lives in QtWidgets;
            # QtGui.QMessageBox was PyQt4 and raises AttributeError here.
            errorpopup = QtWidgets.QMessageBox()
            errorpopup.setText('Error opening video stream')
            errorpopup.setStandardButtons(QtWidgets.QMessageBox.Ok)
            errorpopup.exec_()
            self.keep_running = False
        while self.keep_running:
            ret, org_frame = self.cap.read()
            if not ret:
                # BUG FIX: a failed read previously passed None to cvtColor
                # and crashed the thread; stop acquiring instead.
                break
            self.framebuffer.append(cv2.cvtColor(org_frame, cv2.COLOR_BGR2RGB))
            self.framecaptime.append(np.datetime64(datetime.datetime.now()))
            self.frameready = True
        self.cap.release()
        self.is_running = False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Created on Tue Dec 20 16:20:35 2016 | ||
@author: M.van.Gorcum | ||
""" | ||
import numpy as np | ||
import scipy as sp | ||
|
||
def linear_subpixel_detection(image, thresh):
    """
    Detect the left and right edges on each image row with linear
    subpixel interpolation.

    Each row is scanned for the first pixel below ``thresh`` from the left
    and from the right; the crossing is then refined linearly between the
    two pixels straddling the threshold:
    corr = (thresh - I[edge-1]) / (I[edge] - I[edge-1]).

    :param image: 2D grayscale image (rows x columns).
    :param thresh: intensity threshold separating the dark feature from the
        background.
    :return: (edgeleft, edgeright) float arrays of per-row edge positions;
        0 where no left edge was found.
    """
    framesize = image.shape
    edgeleft = np.zeros(framesize[0])
    edgeright = np.zeros(framesize[0])
    # NOTE(review): range(framesize[0]-1) skips the last row, leaving its
    # entries at 0 — presumably intentional, but worth confirming.
    for y in range(framesize[0] - 1):  # edge detection, line by line
        # Pixel-scale left edge: first pixel under the threshold.
        edgeleft[y] = np.argmax(image[y, 0:framesize[1]] < thresh)
        if edgeleft[y] != 0:
            # BUG FIX: np.int / np.float_ were removed from NumPy
            # (deprecated 1.20, removed 1.24 / 2.0); use the builtins.
            edge = int(edgeleft[y])
            subpxcorr = (thresh - float(image[y, edge - 1])) / (
                float(image[y, edge]) - float(image[y, edge - 1]))
            # Add the correction and shift one pixel left so the value
            # sits on the edge itself.
            edgeleft[y] = edgeleft[y] + subpxcorr - 1
        # Right edge: scan the row flipped, since np.argmax returns the
        # first instance of the maximum value.
        edgeright[y] = int(framesize[1] - np.argmax(image[y, range(framesize[1] - 1, 0, -1)] < thresh))
        if edgeright[y] != framesize[1]:
            edge = int(edgeright[y])
            subpxcorr = (thresh - float(image[y, edge - 1])) / (
                float(image[y, edge]) - float(image[y, edge - 1]))
            edgeright[y] = edgeright[y] + subpxcorr - 1
    return edgeleft, edgeright
|
||
|
||
def errorfunction_subpixel_detection(image, thresh):
    """
    Detect left/right edges per row by fitting an error function.

    The pixel-scale edge is found as in linear_subpixel_detection; then the
    model x0*(1+erf(x*x1+x2))+x3 is least-squares fitted to a window of
    2*erffitsize pixels around the edge, and -x2/x1 (the centre of the
    fitted erf) is used as the subpixel correction.

    :param image: 2D grayscale image (rows x columns).
    :param thresh: intensity threshold separating the dark feature from the
        background.
    :return: (edgeleft, edgeright) float arrays of per-row edge positions.
    """
    erffitsize = 40  # half-width of the fitting window, in pixels

    def errorfunction(x, xdata, y):
        # Residuals of the erf model, for scipy.optimize.least_squares.
        return x[0] * (1 + sp.special.erf(xdata * x[1] + x[2])) + x[3] - y

    framesize = image.shape
    edgeleft = np.zeros(framesize[0])
    edgeright = np.zeros(framesize[0])
    # Fit abscissa is loop-invariant; build it once.
    fitrange = np.array(range(-erffitsize, erffitsize))
    for y in range(framesize[0] - 1):  # edge detection, line by line
        # Pixel-scale left edge: first pixel under the threshold.
        edgeleft[y] = np.argmax(image[y, 0:framesize[1]] < thresh)
        # NOTE(review): the upper bound compares against framesize[0]
        # (rows); framesize[1] (columns) looks intended — confirm before
        # changing, behavior is preserved here.
        if (edgeleft[y] - erffitsize) >= 0 and (edgeleft[y] - erffitsize) <= framesize[0]:
            # BUG FIX: np.int was removed from NumPy (deprecated 1.20,
            # removed 1.24); use the builtin int throughout.
            edge = int(edgeleft[y])
            # Window of the row around the edge to fit the erf against.
            fitparts = np.array(image[y, range(edge - erffitsize, edge + erffitsize)])
            guess = (max(fitparts) - min(fitparts)) / 2, -.22, 0, min(fitparts)
            lstsqrsol = sp.optimize.least_squares(errorfunction, guess, args=(fitrange, fitparts))
            # Centre of the fitted erf gives the subpixel correction.
            edgeleft[y] = edgeleft[y] - lstsqrsol.x[2] / lstsqrsol.x[1]
        # Right edge: scan the row flipped, since np.argmax returns the
        # first instance of the maximum value.
        edgeright[y] = int(framesize[1] - np.argmax(image[y, range(framesize[1] - 1, 0, -1)] < thresh))
        if (edgeright[y] - erffitsize) >= 0 and (edgeright[y] - erffitsize) <= framesize[0]:
            edge = int(edgeright[y])
            fitparts = np.array(image[y, range(edge - erffitsize, edge + erffitsize)])
            guess = (max(fitparts) - min(fitparts)) / 2, .22, 0, min(fitparts)
            lstsqrsol = sp.optimize.least_squares(errorfunction, guess, args=(fitrange, fitparts))
            edgeright[y] = edgeright[y] - lstsqrsol.x[2] / lstsqrsol.x[1]
        elif edgeright[y] == framesize[1]:
            # No right edge found: report 0 rather than the row width.
            edgeright[y] = 0
    return edgeleft, edgeright
Oops, something went wrong.