Skip to content

Commit

Permalink
Merge pull request #10277 from mapellidario/6-10127-py2py3
Browse files Browse the repository at this point in the history
[py2py3] Migration at level scr/python/A/B/C - slice 6
  • Loading branch information
amaltaro authored Feb 16, 2021
2 parents 55086de + d0ea719 commit 382e7e0
Show file tree
Hide file tree
Showing 24 changed files with 83 additions and 57 deletions.
22 changes: 12 additions & 10 deletions src/python/WMComponent/DBS3Buffer/DBSBufferUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"""
from __future__ import print_function

from future.utils import viewitems

import threading
from collections import defaultdict

Expand Down Expand Up @@ -53,14 +55,14 @@ def loadDBSBufferFilesBulk(self, fileObjs):
dbsFiles.append(dbsfile)

for dbsfile in dbsFiles:
if 'runInfo' in dbsfile.keys():
if 'runInfo' in dbsfile:
# Then we have to replace it with a real run
for r in dbsfile['runInfo'].keys():
for r in dbsfile['runInfo']:
run = Run(runNumber=r)
run.extend(dbsfile['runInfo'][r])
dbsfile.addRun(run)
del dbsfile['runInfo']
if 'parentLFNs' in dbsfile.keys():
if 'parentLFNs' in dbsfile:
# Then we have some parents
for lfn in dbsfile['parentLFNs']:
newFile = DBSBufferFile(lfn=lfn)
Expand Down Expand Up @@ -139,14 +141,14 @@ def findUploadableFilesByDAS(self, datasetpath):
dbsFiles.append(dbsfile)

for dbsfile in dbsFiles:
if 'runInfo' in dbsfile.keys():
if 'runInfo' in dbsfile:
# Then we have to replace it with a real run
for r in dbsfile['runInfo'].keys():
for r in dbsfile['runInfo']:
run = Run(runNumber=r)
run.extendLumis(dbsfile['runInfo'][r])
dbsfile.addRun(run)
del dbsfile['runInfo']
if 'parentLFNs' in dbsfile.keys():
if 'parentLFNs' in dbsfile:
# Then we have some parents
for lfn in dbsfile['parentLFNs']:
newFile = DBSBufferFile(lfn=lfn)
Expand All @@ -173,14 +175,14 @@ def loadFilesByBlock(self, blockname):
dbsFiles.append(dbsfile)

for dbsfile in dbsFiles:
if 'runInfo' in dbsfile.keys():
if 'runInfo' in dbsfile:
# Then we have to replace it with a real run
for r in dbsfile['runInfo'].keys():
for r in dbsfile['runInfo']:
run = Run(runNumber=r)
run.extendLumis(dbsfile['runInfo'][r])
dbsfile.addRun(run)
del dbsfile['runInfo']
if 'parentLFNs' in dbsfile.keys():
if 'parentLFNs' in dbsfile:
# Then we have some parents
for lfn in dbsfile['parentLFNs']:
newFile = DBSBufferFile(lfn=lfn)
Expand Down Expand Up @@ -220,7 +222,7 @@ def summaryPhEDExDBSStatus(self, data):
returns dictionary with kew as workflow and containing dbs/phedex upload status
"""
summary = defaultdict(dict)
for workflow, value in data.iteritems():
for workflow, value in viewitems(data):
# only getting completed workflows
summary[workflow]["Completed"] = True

Expand Down
5 changes: 4 additions & 1 deletion src/python/WMCore/ACDC/CouchFileset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
Created by Dave Evans on 2010-03-19.
Copyright (c) 2010 Fermilab. All rights reserved.
"""

from future.utils import viewvalues

import time

import WMCore.Database.CMSCouch as CMSCouch
Expand Down Expand Up @@ -185,7 +188,7 @@ def listFiles(self):
raise RuntimeError(msg)

files = doc["files"]
for d in files.values():
for d in viewvalues(files):
yield d

@connectToCouch
Expand Down
1 change: 1 addition & 0 deletions src/python/WMCore/ACDC/CouchService.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Created by Dave Evans on 2010-04-20.
Copyright (c) 2010 Fermilab. All rights reserved.
"""
from builtins import object
from time import time

import WMCore.Database.CouchUtils as CouchUtils
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/Database/MySQLCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def substitute(self, origSQL, origBindsList):
# variables: RELEASE_VERSION and RELEASE_VERSION_ID the former will
# match against the latter, causing problems. We'll sort the variable
# names by length to guard against this.
bindVarNames = origBind.keys()
bindVarNames = list(origBind)
bindVarNames.sort(stringLengthCompare)

