Skip to content

Commit

Permalink
Rename var. Add str as supported type on site id.
Browse files Browse the repository at this point in the history
  • Loading branch information
bastiencyr committed Sep 14, 2021
1 parent 996beb5 commit 54e2332
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/vorta/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from vorta.notifications import VortaNotifications

logger = logging.getLogger(__name__)
DEBUG = True
DEBUG = False


# TODO: refactor to use QtCore.QTimer directly
Expand Down Expand Up @@ -317,16 +317,16 @@ def __init__(self, site_id, nb_workers_running: QtCore.QSemaphore, threadpool):
self.current_job = None
self.nb_workers_running = nb_workers_running
self.threadpool = threadpool
self.start_site = QtCore.QMutex()
self.mut_start_site = QtCore.QMutex()

def add_job(self, task: Job):
self.__p_queue.put(task)
# If the site is dead, run the site again
self.start_site.lock()
self.mut_start_site.lock()
if self.worker_is_running is False:
self.threadpool.start(self)
self.worker_is_running = True
self.start_site.unlock()
self.mut_start_site.unlock()
# TODO This function must add the job to the database
# self.add_to_db(job)

Expand All @@ -337,7 +337,7 @@ def cancel_all_jobs(self):
end_job = Job()
end_job.set_status(JobStatus.CANCEL)

self.start_site.lock()
self.mut_start_site.lock()
# Stop the loop
self.worker_is_running = False
# if the site is waiting for a Job, send a cancel Job
Expand All @@ -346,7 +346,7 @@ def cancel_all_jobs(self):
if self.current_job is not None:
self.current_job.cancel()

self.start_site.unlock()
self.mut_start_site.unlock()

def cancel_job(self, job: Job):
# Dequeue the job
Expand Down Expand Up @@ -411,18 +411,18 @@ def __init__(self):
self.load_from_db()
# use a threadpool -> This could be changed in the future
self.threadpool = QThreadPool()
self.lock_queues = QtCore.QMutex()
self.mut_queues = QtCore.QMutex()

def test_get_site(self, site):
return self.__queues.get(site)

def get_site(self, site):
self.lock_queues.lock()
self.mut_queues.lock()
if site not in self.__queues:
if DEBUG:
print("Create a site ", site)
self.__queues[site] = _Queue(site, JobsManager.nb_workers_running, self.threadpool)
self.lock_queues.unlock()
self.mut_queues.unlock()
return self.__queues.get(site)

def load_from_db(self):
Expand All @@ -434,23 +434,28 @@ def add_job(self, job: Job):
if DEBUG:
print("Add Job on site ", job.get_site_id(), type(job.get_site_id()))

if type(job.get_site_id()) is not int:
if type(job.get_site_id()) is not (int or str):
print("get_site_id must return an integer. A ", type(job.get_site_id()), " has be returned.")
return 1
self.get_site(job.get_site_id()).add_job(job)

# Ask to all queues to cancel all jobs. This is what the user expects when he presses the cancel button.
"""
Ask to all queues to cancel all jobs.
There is no guarantee that all tasks will be removed. If some jobs are added before lock from another thread,
it will not be cancelled. That's why, it's strongly advised to add jobs from the main UI loop.
"""
def cancel_all_jobs(self):
# Lock dict to avoid someone else adding a new site during cancel operation
self.lock_queues.lock()
self.mut_queues.lock()
for id_site, site in self.__queues.items():
# don't use get_site since mut_queue is already locked
site.cancel_all_jobs()
self.__queues.clear()
# reset the semaphore
JobsManager.reset_nb_workers()
if DEBUG:
print("End Cancel")
self.lock_queues.unlock()
self.mut_queues.unlock()

def cancel_job(self, job: Job):
# call cancel job of the site queue
Expand Down

0 comments on commit 54e2332

Please sign in to comment.