Skip to content

Commit

Permalink
worker's job exec (copyin/run/copyout) logic flow cleanup complete.
Browse files: browse the repository at this point in the history
  • Loading branch information
Xiaolin Charlene Zang committed Feb 14, 2018
1 parent caac9b4 commit c47d889
Showing 1 changed file with 25 additions and 43 deletions.
68 changes: 25 additions & 43 deletions worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ def catFiles(self, f1, f2):
try:
with open(f2, "rb") as f2fd:
shutil.copyfileobj(f2fd, wf)
except OSError:
pass
except IOError:
wf.write("NO OUTPUT FILE\n")

wf.close()
os.rename(tmpname, f2)
os.remove(f1)
Expand All @@ -143,8 +144,8 @@ def notifyServer(self, job):
except Exception as e:
self.log.debug("Error in notifyServer: %s" % str(e))


def afterJob(hdrfile, msg, stage, rc, vmHandling):
def afterJobExecution(self, hdrfile, msg, vmHandling):
(returnVM, replaceVM) = vmHandling
self.jobQueue.makeDead(self.job.id, msg)

# Update the text that users see in the autodriver output file
Expand Down Expand Up @@ -205,6 +206,7 @@ def run(self):
self.jobLogAndTrace("assigned VM (just initialized)", self.job.vm)

vm = self.job.vm
(returnVM, replaceVM) = (True, False)

# Wait for the instance to be ready
self.jobLogAndTrace("waiting for VM", vm)
Expand All @@ -229,48 +231,35 @@ def run(self):
# Copy input files to VM
self.jobLogAndTrace("copying to VM", vm)
ret["copyin"] = self.vmms.copyIn(vm, self.job.input)
self.jobLogAndTrace("copying to VM", vm, ret["copyin"])
if ret["copyin"] != 0:
Config.copyin_errors += 1
self.jobLogAndTrace("copying to VM", vm, ret["copyin"])
msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"])
self.afterJobExecution(hdrfile, msg, (returnVM, replaceVM))
return

# Run the job on the virtual machine
self.jobLogAndTrace("running on VM", vm)
ret["runjob"] = self.vmms.runJob(
vm, self.job.timeout, self.job.maxOutputFileSize)
self.jobLogAndTrace("running on VM", vm, ret["runjob"])
if ret["runjob"] != 0:
Config.runjob_errors += 1
if ret["runjob"] == -1:
Config.runjob_timeouts += 1
# runjob may have failed, but go on with copyout to retrieve the output, if any

# Copy the output back.
# Copy the output back, even if runjob has failed
self.jobLogAndTrace("copying from VM", vm)
ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile)
self.jobLogAndTrace("copying from VM", vm, ret["copyout"])
if ret["copyout"] != 0:
Config.copyout_errors += 1

# Job termination. Notice that Tango considers
# things like runjob timeouts and makefile errors to be
# normal termination and doesn't reschedule the job.
self.log.info("Success: job %s:%d finished" %
(self.job.name, self.job.id))

# Move the job from the live queue to the dead queue
# with an explanatory message
msg = "Success: Autodriver returned normally"
(returnVM, replaceVM) = (True, False)
if ret["copyin"] != 0:
msg = "Error: Copy in to VM failed (status=%d)" % (
ret["copyin"])
elif ret["runjob"] != 0:
# handle failure(s) of runjob and/or copyout. runjob error takes priority.
if ret["runjob"] != 0:
Config.runjob_errors += 1
if ret["runjob"] == 1: # This should never happen
msg = "Error: Autodriver usage error (status=%d)" % (
ret["runjob"])
elif ret["runjob"] == 2:
msg = "Error: Job timed out after %d seconds" % (
msg = "Error: Autodriver usage error"
elif ret["runjob"] == -1 or ret["runjob"] == 2: # both are timeouts
Config.runjob_timeouts += 1
msg = "Error: Job timed out. timeout setting: %d seconds" % (
self.job.timeout)
elif (ret["runjob"] == 3): # EXIT_OSERROR in Autodriver
elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver
# Abnormal job termination (Autodriver encountered an OS
# error). Assume that the VM is damaged. Destroy this VM
# and do not retry the job since the job may have damaged
Expand All @@ -284,20 +273,13 @@ def run(self):
else: # This should never happen
msg = "Error: Unknown autodriver error (status=%d)" % (
ret["runjob"])

elif ret["copyout"] != 0:
msg += "Error: Copy out from VM failed (status=%d)" % (
ret["copyout"])

self.jobQueue.makeDead(self.job.id, msg)

# Update the text that users see in the autodriver output file
self.appendMsg(hdrfile, msg)
self.catFiles(hdrfile, self.job.outputFile)
Config.copyout_errors += 1
msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"])
else:
msg = "Success: Autodriver returned normally"

# Thread exit after termination
self.detachVM(return_vm=returnVM, replace_vm=replaceVM)
self.notifyServer(self.job)
self.afterJobExecution(hdrfile, msg, (returnVM, replaceVM))
return

#
Expand Down

0 comments on commit c47d889

Please sign in to comment.