bindPositions = {}
Expand Down
14 changes: 8 additions & 6 deletions src/python/WMCore/JobSplitting/JobFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""

from builtins import range

import logging
import threading

Expand Down Expand Up @@ -88,13 +90,13 @@ def __call__(self, jobtype="Job", grouptype="JobGroup", *args, **kwargs):
module = __import__(module, globals(), locals(), [grouptype])
self.groupInstance = getattr(module, grouptype.split('.')[-1])

list(map(lambda x: x.start(), self.generators))
list([x.start() for x in self.generators])

self.limit = int(kwargs.get("file_load_limit", self.limit))
self.algorithm(*args, **kwargs)
self.commit()

list(map(lambda x: x.finish(), self.generators))
list([x.finish() for x in self.generators])
return self.jobGroups

def algorithm(self, *args, **kwargs):
Expand All @@ -115,7 +117,7 @@ def newGroup(self):
"""
self.appendJobGroup()
self.currentGroup = self.groupInstance(subscription=self.subscription)
list(map(lambda x: x.startGroup(self.currentGroup), self.generators))
list([x.startGroup(self.currentGroup) for x in self.generators])
return

def newJob(self, name=None, files=None, failedJob=False, failedReason=None):
Expand Down Expand Up @@ -157,7 +159,7 @@ def appendJobGroup(self):
"""

if self.currentGroup:
list(map(lambda x: x.finishGroup(self.currentGroup), self.generators))
list([x.finishGroup(self.currentGroup) for x in self.generators])
if self.currentGroup:
self.jobGroups.append(self.currentGroup)
self.currentGroup = None
Expand Down Expand Up @@ -328,7 +330,7 @@ def loadFiles(self, size=10):
if isinstance(resultProxy.keys, list):
keys = resultProxy.keys
else:
keys = resultProxy.keys()
keys = resultProxy.keys() # do not futurize this!
if isinstance(keys, set):
# If it's a set, handle it
keys = list(keys)
Expand Down Expand Up @@ -463,7 +465,7 @@ def getFilesSortedByLocation(self, eventsPerJob):

if self.checkForAmountOfWork():
# first, check whether we have enough files to reach the desired events_per_job
for sites in lDict.keys():
for sites in list(lDict): # lDict changes size during for loop!
availableEventsPerLocation = sum([f['events'] for f in lDict[sites]])
if eventsPerJob > availableEventsPerLocation:
# then we don't split these files for the moment
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/JobSplitting/MinFileBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def algorithm(self, *args, **kwargs):
#Get a dictionary of sites, files
locationDict = self.sortByLocation()

