Make dryrun work again (#8767)
* add support for dryrun via preparelocal

* introduce Uploader action after DagmanCreator
belforte authored Nov 7, 2024
1 parent 483adb2 commit 97f4477
Showing 7 changed files with 120 additions and 150 deletions.
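
Taken together, the changes restore `crab submit --dryrun`: DagmanCreator's `prepareLocal` packs everything (now including a splitting summary) into `InputFiles.tar.gz`, the new Uploader action pushes that tarball to S3 and marks the task `UPLOADED`, and for dry-run tasks the handler stops there instead of going on to DagmanSubmitter. The sketch below only restates the wiring visible in the Handler diff further down; the `buildActionChain` helper itself is hypothetical.

```python
def buildActionChain(task, config, crabserver, procnum, rucioClient):
    """Hypothetical helper: assemble the TaskWorker actions for a new task."""
    from TaskWorker.Actions.Splitter import Splitter
    from TaskWorker.Actions.DagmanCreator import DagmanCreator
    from TaskWorker.Actions.Uploader import Uploader
    from TaskWorker.Actions.DagmanSubmitter import DagmanSubmitter

    chain = [
        Splitter(config=config, crabserver=crabserver, procnum=procnum),
        DagmanCreator(config=config, crabserver=crabserver, procnum=procnum,
                      rucioClient=rucioClient),
        # new in this commit: always upload InputFiles.tar.gz and set the task to UPLOADED
        Uploader(config=config, crabserver=crabserver, procnum=procnum),
    ]
    if task['tm_dry_run'] != 'T':
        # only a real submission continues to the scheduler
        chain.append(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum))
    return chain
```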
3 changes: 2 additions & 1 deletion src/python/CRABInterface/RESTTask.py
@@ -220,7 +220,8 @@ def webdirprx(self, **kwargs):
#extract /cms1425/taskname from the user webdir
suffix = re.search(r"(/[^/]+/[^/]+/?)$", row.user_webdir).group(0)
else:
raise ExecutionError("Webdir not set in the database. Cannot build proxied webdir")
yield "None"
return

#=============================================================================
# scheddObj is a dictionary composed like this (see the value of htcondorSchedds):
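Note the behaviour change in `webdirprx`: when the webdir was never stored in the database the method now yields the literal string `"None"` instead of raising `ExecutionError`, so callers must check for that sentinel. A hypothetical client-side check (not CRAB client code) could look like this:

```python
def proxiedWebdirOrNone(restResult):
    """Hypothetical helper: interpret the value yielded by the webdirprx REST method."""
    # the REST call yields a single string; "None" means no webdir in the database
    value = restResult[0] if restResult else "None"
    if value == "None":
        return None  # caller should fall back to the non-proxied webdir
    return value
```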
2 changes: 2 additions & 0 deletions src/python/CRABUtils/TaskUtils.py
@@ -57,6 +57,8 @@ def updateTaskStatus(crabserver=None, taskName=None, status=None, logger=None):
command = 'SUBMIT'
elif status == 'NEW':
command = 'SUBMIT'
elif status == 'UPLOADED':
command = 'SUBMIT'
elif status == 'SUBMITREFUSED':
command = 'SUBMIT'
elif status == 'KILLED':
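`updateTaskStatus` translates the requested status into the command stored alongside it; the new `UPLOADED` state reuses the `SUBMIT` command, just like `NEW` and `SUBMITREFUSED`. A compact restatement of the mapping visible in this hunk (statuses outside the hunk, such as `KILLED`, map to commands not shown here):

```python
def commandForStatus(status):
    """Sketch of the elif chain above, limited to the statuses visible in the hunk."""
    statusToCommand = {
        'NEW': 'SUBMIT',
        'UPLOADED': 'SUBMIT',      # added by this commit for the Uploader action
        'SUBMITREFUSED': 'SUBMIT',
    }
    return statusToCommand.get(status)
```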
19 changes: 12 additions & 7 deletions src/python/TaskWorker/Actions/DagmanCreator.py
@@ -21,10 +21,11 @@
from ast import literal_eval

from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME
from ServerUtilities import getLock, checkS3Object, uploadToS3
from ServerUtilities import getLock, checkS3Object

import TaskWorker.DataObjects.Result
from TaskWorker.Actions.TaskAction import TaskAction
from TaskWorker.Actions.Splitter import SplittingSummary
from TaskWorker.WorkerExceptions import TaskWorkerException, SubmissionRefusedException
from RucioUtils import getWritePFN
from CMSGroupMapper import get_egroup_users
@@ -728,14 +729,9 @@ def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags):
with tarfile.open('InputFiles.tar.gz', mode='w:gz') as tf:
for ifname in inputFiles + subdags + ['input_args.json']:
tf.add(ifname)
# also upload InputFiles.tar.gz to s3
task = kw['task']['tm_taskname']
uploadToS3(crabserver=self.crabserver, filepath='InputFiles.tar.gz',
objecttype='runtimefiles', taskname=task,
logger=self.logger)

def createSubdag(self, splitterResult, **kwargs):
""" main DAG is also dealt with here """
""" beware the "Sub" in the name ! This is used also for Main DAG """

startjobid = kwargs.get('startjobid', 0)
parent = kwargs.get('parent', None)
@@ -1198,6 +1194,15 @@ def executeInternal(self, *args, **kw):

info, splitterResult, subdags, dagSpecs = self.createSubdag(*args, **kw)

# as splitter summary is useful for dryrun, let's add it to the InputFiles tarball
jobGroups = splitterResult[0] # the first returned value of Splitter action is the splitterFactory output
splittingSummary = SplittingSummary(kw['task']['tm_split_algo'])
for jobgroup in jobGroups:
jobs = jobgroup.getJobs()
splittingSummary.addJobs(jobs)
splittingSummary.dump('splitting-summary.json')
inputFiles.append('splitting-summary.json')

self.prepareLocal(dagSpecs, info, kw, inputFiles, subdags)

return info, params, ["InputFiles.tar.gz"], splitterResult
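Since the splitting summary now travels inside `InputFiles.tar.gz` rather than being uploaded separately by the old DryRunUploader, a dry-run consumer can read it straight from the tarball. A minimal sketch, assuming the tarball has already been downloaded into the current directory (the file names match the diff above; everything else is illustrative):

```python
import json
import tarfile

# read the splitting summary that DagmanCreator.prepareLocal() packed into the tarball
with tarfile.open('InputFiles.tar.gz', mode='r:gz') as tf:
    summary = json.load(tf.extractfile('splitting-summary.json'))

print(f"{summary['total_jobs']} jobs ({summary['algo']}): "
      f"{summary['total_lumis']} lumis, {summary['total_events']} events")
```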
140 changes: 0 additions & 140 deletions src/python/TaskWorker/Actions/DryRunUploader.py

This file was deleted.

6 changes: 4 additions & 2 deletions src/python/TaskWorker/Actions/Handler.py
@@ -16,10 +16,10 @@
from TaskWorker.Actions.MyProxyLogon import MyProxyLogon
from TaskWorker.Actions.DagmanCreator import DagmanCreator
from TaskWorker.Actions.StageoutCheck import StageoutCheck
from TaskWorker.Actions.DryRunUploader import DryRunUploader
from TaskWorker.Actions.MakeFakeFileSet import MakeFakeFileSet
from TaskWorker.Actions.DagmanSubmitter import DagmanSubmitter
from TaskWorker.Actions.DBSDataDiscovery import DBSDataDiscovery
from TaskWorker.Actions.Uploader import Uploader
from TaskWorker.Actions.UserDataDiscovery import UserDataDiscovery
from TaskWorker.Actions.RucioDataDiscovery import RucioDataDiscovery
from TaskWorker.Actions.DagmanResubmitter import DagmanResubmitter
@@ -200,8 +200,10 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs):
handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum))
handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum))
handler.addWork(DagmanCreator(config=config, crabserver=crabserver, procnum=procnum, rucioClient=rucioClient))
handler.addWork(Uploader(config=config, crabserver=crabserver, procnum=procnum))
if task['tm_dry_run'] == 'T':
handler.addWork(DryRunUploader(config=config, crabserver=crabserver, procnum=procnum))
# stop here and wait for user to be satisfied with what's been uploaded
pass
else:
handler.addWork(DagmanSubmitter(config=config, crabserver=crabserver, procnum=procnum))

63 changes: 63 additions & 0 deletions src/python/TaskWorker/Actions/Splitter.py
@@ -2,8 +2,12 @@
Split a task request into a set of jobs
"""
# pylint: disable=too-many-branches

import json

from WMCore.DataStructs.Workflow import Workflow
from WMCore.DataStructs.Subscription import Subscription
from WMCore.DataStructs.LumiList import LumiList
from WMCore.JobSplitting.SplitterFactory import SplitterFactory

from TaskWorker.DataObjects.Result import Result
@@ -129,6 +133,65 @@ def diagnoseRunMatch(runs=None, data=None):
return msg


class SplittingSummary():
"""
Class which calculates some summary data about the splitting results.
"""

def __init__(self, algo):
self.algo = algo
self.lumisPerJob = []
self.eventsPerJob = []
self.filesPerJob = []

def addJobs(self, jobs):
"""
populate the summary, argument is a job created by the splitter factory
i.e. it is meant to be used like this:
for jobgroup in factory:
jobs = jobgroup.getJobs()
splittingSummary.addJobs(jobs)
where factory is the first return argument of the execute method in the above Splitter class
"""
if self.algo == 'FileBased':
self.lumisPerJob += [sum([x.get('lumiCount', 0) for x in job['input_files']]) for job in jobs]
self.eventsPerJob += [sum([x['events'] for x in job['input_files']]) for job in jobs]
self.filesPerJob += [len(job['input_files']) for job in jobs]
elif self.algo == 'EventBased':
self.lumisPerJob += [job['mask']['LastLumi'] - job['mask']['FirstLumi'] for job in jobs]
self.eventsPerJob += [job['mask']['LastEvent'] - job['mask']['FirstEvent'] for job in jobs]
else:
for job in jobs:
avgEventsPerLumi = sum([f['avgEvtsPerLumi'] for f in job['input_files']])/float(len(job['input_files']))
lumis = LumiList(compactList=job['mask']['runAndLumis'])
self.lumisPerJob.append(len(lumis.getLumis()))
self.eventsPerJob.append(avgEventsPerLumi * self.lumisPerJob[-1])

def dump(self, outname):
"""
Save splitting summary to a json file.
"""

summary = {'algo': self.algo,
'total_jobs': len(self.lumisPerJob),
'total_lumis': sum(self.lumisPerJob),
'total_events': sum(self.eventsPerJob),
'max_lumis': max(self.lumisPerJob),
'max_events': max(self.eventsPerJob),
'avg_lumis': sum(self.lumisPerJob)/float(len(self.lumisPerJob)),
'avg_events': sum(self.eventsPerJob)/float(len(self.eventsPerJob)),
'min_lumis': min(self.lumisPerJob),
'min_events': min(self.eventsPerJob)}
if len(self.filesPerJob) > 0:
summary.update({'total_files': sum(self.filesPerJob),
'max_files': max(self.filesPerJob),
'avg_files': sum(self.filesPerJob)/float(len(self.filesPerJob)),
'min_files': min(self.filesPerJob)})

with open(outname, 'w', encoding='utf-8') as f:
json.dump(summary, f)


if __name__ == '__main__':
splitparams = [{'halt_job_on_file_boundaries': False, 'algorithm': 'LumiBased', 'lumis_per_job': 2000, 'splitOnRun': False},
{'halt_job_on_file_boundaries': False, 'algorithm': 'LumiBased', 'lumis_per_job': 50, 'splitOnRun': False},
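A self-contained usage example for the new `SplittingSummary` class; the job dictionaries below are made up but mimic the keys that `addJobs` reads for `FileBased` splitting:

```python
from TaskWorker.Actions.Splitter import SplittingSummary

# fake FileBased jobs: each job lists its input files with lumi and event counts
fakeJobs = [
    {'input_files': [{'lumiCount': 10, 'events': 2000},
                     {'lumiCount': 5, 'events': 800}]},
    {'input_files': [{'lumiCount': 8, 'events': 1500}]},
]

summary = SplittingSummary('FileBased')
summary.addJobs(fakeJobs)
summary.dump('splitting-summary.json')  # same file name DagmanCreator adds to InputFiles.tar.gz
```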
37 changes: 37 additions & 0 deletions src/python/TaskWorker/Actions/Uploader.py
@@ -0,0 +1,37 @@
"""
uploads to S3 the tarball created by DagmanCreator
"""
import os

from TaskWorker.DataObjects.Result import Result
from TaskWorker.Actions.TaskAction import TaskAction
from TaskWorker.WorkerExceptions import TaskWorkerException
from ServerUtilities import uploadToS3
from CRABUtils.TaskUtils import updateTaskStatus


class Uploader(TaskAction):
"""
Upload an archive containing all files needed to run the task to the cache (necessary for crab submit --dryrun).
"""
def executeInternal(self, *args, **kw):
""" pload InputFiles.tar.gz to s3 """
task = kw['task']['tm_taskname']
uploadToS3(crabserver=self.crabserver, filepath='InputFiles.tar.gz',
objecttype='runtimefiles', taskname=task,
logger=self.logger)
# report that all tarballs are now in S3
updateTaskStatus(crabserver=self.crabserver, taskName=task, status='UPLOADED', logger=self.logger)
return Result(task=kw['task'], result=args[0])

def execute(self, *args, **kw):
""" entry point from Handler """
cwd = os.getcwd()
try:
os.chdir(kw['tempDir'])
return self.executeInternal(*args, **kw)
except Exception as e:
msg = f"Failed to run Uploader action for {kw['task']['tm_taskname']}; {e}"
raise TaskWorkerException(msg) from e
finally:
os.chdir(cwd)

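The Uploader stores the tarball under object type `runtimefiles`, keyed by task name; for `crab submit --dryrun` the client later has to fetch it back. The sketch below assumes `ServerUtilities` provides a `downloadFromS3` counterpart to `uploadToS3` with matching keyword arguments (not part of this diff), so treat it as illustrative:

```python
from ServerUtilities import downloadFromS3  # assumed counterpart of uploadToS3

def fetchRuntimeFiles(crabserver, taskname, logger, destination='InputFiles.tar.gz'):
    """Sketch: retrieve the tarball that Uploader pushed to S3 for this task."""
    downloadFromS3(crabserver=crabserver, filepath=destination,
                   objecttype='runtimefiles', taskname=taskname,
                   logger=logger)
    return destination
```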