From c47d8891a54f8cccef3ba4abd2938fa49c906dd1 Mon Sep 17 00:00:00 2001 From: Xiaolin Charlene Zang Date: Wed, 14 Feb 2018 15:29:46 -0500 Subject: [PATCH] worker's job exec (copyin/run/copyout) logic flow cleanup complete. --- worker.py | 68 ++++++++++++++++++++----------------------------------- 1 file changed, 25 insertions(+), 43 deletions(-) diff --git a/worker.py b/worker.py index c23727d7..53db8a0d 100644 --- a/worker.py +++ b/worker.py @@ -120,8 +120,9 @@ def catFiles(self, f1, f2): try: with open(f2, "rb") as f2fd: shutil.copyfileobj(f2fd, wf) - except OSError: - pass + except IOError: + wf.write("NO OUTPUT FILE\n") + wf.close() os.rename(tmpname, f2) os.remove(f1) @@ -143,8 +144,8 @@ def notifyServer(self, job): except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) - - def afterJob(hdrfile, msg, stage, rc, vmHandling): + def afterJobExecution(self, hdrfile, msg, vmHandling): + (returnVM, replaceVM) = vmHandling self.jobQueue.makeDead(self.job.id, msg) # Update the text that users see in the autodriver output file @@ -205,6 +206,7 @@ def run(self): self.jobLogAndTrace("assigned VM (just initialized)", self.job.vm) vm = self.job.vm + (returnVM, replaceVM) = (True, False) # Wait for the instance to be ready self.jobLogAndTrace("waiting for VM", vm) @@ -229,48 +231,35 @@ def run(self): # Copy input files to VM self.jobLogAndTrace("copying to VM", vm) ret["copyin"] = self.vmms.copyIn(vm, self.job.input) + self.jobLogAndTrace("copying to VM", vm, ret["copyin"]) if ret["copyin"] != 0: Config.copyin_errors += 1 - self.jobLogAndTrace("copying to VM", vm, ret["copyin"]) + msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) + self.afterJobExecution(hdrfile, msg, (returnVM, replaceVM)) + return # Run the job on the virtual machine self.jobLogAndTrace("running on VM", vm) ret["runjob"] = self.vmms.runJob( vm, self.job.timeout, self.job.maxOutputFileSize) self.jobLogAndTrace("running on VM", vm, ret["runjob"]) - if ret["runjob"] != 0: - Config.runjob_errors += 1 - if ret["runjob"] == -1: - Config.runjob_timeouts += 1 + # runjob may have failed. but go on with copyout to get the output if any - # Copy the output back. + # Copy the output back, even if runjob has failed self.jobLogAndTrace("copying from VM", vm) ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) self.jobLogAndTrace("copying from VM", vm, ret["copyout"]) - if ret["copyout"] != 0: - Config.copyout_errors += 1 - - # Job termination. Notice that Tango considers - # things like runjob timeouts and makefile errors to be - # normal termination and doesn't reschedule the job. - self.log.info("Success: job %s:%d finished" % - (self.job.name, self.job.id)) - # Move the job from the live queue to the dead queue - # with an explanatory message - msg = "Success: Autodriver returned normally" - (returnVM, replaceVM) = (True, False) - if ret["copyin"] != 0: - msg = "Error: Copy in to VM failed (status=%d)" % ( - ret["copyin"]) - elif ret["runjob"] != 0: + # handle failure(s) of runjob and/or copyout. runjob error takes priority. + if ret["runjob"] != 0: + Config.runjob_errors += 1 if ret["runjob"] == 1: # This should never happen - msg = "Error: Autodriver usage error (status=%d)" % ( - ret["runjob"]) - elif ret["runjob"] == 2: - msg = "Error: Job timed out after %d seconds" % ( + msg = "Error: Autodriver usage error" + elif ret["runjob"] == -1 or ret["runjob"] == 2: # both are timeouts + Config.runjob_timeouts += 1 + msg = "Error: Job timed out. timeout setting: %d seconds" % ( self.job.timeout) - elif (ret["runjob"] == 3): # EXIT_OSERROR in Autodriver + elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver # Abnormal job termination (Autodriver encountered an OS # error). Assume that the VM is damaged. Destroy this VM # and do not retry the job since the job may have damaged @@ -284,20 +273,13 @@ def run(self): else: # This should never happen msg = "Error: Unknown autodriver error (status=%d)" % ( ret["runjob"]) - elif ret["copyout"] != 0: - msg += "Error: Copy out from VM failed (status=%d)" % ( - ret["copyout"]) - - self.jobQueue.makeDead(self.job.id, msg) - - # Update the text that users see in the autodriver output file - self.appendMsg(hdrfile, msg) - self.catFiles(hdrfile, self.job.outputFile) + Config.copyout_errors += 1 + msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) + else: + msg = "Success: Autodriver returned normally" - # Thread exit after termination - self.detachVM(return_vm=returnVM, replace_vm=replaceVM) - self.notifyServer(self.job) + self.afterJobExecution(hdrfile, msg, (returnVM, replaceVM)) return #