for location in locationDict.keys():
for location in locationDict:
#Now we have all the files in a certain location
fileList = locationDict[location]
filesInJob = 0
Expand Down
6 changes: 4 additions & 2 deletions src/python/WMCore/JobSplitting/ParentlessMergeBySize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
WMBS merging that ignores file parents.
"""
from __future__ import division

import time
import threading

Expand Down Expand Up @@ -203,11 +205,11 @@ def algorithm(self, *args, **kwargs):

groupedFiles = self.defineFileGroups(mergeableFiles)

for pnn in groupedFiles.keys():
for pnn in groupedFiles:
if self.mergeAcrossRuns:
self.defineMergeJobs(groupedFiles[pnn])
else:
for runNumber in groupedFiles[pnn].keys():
for runNumber in groupedFiles[pnn]:
self.defineMergeJobs(groupedFiles[pnn][runNumber])

return
7 changes: 4 additions & 3 deletions src/python/WMCore/JobSplitting/RunBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@



from builtins import range
from WMCore.JobSplitting.JobFactory import JobFactory
from WMCore.DataStructs.Fileset import Fileset
from WMCore.Services.UUIDLib import makeUUID
Expand Down Expand Up @@ -48,7 +49,7 @@ def algorithm(self, *args, **kwargs):

locationDict = self.sortByLocation()

for location in locationDict.keys():
for location in locationDict:
fileList = locationDict[location]
for f in fileList:

Expand All @@ -69,13 +70,13 @@ def algorithm(self, *args, **kwargs):
run = min(runList)

#If we don't have the run, we need to add it
if not run in runDict.keys():
if run not in runDict:
runDict[run] = []

runDict[run].append(f)


for run in runDict.keys():
for run in runDict:
#Find the runs in the dictionary we assembled and split the files in them

self.newGroup()
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/JobSplitting/SiblingProcessingBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def algorithm(self, *args, **kwargs):

fileSites[completeFile["pnn"]].append(completeFile)

for siteName in fileSites.keys():
for siteName in fileSites:
if len(fileSites[siteName]) < filesPerJob and not filesetClosed:
continue

Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/JobSplitting/SizeBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def algorithm(self, *args, **kwargs):
sizePerJob = int(kwargs.get("size_per_job", 1000))
locationDict = self.sortByLocation()

for location in locationDict.keys():
for location in locationDict:
self.newGroup()
fileList = locationDict[location]
self.newJob(name = makeUUID())
Expand Down
4 changes: 2 additions & 2 deletions src/python/WMCore/JobSplitting/SplitFileBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def defineMergeUnits(self, mergeableFiles):

for mergeableFile in mergeableFiles:
newMergeFile = {}
for key in mergeableFile.keys():
for key in mergeableFile:
newMergeFile[key] = mergeableFile[key]

if newMergeFile["file_run"] not in mergeUnits:
Expand Down Expand Up @@ -170,7 +170,7 @@ def algorithm(self, *args, **kwargs):
mergeableFiles = mergeDAO.execute(self.subscription["id"])

mergeUnits = self.defineMergeUnits(mergeableFiles)
for runNumber in mergeUnits.keys():
for runNumber in mergeUnits:
mergeUnits[runNumber].sort(mergeUnitCompare)
self.createProcJobs(mergeUnits[runNumber])

Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/JobSplitting/TwoFileBased.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def algorithm(self, *args, **kwargs):
#Get a dictionary of sites, files
locationDict = self.sortByLocation()

for location in locationDict.keys():
for location in locationDict:
#Now we have all the files in a certain location
fileList = locationDict[location]
filesInJob = 0
Expand Down
6 changes: 3 additions & 3 deletions src/python/WMCore/JobSplitting/WMBSMergeBySize.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def defineMergeUnits(self, mergeableFiles):
for mergeableFile in mergeableFiles:
newMergeFile = {}

for key in mergeableFile.keys():
for key in mergeableFile:
newMergeFile[key] = mergeableFile[key]

if newMergeFile["pnn"] not in mergeUnits:
Expand Down Expand Up @@ -243,8 +243,8 @@ def algorithm(self, *args, **kwargs):

mergeUnits = self.defineMergeUnits(mergeableFiles)

for pnn in mergeUnits.keys():
for runNumber in mergeUnits[pnn].keys():
for pnn in mergeUnits:
for runNumber in mergeUnits[pnn]:
self.defineMergeJobs(mergeUnits[pnn][runNumber])

return
7 changes: 5 additions & 2 deletions src/python/WMCore/Storage/DeleteMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
"""
from __future__ import print_function

from builtins import object
from future.utils import viewitems

import logging

from WMCore.Storage.Registry import retrieveStageOutImpl
Expand All @@ -34,7 +37,7 @@ def __init__(self, message, **data):
self.data.setdefault("ErrorType", self.__class__.__name__)


class DeleteMgr:
class DeleteMgr(object):
"""
_DeleteMgr_
Expand Down Expand Up @@ -150,7 +153,7 @@ def initialiseOverride(self):
overrideParams['option'] = ""

msg = "=======Delete Override Initialised:================\n"
for key, val in overrideParams.items():
for key, val in viewitems(overrideParams):
msg += " %s : %s\n" % (key, val)
msg += "=====================================================\n"
self.logger.info(msg)
Expand Down
11 changes: 6 additions & 5 deletions src/python/WMCore/WMBS/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from __future__ import print_function

from builtins import int, str, bytes

from WMCore.DataStructs.Job import Job as WMJob
from WMCore.DataStructs.Mask import Mask as WMMask
from WMCore.Services.UUIDLib import makeUUID
Expand Down Expand Up @@ -423,7 +425,7 @@ def setFWJRPath(self, fwjrPath=None):
"""

if not fwjrPath:
if 'fwjr' in self.keys():
if 'fwjr' in self:
fwjrPath = self['fwjr']
else:
return None
Expand All @@ -443,16 +445,15 @@ def getDataStructsJob(self):
job = WMJob(name=self['name'])

# Transfer all simple keys
for key in self.keys():
keyType = type(self.get(key))
if keyType in [str, long, int, float]:
for key in self:
if isinstance(self.get(key), (str, bytes, int, float)):
job[key] = self[key]

for fileObj in self['input_files']:
job['input_files'].append(fileObj.returnDataStructsFile())

job['mask'] = WMMask()
for key in self["mask"].keys():
for key in self["mask"]:
job["mask"][key] = self["mask"][key]

job.baggage = self.baggage
Expand Down
Loading

0 comments on commit 382e7e0

Please sign in to comment.