From 93e60ada803514d4164237f5043bee95671259aa Mon Sep 17 00:00:00 2001
From: Xiaolin Charlene Zang
Date: Wed, 23 Aug 2017 20:10:56 +0000
Subject: [PATCH] Modified pool allocation logic to 1) not allocate all vms
 allowed by pool size at once and 2) consider vms in the free pool first.

---
 config.template.py |  5 +++++
 jobManager.py      |  4 +++-
 jobQueue.py        | 11 +++++++----
 preallocator.py    | 24 ++++++++++++++++++++++++
 4 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/config.template.py b/config.template.py
index a1f1b902..17bbf6fd 100644
--- a/config.template.py
+++ b/config.template.py
@@ -57,6 +57,8 @@ class Config:
     NUM_THREADS = 20
 
     # We have the option to reuse VMs or discard them after each use
+    # xxxXXX??? strongly suspect the code path for the False case is
+    # not working, after a failed experiment.
     REUSE_VMS = True
 
     # Worker waits this many seconds for functions waitvm, copyin (per
@@ -106,6 +108,9 @@ class Config:
     # Default vm pool size
     POOL_SIZE = 2
 
+    # Default increment step when enlarging the vm pool
+    POOL_ALLOC_INCREMENT = 2
+
     # Optionally log finer-grained timing information
     LOG_TIMING = False
 
diff --git a/jobManager.py b/jobManager.py
index 968837c0..0f6c11d1 100644
--- a/jobManager.py
+++ b/jobManager.py
@@ -94,8 +94,10 @@ def __manage(self):
             # the worker if successful.
             if Config.REUSE_VMS:
                 preVM = vm
-                self.log.info("_manage reuse vm %s" % preVM.id)
+                self.log.info("_manage use vm %s" % preVM.id)
             else:
+                # xxxXXX??? strongly suspect this code path does not work.
+                # After setting REUSE_VMS to False, job submissions don't run.
                 preVM = self.preallocator.allocVM(job.vm.name)
                 self.log.info("_manage allocate vm %s" % preVM.id)
             vmms = self.vmms[job.vm.vmms]  # Create new vmms object
diff --git a/jobQueue.py b/jobQueue.py
index 3b53c1cd..9128ae86 100644
--- a/jobQueue.py
+++ b/jobQueue.py
@@ -207,16 +207,19 @@ def getNextPendingJobReuse(self, target_id=None):
             # if target_id is set, only interested in this id
             if target_id and target_id != id:
                 continue
-            # Create a pool if necessary
-            if self.preallocator.poolSize(job.vm.name) == 0:
-                self.preallocator.update(job.vm, Config.POOL_SIZE)
+
+            # Create or enlarge a pool if there is no free vm to use and
+            # the limit for the pool is not reached yet
+            if self.preallocator.freePoolSize(job.vm.name) == 0 and \
+               self.preallocator.poolSize(job.vm.name) < Config.POOL_SIZE:
+                self.preallocator.incrementPoolSize(job.vm, Config.POOL_ALLOC_INCREMENT)
 
             # If the job hasn't been assigned to a worker yet, see if there
             # is a free VM
             if (job.isNotAssigned()):
                 vm = self.preallocator.allocVM(job.vm.name)
-                self.log.info("getNextPendingJobReuse alloc vm %s for %s" % (id, vm))
                 if vm:
+                    self.log.info("getNextPendingJobReuse alloc vm %s to job %s" % (vm, id))
                     self.queueLock.release()
                     return (id, vm)
diff --git a/preallocator.py b/preallocator.py
index 0f3270ba..f4421a7b 100644
--- a/preallocator.py
+++ b/preallocator.py
@@ -34,6 +34,30 @@ def poolSize(self, vmName):
         else:
             return len(self.machines.get(vmName)[0])
 
+    def freePoolSize(self, vmName):
+        """ freePoolSize - returns the size of the vmName free pool, for external callers
+        """
+        if vmName in self.machines.keys():
+            return self.machines.get(vmName)[1].qsize()
+        else:
+            return 0
+
+    def incrementPoolSize(self, vm, delta):
+        """
+        Called by jobQueue to create the pool and allocate the given number of vms
+        """
+
+        self.lock.acquire()
+        if vm.name not in self.machines.keys():
+            self.machines.set(vm.name, [[], TangoQueue(vm.name)])
+            # see comments in jobManager.py for the same call
+            self.machines.get(vm.name)[1].make_empty()
+            self.log.debug("Creating empty pool of %s instances" % (vm.name))
+        self.lock.release()
+
+        self.log.debug("incrementPoolSize: add %d new %s instances" % (delta, vm.name))
+        threading.Thread(target=self.__create, args=(vm, delta)).start()
+
     def update(self, vm, num):
         """ update - Updates the number of machines of a certain type to be
         preallocated.
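
For illustration, below is a minimal standalone sketch of the allocate-on-demand
behavior this patch aims for. It is an editorial sketch, not part of the patch:
SimplePool, its total/free fields, and the vm name strings are hypothetical
stand-ins for the real preallocator and jobQueue machinery, and the two
constants mirror Config.POOL_SIZE and Config.POOL_ALLOC_INCREMENT.

POOL_SIZE = 2             # cap on pool size (mirrors Config.POOL_SIZE)
POOL_ALLOC_INCREMENT = 2  # growth step (mirrors Config.POOL_ALLOC_INCREMENT)

class SimplePool(object):
    """Hypothetical stand-in for one vm pool in the preallocator."""

    def __init__(self):
        self.total = 0   # every vm created for this pool, free or busy
        self.free = []   # vms not currently assigned to a job

    def alloc(self):
        # Old behavior: create all POOL_SIZE vms up front when total == 0.
        # New behavior: grow by POOL_ALLOC_INCREMENT only when no free vm
        # exists and the pool is still below POOL_SIZE.
        if not self.free and self.total < POOL_SIZE:
            for _ in range(POOL_ALLOC_INCREMENT):
                self.free.append("vm%d" % self.total)
                self.total += 1
        # Prefer an existing free vm; None means the caller must wait.
        return self.free.pop(0) if self.free else None

pool = SimplePool()
print(pool.alloc())  # vm0: pool grows lazily on first demand
print(pool.alloc())  # vm1: served from the free pool, no new growth

Checking the free pool before growing means idle vms are reused ahead of any
new allocation, and growing in POOL_ALLOC_INCREMENT steps avoids paying the
startup cost of a full POOL_SIZE pool for a single job. Note that, like
incrementPoolSize above, the sketch may overshoot POOL_SIZE by up to one
increment when it grows.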