diff --git a/PhysicsTools/Heppy/python/analyzers/core/AutoHandle.py b/PhysicsTools/Heppy/python/analyzers/core/AutoHandle.py index adcfc0de1645b..86cc2b96583e0 100644 --- a/PhysicsTools/Heppy/python/analyzers/core/AutoHandle.py +++ b/PhysicsTools/Heppy/python/analyzers/core/AutoHandle.py @@ -7,22 +7,24 @@ class AutoHandle( Handle, object ): handles = {} - def __init__(self, label, type, mayFail=False, fallbackLabel=None): + def __init__(self, label, type, mayFail=False, fallbackLabel=None, lazy=True): '''Note: label can be a tuple : (module_label, collection_label, process)''' self.label = label self.fallbackLabel = fallbackLabel self.type = type self.mayFail = mayFail + self.lazy = lazy Handle.__init__(self, self.type) def product(self): - if not self.isLoaded : - self.ReallyLoad(self.event) - self.isLoaded=True - return super(AutoHandle,self).product() + if not self.isLoaded : + self.ReallyLoad(self.event) + self.isLoaded=True + return super(AutoHandle,self).product() def Load(self, event): #is actually a reset state - self.event=event - self.isLoaded=False + self.event=event + self.isLoaded=False + if self.lazy==False: self.ReallyLoad(self.event) def ReallyLoad(self, event): '''Load self from a given event. 
diff --git a/PhysicsTools/Heppy/python/analyzers/gen/GeneratorAnalyzer.py b/PhysicsTools/Heppy/python/analyzers/gen/GeneratorAnalyzer.py index e21f0a14019ab..b5c2fef55b77d 100644 --- a/PhysicsTools/Heppy/python/analyzers/gen/GeneratorAnalyzer.py +++ b/PhysicsTools/Heppy/python/analyzers/gen/GeneratorAnalyzer.py @@ -45,6 +45,9 @@ class GeneratorAnalyzer( Analyzer ): event.genwzquarks and event.genbquarks, might have overlaps event.genbquarksFromTop and event.genbquarksFromH are all contained in event.genbquarks + In addition to genParticles, if makeLHEweights is set to True, the list WeightsInfo objects of the LHE branch + is stored in event.LHE_weights + """ def __init__(self, cfg_ana, cfg_comp, looperName ): @@ -54,10 +57,13 @@ def __init__(self, cfg_ana, cfg_comp, looperName ): self.makeAllGenParticles = cfg_ana.makeAllGenParticles self.makeSplittedGenLists = cfg_ana.makeSplittedGenLists self.allGenTaus = cfg_ana.allGenTaus if self.makeSplittedGenLists else False + self.makeLHEweights = cfg_ana.makeLHEweights def declareHandles(self): super(GeneratorAnalyzer, self).declareHandles() self.mchandles['genParticles'] = AutoHandle( 'prunedGenParticles', 'std::vector' ) + if self.makeLHEweights: + self.mchandles['LHEweights'] = AutoHandle( 'source', 'LHEEventProduct', mayFail = True, lazy = False ) def beginLoop(self,setup): super(GeneratorAnalyzer,self).beginLoop(setup) @@ -239,6 +245,13 @@ def makeMCInfo(self, event): if id <= 5 and any([abs(m.pdgId()) in {23,24} for m in realGenMothers(p)]): event.genwzquarks.append(p) + #Add LHE weight info + event.LHE_weights = [] + if self.makeLHEweights: + if self.mchandles['LHEweights'].isValid(): + for w in self.mchandles['LHEweights'].product().weights(): + event.LHE_weights.append(w) + def process(self, event): self.readCollections( event.input ) @@ -263,6 +276,8 @@ def process(self, event): # Make also the splitted lists makeSplittedGenLists = True, allGenTaus = False, + # Save LHE weights in LHEEventProduct + makeLHEweights 
= True, # Print out debug information verbose = False, ) diff --git a/PhysicsTools/Heppy/python/analyzers/objects/JetAnalyzer.py b/PhysicsTools/Heppy/python/analyzers/objects/JetAnalyzer.py index 37deb54878555..007e7f6af44d7 100644 --- a/PhysicsTools/Heppy/python/analyzers/objects/JetAnalyzer.py +++ b/PhysicsTools/Heppy/python/analyzers/objects/JetAnalyzer.py @@ -70,7 +70,8 @@ def __init__(self, cfg_ana, cfg_comp, looperName): self.lepSelCut = getattr(self.cfg_ana, 'lepSelCut', lambda lep : True) self.jetGammaDR = getattr(self.cfg_ana, 'jetGammaDR', 0.4) if(self.cfg_ana.doQG): - self.qglcalc = QGLikelihoodCalculator("%s/src/PhysicsTools/Heppy/data/pdfQG_AK4chs_antib_13TeV_v1.root" % os.environ['CMSSW_BASE']) + qgdefname="{CMSSW_BASE}/src/PhysicsTools/Heppy/data/pdfQG_AK4chs_antib_13TeV_v1.root" + self.qglcalc = QGLikelihoodCalculator(getattr(self.cfg_ana,"QGpath",qgdefname).format(CMSSW_BASE= os.environ['CMSSW_BASE'])) if not hasattr(self.cfg_ana ,"collectionPostFix"):self.cfg_ana.collectionPostFix="" def declareHandles(self): diff --git a/PhysicsTools/Heppy/python/analyzers/objects/LeptonAnalyzer.py b/PhysicsTools/Heppy/python/analyzers/objects/LeptonAnalyzer.py index db47c7ab14e71..5e68e3a628fb1 100644 --- a/PhysicsTools/Heppy/python/analyzers/objects/LeptonAnalyzer.py +++ b/PhysicsTools/Heppy/python/analyzers/objects/LeptonAnalyzer.py @@ -335,6 +335,9 @@ def makeAllElectrons(self, event): ele.tightIdResult = ele.electronID("POG_MVA_ID_Trig_full5x5") elif self.cfg_ana.ele_tightId=="Cuts_2012" : ele.tightIdResult = -1 + 1*ele.electronID("POG_Cuts_ID_2012_Veto_full5x5") + 1*ele.electronID("POG_Cuts_ID_2012_Loose_full5x5") + 1*ele.electronID("POG_Cuts_ID_2012_Medium_full5x5") + 1*ele.electronID("POG_Cuts_ID_2012_Tight_full5x5") + elif self.cfg_ana.ele_tightId=="Cuts_PHYS14_25ns_v1_ConvVetoDxyDz" : + ele.tightIdResult = -1 + 1*ele.electronID("POG_Cuts_ID_PHYS14_25ns_v1_ConvVetoDxyDz_Veto_full5x5") + 
1*ele.electronID("POG_Cuts_ID_PHYS14_25ns_v1_ConvVetoDxyDz_Loose_full5x5") + 1*ele.electronID("POG_Cuts_ID_PHYS14_25ns_v1_ConvVetoDxyDz_Medium_full5x5") + 1*ele.electronID("POG_Cuts_ID_PHYS14_25ns_v1_ConvVetoDxyDz_Tight_full5x5") + else : try: ele.tightIdResult = ele.electronID(self.cfg_ana.ele_tightId) @@ -349,7 +352,10 @@ def attachMiniIsolation(self, mu): # -- version with increasing cone at low pT, gives slightly better performance for tight cuts and low pt leptons # mu.miniIsoR = 10.0/min(max(mu.pt(), 50),200) if mu.pt() > 20 else 4.0/min(max(mu.pt(),10),20) what = "mu" if (abs(mu.pdgId()) == 13) else ("eleB" if mu.isEB() else "eleE") - mu.miniAbsIsoCharged = self.IsolationComputer.chargedAbsIso(mu.physObj, mu.miniIsoR, {"mu":0.0001,"eleB":0,"eleE":0.015}[what], 0.0); + if what == "mu": + mu.miniAbsIsoCharged = self.IsolationComputer.chargedAbsIso(mu.physObj, mu.miniIsoR, {"mu":0.0001,"eleB":0,"eleE":0.015}[what], 0.0); + else: + mu.miniAbsIsoCharged = self.IsolationComputer.chargedAbsIso(mu.physObj, mu.miniIsoR, {"mu":0.0001,"eleB":0,"eleE":0.015}[what], 0.0,self.IsolationComputer.selfVetoNone); if self.miniIsolationPUCorr == "weights": if what == "mu": mu.miniAbsIsoNeutral = self.IsolationComputer.neutralAbsIsoWeighted(mu.physObj, mu.miniIsoR, 0.01, 0.5); @@ -373,7 +379,7 @@ def attachMiniIsolation(self, mu): if what == "mu": mu.miniAbsIsoPU = self.IsolationComputer.puAbsIso(mu.physObj, mu.miniIsoR, 0.01, 0.5); else: - mu.miniAbsIsoPU = self.IsolationComputer.puAbsIso(mu.physObj, mu.miniIsoR, 0.015 if what == "eleE" else 0.0, 0.0); + mu.miniAbsIsoPU = self.IsolationComputer.puAbsIso(mu.physObj, mu.miniIsoR, 0.015 if what == "eleE" else 0.0, 0.0,self.IsolationComputer.selfVetoNone); mu.miniAbsIsoNeutral = max(0.0, mu.miniAbsIsoNeutral - 0.5*mu.miniAbsIsoPU) elif self.miniIsolationPUCorr != 'raw': raise RuntimeError, "Unsupported miniIsolationCorr name '" + str(self.cfg_ana.miniIsolationCorr) + "'! 
For now only 'rhoArea', 'deltaBeta', 'raw', 'weights' are supported (and 'weights' is not tested)." @@ -473,6 +479,7 @@ def process(self, event): loose_muon_dxy = 0.05, loose_muon_dz = 0.2, loose_muon_relIso = 0.4, + # loose_muon_isoCut = lambda muon :muon.miniRelIso < 0.2 # inclusive very loose electron selection inclusive_electron_id = "", inclusive_electron_pt = 5, @@ -487,6 +494,7 @@ def process(self, event): loose_electron_dxy = 0.05, loose_electron_dz = 0.2, loose_electron_relIso = 0.4, + # loose_electron_isoCut = lambda electron : electron.miniRelIso < 0.1 loose_electron_lostHits = 1.0, # muon isolation correction method (can be "rhoArea" or "deltaBeta") mu_isoCorr = "rhoArea" , diff --git a/PhysicsTools/Heppy/python/analyzers/objects/TauAnalyzer.py b/PhysicsTools/Heppy/python/analyzers/objects/TauAnalyzer.py index 67d60a2c44bfb..c4e11a9270f02 100644 --- a/PhysicsTools/Heppy/python/analyzers/objects/TauAnalyzer.py +++ b/PhysicsTools/Heppy/python/analyzers/objects/TauAnalyzer.py @@ -20,7 +20,6 @@ def declareHandles(self): super(TauAnalyzer, self).declareHandles() self.handles['taus'] = AutoHandle( ('slimmedTaus',''),'std::vector') - def beginLoop(self, setup): super(TauAnalyzer,self).beginLoop(setup) self.counters.addCounter('events') @@ -28,46 +27,46 @@ def beginLoop(self, setup): count.register('all events') count.register('has >=1 tau at preselection') count.register('has >=1 selected taus') - count.register('has >=1 loose taus') - count.register('has >=1 inclusive taus') + count.register('has >=1 other taus') #------------------ # MAKE LEPTON LISTS #------------------ def makeTaus(self, event): - event.selectedTaus = [] - event.looseTaus = [] event.inclusiveTaus = [] + event.selectedTaus = [] + event.otherTaus = [] #get all alltaus = map( Tau, self.handles['taus'].product() ) - foundTau = False + #make inclusive taus for tau in alltaus: tau.associatedVertex = event.goodVertices[0] if len(event.goodVertices)>0 else event.vertices[0] tau.lepVeto = False 
tau.idDecayMode = tau.tauID("decayModeFinding") tau.idDecayModeNewDMs = tau.tauID("decayModeFindingNewDMs") - if hasattr(self.cfg_ana, 'decayModeID') and self.cfg_ana.decayModeID and not tau.tauID(self.cfg_ana.decayModeID): + + if hasattr(self.cfg_ana, 'inclusive_decayModeID') and self.cfg_ana.inclusive_decayModeID and not tau.tauID(self.cfg_ana.inclusive_decayModeID): continue - if self.cfg_ana.vetoLeptons: + tau.inclusive_lepVeto = False + if self.cfg_ana.inclusive_vetoLeptons: for lep in event.selectedLeptons: - if deltaR(lep.eta(), lep.phi(), tau.eta(), tau.phi()) < self.cfg_ana.leptonVetoDR: - tau.lepVeto = True - if tau.lepVeto: continue - if self.cfg_ana.vetoLeptonsPOG: - if not tau.tauID(self.cfg_ana.tauAntiMuonID): - tau.lepVeto = True - if not tau.tauID(self.cfg_ana.tauAntiElectronID): - tau.lepVeto = True - if tau.lepVeto: continue - - if tau.pt() < self.cfg_ana.ptMin: continue - if abs(tau.eta()) > self.cfg_ana.etaMax: continue - if abs(tau.dxy()) > self.cfg_ana.dxyMax or abs(tau.dz()) > self.cfg_ana.dzMax: continue - - foundTau = True + if deltaR(lep.eta(), lep.phi(), tau.eta(), tau.phi()) < self.cfg_ana.inclusive_leptonVetoDR: + tau.inclusive_lepVeto = True + if tau.inclusive_lepVeto: continue + if self.cfg_ana.inclusive_vetoLeptonsPOG: + if not tau.tauID(self.cfg_ana.inclusive_tauAntiMuonID): + tau.inclusive_lepVeto = True + if not tau.tauID(self.cfg_ana.inclusive_tauAntiElectronID): + tau.inclusive_lepVeto = True + if tau.inclusive_lepVeto: continue + + if tau.pt() < self.cfg_ana.inclusive_ptMin: continue + if abs(tau.eta()) > self.cfg_ana.inclusive_etaMax: continue + if abs(tau.dxy()) > self.cfg_ana.inclusive_dxyMax or abs(tau.dz()) > self.cfg_ana.inclusive_dzMax: continue + def id3(tau,X): """Create an integer equal to 1-2-3 for (loose,medium,tight)""" return tau.tauID(X%"Loose") + tau.tauID(X%"Medium") + tau.tauID(X%"Tight") @@ -86,28 +85,44 @@ def id6(tau,X): tau.idAntiMu = tau.tauID("againstMuonLoose") + tau.tauID("againstMuonTight") 
tau.idAntiE = id5(tau, "againstElectron%sMVA5") #print "Tau pt %5.1f: idMVA2 %d, idCI3hit %d, %s, %s" % (tau.pt(), tau.idMVA2, tau.idCI3hit, tau.tauID(self.cfg_ana.tauID), tau.tauID(self.cfg_ana.tauLooseID)) - if tau.tauID(self.cfg_ana.tauID): - event.selectedTaus.append(tau) - event.inclusiveTaus.append(tau) - elif tau.tauID(self.cfg_ana.tauLooseID): - event.looseTaus.append(tau) + + if tau.tauID(self.cfg_ana.inclusive_tauID): event.inclusiveTaus.append(tau) + + for tau in event.inclusiveTaus: + tau.loose_lepVeto = False + if self.cfg_ana.loose_vetoLeptons: + for lep in event.selectedLeptons: + if deltaR(lep.eta(), lep.phi(), tau.eta(), tau.phi()) < self.cfg_ana.loose_leptonVetoDR: + tau.loose_lepVeto = True + if self.cfg_ana.loose_vetoLeptonsPOG: + if not tau.tauID(self.cfg_ana.loose_tauAntiMuonID): + tau.loose_lepVeto = True + if not tau.tauID(self.cfg_ana.loose_tauAntiElectronID): + tau.loose_lepVeto = True + + if tau.tauID(self.cfg_ana.loose_decayModeID) and \ + tau.pt() > self.cfg_ana.loose_ptMin and abs(tau.eta()) < self.cfg_ana.loose_etaMax and \ + abs(tau.dxy()) < self.cfg_ana.loose_dxyMax and abs(tau.dz()) < self.cfg_ana.loose_dzMax and \ + tau.tauID(self.cfg_ana.loose_tauID) and not tau.loose_lepVeto: + event.selectedTaus.append(tau) + else: + event.otherTaus.append(tau) + + event.inclusiveTaus.sort(key = lambda l : l.pt(), reverse = True) event.selectedTaus.sort(key = lambda l : l.pt(), reverse = True) - event.looseTaus.sort(key = lambda l : l.pt(), reverse = True) + event.otherTaus.sort(key = lambda l : l.pt(), reverse = True) self.counters.counter('events').inc('all events') - if foundTau: self.counters.counter('events').inc('has >=1 tau at preselection') + if len(event.inclusiveTaus): self.counters.counter('events').inc('has >=1 tau at preselection') if len(event.selectedTaus): self.counters.counter('events').inc('has >=1 selected taus') - if len(event.looseTaus): self.counters.counter('events').inc('has >=1 loose taus') - if 
len(event.inclusiveTaus): self.counters.counter('events').inc('has >=1 inclusive taus') - + if len(event.otherTaus): self.counters.counter('events').inc('has >=1 other taus') def matchTaus(self, event): match = matchObjectCollection3(event.inclusiveTaus, event.gentaus, deltaRMax = 0.5) for lep in event.inclusiveTaus: gen = match[lep] lep.mcMatchId = 1 if gen else 0 - lep.mcTau = gen def process(self, event): self.readCollections( event.input ) @@ -126,18 +141,31 @@ def process(self, event): # http://cmslxr.fnal.gov/lxr/source/PhysicsTools/PatAlgos/python/producersLayer1/tauProducer_cfi.py setattr(TauAnalyzer,"defaultConfig",cfg.Analyzer( - class_object=TauAnalyzer, - ptMin = 20, - etaMax = 9999, - dxyMax = 1000., - dzMax = 0.2, - vetoLeptons = True, - leptonVetoDR = 0.4, - decayModeID = "decayModeFindingNewDMs", # ignored if not set or "" - tauID = "byLooseCombinedIsolationDeltaBetaCorr3Hits", - vetoLeptonsPOG = False, # If True, the following two IDs are required - tauAntiMuonID = "againstMuonLoose3", - tauAntiElectronID = "againstElectronLooseMVA5", - tauLooseID = "decayModeFinding", + class_object = TauAnalyzer, + # inclusive very loose hadronic tau selection + inclusive_ptMin = 18, + inclusive_etaMax = 9999, + inclusive_dxyMax = 1000., + inclusive_dzMax = 0.4, + inclusive_vetoLeptons = False, + inclusive_leptonVetoDR = 0.4, + inclusive_decayModeID = "decayModeFindingNewDMs", # ignored if not set or "" + inclusive_tauID = "decayModeFindingNewDMs", + inclusive_vetoLeptonsPOG = False, # If True, the following two IDs are required + inclusive_tauAntiMuonID = "", + inclusive_tauAntiElectronID = "", + # loose hadronic tau selection + loose_ptMin = 18, + loose_etaMax = 9999, + loose_dxyMax = 1000., + loose_dzMax = 0.2, + loose_vetoLeptons = True, + loose_leptonVetoDR = 0.4, + loose_decayModeID = "decayModeFindingNewDMs", # ignored if not set or "" + loose_tauID = "byLooseCombinedIsolationDeltaBetaCorr3Hits", + loose_vetoLeptonsPOG = False, # If True, the following two 
IDs are required + loose_tauAntiMuonID = "againstMuonLoose3", + loose_tauAntiElectronID = "againstElectronLooseMVA5", + loose_tauLooseID = "decayModeFindingNewDMs" ) ) diff --git a/PhysicsTools/Heppy/python/analyzers/objects/autophobj.py b/PhysicsTools/Heppy/python/analyzers/objects/autophobj.py index d9103da0662cd..aecbf4f770a90 100644 --- a/PhysicsTools/Heppy/python/analyzers/objects/autophobj.py +++ b/PhysicsTools/Heppy/python/analyzers/objects/autophobj.py @@ -29,6 +29,11 @@ NTupleVariable("pdgId", lambda x : x.pdgId(), int), ]) +weightsInfoType = NTupleObjectType("WeightsInfo", variables = [ + NTupleVariable("id", lambda x : x.id, int), + NTupleVariable("wgt", lambda x : x.wgt), +]) + ##------------------------------------------ ## LEPTON ##------------------------------------------ @@ -53,13 +58,15 @@ # Isolations with the two radia NTupleVariable("relIso03", lambda x : x.relIso03, help="PF Rel Iso, R=0.3, pile-up corrected"), NTupleVariable("relIso04", lambda x : x.relIso04, help="PF Rel Iso, R=0.4, pile-up corrected"), + NTupleVariable("miniRelIso", lambda x : x.miniRelIso if hasattr(x,'miniRelIso') else -999, help="PF Rel miniRel, pile-up corrected"), # Charge flip rejection criteria NTupleVariable("tightCharge", lambda lepton : ( lepton.isGsfCtfScPixChargeConsistent() + lepton.isGsfScPixChargeConsistent() ) if abs(lepton.pdgId()) == 11 else 2*(lepton.innerTrack().ptError()/lepton.innerTrack().pt() < 0.2), int, help="Tight charge criteria: for electrons, 2 if isGsfCtfScPixChargeConsistent, 1 if only isGsfScPixChargeConsistent, 0 otherwise; for muons, 2 if ptError/pt < 0.20, 0 otherwise "), # MC-match info - NTupleVariable("mcMatchId", lambda x : x.mcMatchId, int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), + NTupleVariable("mcMatchId", lambda x : getattr(x, 'mcMatchId', -99), int, mcOnly=True, help="Match to source from hard scatter (pdgId of 
heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), NTupleVariable("mcMatchAny", lambda x : x.mcMatchAny, int, mcOnly=True, help="Match to any final state leptons: 0 if unmatched, 1 if light flavour (including prompt), 4 if charm, 5 if bottom"), NTupleVariable("mcMatchTau", lambda x : x.mcMatchTau, int, mcOnly=True, help="True if the leptons comes from a tau"), NTupleVariable("mcPt", lambda x : x.mcLep.pt() if getattr(x,"mcLep",None) else 0., mcOnly=True, help="p_{T} of associated gen lepton"), + NTupleVariable("mediumMuonId", lambda x : x.muonID("POG_ID_Medium") if abs(x.pdgId())==13 else 1, int, help="Muon POG Medium id"), ]) ### EXTENDED VERSION WITH INDIVIUAL DISCRIMINATING VARIABLES @@ -70,7 +77,6 @@ # Extra muon ID working points NTupleVariable("softMuonId", lambda x : x.muonID("POG_ID_Soft") if abs(x.pdgId())==13 else 1, int, help="Muon POG Soft id"), NTupleVariable("pfMuonId", lambda x : x.muonID("POG_ID_Loose") if abs(x.pdgId())==13 else 1, int, help="Muon POG Loose id"), - NTupleVariable("mediumMuonId", lambda x : x.muonID("POG_ID_Medium") if abs(x.pdgId())==13 else 1, int, help="Muon POG Medium id"), # Extra electron ID working points NTupleVariable("eleCutId2012_full5x5", lambda x : (1*x.electronID("POG_Cuts_ID_2012_full5x5_Veto") + 1*x.electronID("POG_Cuts_ID_2012_full5x5_Loose") + 1*x.electronID("POG_Cuts_ID_2012_full5x5_Medium") + 1*x.electronID("POG_Cuts_ID_2012_full5x5_Tight")) if abs(x.pdgId()) == 11 else -1, int, help="Electron cut-based id (POG 2012, full5x5 shapes): 0=none, 1=veto, 2=loose, 3=medium, 4=tight"), # Extra tracker-related variables @@ -117,7 +123,7 @@ NTupleVariable("idAntiE", lambda x : x.idAntiE, int, help="1,2,3,4,5 if the tau passes the v loose, loose, medium, tight, v tight WP of the againstElectronMVA5 discriminator"), NTupleVariable("isoCI3hit", lambda x : x.tauID("byCombinedIsolationDeltaBetaCorrRaw3Hits"), help="byCombinedIsolationDeltaBetaCorrRaw3Hits raw output discriminator"), # 
MC-match info - NTupleVariable("mcMatchId", lambda x : x.mcMatchId, int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), + NTupleVariable("mcMatchId", lambda x : getattr(x, 'mcMatchId', -99), int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), ]) ##------------------------------------------ @@ -128,7 +134,7 @@ NTupleVariable("charge", lambda x : x.charge(), int), NTupleVariable("dz", lambda x : x.dz() , help="d_{z} of lead track with respect to PV, in cm (with sign)"), NTupleVariable("absIso", lambda x : x.absIso, float, mcOnly=False, help="abs charged iso with condition for isolation such that Min(0.2*pt, 8 GeV)"), - NTupleVariable("mcMatchId", lambda x : x.mcMatchId, int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), + NTupleVariable("mcMatchId", lambda x : getattr(x, 'mcMatchId', -99), int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), ]) @@ -148,7 +154,7 @@ NTupleVariable("chHadIso04", lambda x : x.chargedHadronIso(), float, help="chargedHadronIsolation for photons (PAT method, deltaR = 0.4)"), NTupleVariable("neuHadIso", lambda x : x.recoNeutralHadronIso(), float, help="neutralHadronIsolation for photons"), NTupleVariable("phIso", lambda x : x.recoPhotonIso(), float, help="gammaIsolation for photons"), - NTupleVariable("mcMatchId", lambda x : x.mcMatchId, int, mcOnly=True, help="Match to source from hard scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), + NTupleVariable("mcMatchId", lambda x : getattr(x, 'mcMatchId', -99), int, mcOnly=True, help="Match to source from hard 
scatter (pdgId of heaviest particle in chain, 25 for H, 6 for t, 23/24 for W/Z), zero if non-prompt or fake"), NTupleVariable("mcPt", lambda x : x.mcGamma.pt() if getattr(x,"mcGamma",None) else 0., mcOnly=True, help="p_{T} of associated gen photon"), ]) @@ -159,8 +165,8 @@ jetType = NTupleObjectType("jet", baseObjectTypes = [ fourVectorType ], variables = [ NTupleVariable("id", lambda x : x.jetID("POG_PFID") , int, mcOnly=False,help="POG Loose jet ID"), NTupleVariable("puId", lambda x : getattr(x, 'puJetIdPassed', -99), int, mcOnly=False, help="puId (full MVA, loose WP, 5.3.X training on AK5PFchs: the only thing that is available now)"), - NTupleVariable("btagCSV", lambda x : x.btag('combinedInclusiveSecondaryVertexV2BJetTags'), help="CSV-IVF v2 discriminator"), - NTupleVariable("btagCMVA", lambda x : x.btag('combinedMVABJetTags'), help="CMVA discriminator"), + NTupleVariable("btagCSV", lambda x : x.btag('pfCombinedInclusiveSecondaryVertexV2BJetTags'), help="CSV-IVF v2 discriminator"), + NTupleVariable("btagCMVA", lambda x : x.btag('pfCombinedMVABJetTags'), help="CMVA discriminator"), NTupleVariable("rawPt", lambda x : x.pt() * x.rawFactor(), help="p_{T} before JEC"), NTupleVariable("mcPt", lambda x : x.mcJet.pt() if getattr(x,"mcJet",None) else 0., mcOnly=True, help="p_{T} of associated gen jet"), NTupleVariable("mcFlavour", lambda x : x.partonFlavour(), int, mcOnly=True, help="parton flavour (physics definition, i.e. 
including b's from shower)"), diff --git a/PhysicsTools/Heppy/python/physicsobjects/Electron.py b/PhysicsTools/Heppy/python/physicsobjects/Electron.py index f2ba87b2e76b9..7fe31b30c96a3 100644 --- a/PhysicsTools/Heppy/python/physicsobjects/Electron.py +++ b/PhysicsTools/Heppy/python/physicsobjects/Electron.py @@ -28,6 +28,7 @@ def electronID( self, id, vertex=None, rho=None ): elif id == "POG_MVA_ID_Trig": return self.mvaIDTight() elif id == "POG_MVA_ID_NonTrig_full5x5": return self.mvaIDLoose(full5x5=True) elif id == "POG_MVA_ID_Trig_full5x5": return self.mvaIDTight(full5x5=True) + elif id == "POG_MVA_ID_Run2_NonTrig_VLoose": return self.mvaIDRun2("NonTrigPhys14","VLoose") elif id == "POG_MVA_ID_Run2_NonTrig_Loose": return self.mvaIDRun2("NonTrigPhys14","Loose") elif id == "POG_MVA_ID_Run2_NonTrig_Tight": return self.mvaIDRun2("NonTrigPhys14","Tight") elif id.startswith("POG_Cuts_ID_"): @@ -42,7 +43,7 @@ def cutBasedId(self, wp, showerShapes="auto"): showerShapes = "full5x5" wp = wp.replace("_full5x5","") elif showerShapes == "auto": - if "POG_CSA14_25ns_v1" in wp or "POG_CSA14_50ns_v1" in wp or "POG_PHYS14_25ns_v1" in wp: + if "POG_CSA14_25ns_v1" in wp or "POG_CSA14_50ns_v1" in wp or "POG_PHYS14_25ns_v1" in wp or "POG_PHYS14_25ns_v1_ConvVeto" in wp or "POG_PHYS14_25ns_v1_ConvVetoDxyDz" in wp: showerShapes = "full5x5" vars = { 'dEtaIn' : abs(self.physObj.deltaEtaSuperClusterTrackAtVtx()), @@ -53,6 +54,8 @@ def cutBasedId(self, wp, showerShapes="auto"): '1/E-1/p' : abs(1.0/self.physObj.ecalEnergy() - self.physObj.eSuperClusterOverP()/self.physObj.ecalEnergy()) if self.physObj.ecalEnergy()>0. 
else 9e9, 'conversionVeto' : self.physObj.passConversionVeto(), 'missingHits' : self.physObj.gsfTrack().hitPattern().numberOfHits(ROOT.reco.HitPattern.MISSING_INNER_HITS), # http://cmslxr.fnal.gov/source/DataFormats/TrackReco/interface/HitPattern.h?v=CMSSW_7_2_3#0153 + 'dxy' : abs(self.dxy()), + 'dz' : abs(self.dz()), } WP = { ## ------- https://twiki.cern.ch/twiki/bin/viewauth/CMS/EgammaCutBasedIdentification?rev=31 @@ -97,11 +100,24 @@ def cutBasedId(self, wp, showerShapes="auto"): WP.update(WP_conversion_veto) + WP_conversion_veto_DxyDz = { + # missing Hits incremented by 1 because we return False if >=, note the '=' + ## ------- https://twiki.cern.ch/twiki/bin/viewauth/CMS/CutBasedElectronIdentificationRun2#Working_points_for_PHYS14_sample + 'POG_PHYS14_25ns_v1_ConvVetoDxyDz_Veto' : WP['POG_PHYS14_25ns_v1_ConvVeto_Veto' ]+[('dxy',[0.060279, 0.273097]), ('dz',[0.800538, 0.885860])], + 'POG_PHYS14_25ns_v1_ConvVetoDxyDz_Loose' : WP['POG_PHYS14_25ns_v1_ConvVeto_Loose' ]+[('dxy',[0.022664, 0.097358]), ('dz',[0.173670, 0.198444])], + 'POG_PHYS14_25ns_v1_ConvVetoDxyDz_Medium' : WP['POG_PHYS14_25ns_v1_ConvVeto_Medium']+[('dxy',[0.011811, 0.051682]), ('dz',[0.070775, 0.180720])], + 'POG_PHYS14_25ns_v1_ConvVetoDxyDz_Tight' : WP['POG_PHYS14_25ns_v1_ConvVeto_Tight' ]+[('dxy',[0.009924, 0.027261]), ('dz',[0.015310, 0.147154])], + } + + WP.update(WP_conversion_veto_DxyDz) + + if wp not in WP: raise RuntimeError, "Working point '%s' not yet implemented in Electron.py" % wp for (cut_name,(cut_eb,cut_ee)) in WP[wp]: if cut_name == 'conversionVeto': - return vars[cut_name] == (cut_eb if self.physObj.isEB() else cut_ee) + if (cut_eb if self.physObj.isEB() else cut_ee) and not vars[cut_name]: + return False elif vars[cut_name] >= (cut_eb if self.physObj.isEB() else cut_ee): return False return True @@ -177,6 +193,10 @@ def mvaIDRun2(self, name, wp): if (eta < 0.8) : return self.mvaRun2(name) > +0.35; elif (eta < 1.479): return self.mvaRun2(name) > +0.20; else : return 
self.mvaRun2(name) > -0.52; + elif wp=="VLoose": + if (eta < 0.8) : return self.mvaRun2(name) > -0.11; + elif (eta < 1.479): return self.mvaRun2(name) > -0.35; + else : return self.mvaRun2(name) > -0.55; elif wp=="Tight": if (eta < 0.8) : return self.mvaRun2(name) > 0.73; elif (eta < 1.479): return self.mvaRun2(name) > 0.57; diff --git a/PhysicsTools/Heppy/python/physicsobjects/Jet.py b/PhysicsTools/Heppy/python/physicsobjects/Jet.py index 241f1339723f8..4fec0b6643b95 100644 --- a/PhysicsTools/Heppy/python/physicsobjects/Jet.py +++ b/PhysicsTools/Heppy/python/physicsobjects/Jet.py @@ -16,21 +16,21 @@ ] _btagWPs = { - "TCHEL": ("trackCountingHighEffBJetTags", 1.7), - "TCHEM": ("trackCountingHighEffBJetTags", 3.3), - "TCHPT": ("trackCountingHighPurBJetTags", 3.41), - "JPL": ("jetProbabilityBJetTags", 0.275), - "JPM": ("jetProbabilityBJetTags", 0.545), - "JPT": ("jetProbabilityBJetTags", 0.790), + "TCHEL": ("pfTrackCountingHighEffBJetTags", 1.7), + "TCHEM": ("pfTrackCountingHighEffBJetTags", 3.3), + "TCHPT": ("pfTrackCountingHighPurBJetTags", 3.41), + "JPL": ("pfJetProbabilityBJetTags", 0.275), + "JPM": ("pfJetProbabilityBJetTags", 0.545), + "JPT": ("pfJetProbabilityBJetTags", 0.790), "CSVL": ("combinedSecondaryVertexBJetTags", 0.244), "CSVM": ("combinedSecondaryVertexBJetTags", 0.679), "CSVT": ("combinedSecondaryVertexBJetTags", 0.898), - "CSVv2IVFL": ("combinedInclusiveSecondaryVertexV2BJetTags", 0.423), - "CSVv2IVFM": ("combinedInclusiveSecondaryVertexV2BJetTags", 0.814), - "CSVv2IVFT": ("combinedInclusiveSecondaryVertexV2BJetTags", 0.941), - "CMVAL": ("combinedMVABJetTags", 0.630), # for same b-jet efficiency of CSVv2IVFL on ttbar MC, jet pt > 30 - "CMVAM": ("combinedMVABJetTags", 0.732), # for same b-jet efficiency of CSVv2IVFM on ttbar MC, jet pt > 30 - "CMVAT": ("combinedMVABJetTags", 0.813), # for same b-jet efficiency of CSVv2IVFT on ttbar MC, jet pt > 30 + "CSVv2IVFL": ("pfCombinedInclusiveSecondaryVertexV2BJetTags", 0.423), + "CSVv2IVFM": 
("pfCombinedInclusiveSecondaryVertexV2BJetTags", 0.814), + "CSVv2IVFT": ("pfCombinedInclusiveSecondaryVertexV2BJetTags", 0.941), + "CMVAL": ("pfCombinedMVABJetTags", 0.630), # for same b-jet efficiency of CSVv2IVFL on ttbar MC, jet pt > 30 + "CMVAM": ("pfCombinedMVABJetTags", 0.732), # for same b-jet efficiency of CSVv2IVFM on ttbar MC, jet pt > 30 + "CMVAT": ("pfCombinedMVABJetTags", 0.813), # for same b-jet efficiency of CSVv2IVFT on ttbar MC, jet pt > 30 } @@ -97,12 +97,15 @@ def setRawFactor(self, factor): self._rawFactorMultiplier = factor/self.jecFactor('Uncorrected') def btag(self,name): - return self.bDiscriminator(name) + ret = self.bDiscriminator(name) + if ret == -1000 and name.startswith("pf"): + ret = self.bDiscriminator(name[2].lower()+name[3:]) + return ret def btagWP(self,name): global _btagWPs (disc,val) = _btagWPs[name] - return self.bDiscriminator(disc) > val + return self.btag(disc) > val def leadingTrack(self): if self._leadingTrackSearched : diff --git a/PhysicsTools/Heppy/python/utils/cmsswPreprocessor.py b/PhysicsTools/Heppy/python/utils/cmsswPreprocessor.py index c5165c93051bc..7028999665f69 100644 --- a/PhysicsTools/Heppy/python/utils/cmsswPreprocessor.py +++ b/PhysicsTools/Heppy/python/utils/cmsswPreprocessor.py @@ -42,5 +42,8 @@ def run(self,component,wd,firstEvent,nEvents): f.close() runstring="%s %s >& %s/cmsRun.log" % (self.command,configfile,wd) print "Running pre-processor: %s " %runstring - os.system(runstring) + ret=os.system(runstring) + if ret != 0: + print "CMSRUN failed" + exit(ret) return component diff --git a/PhysicsTools/Heppy/scripts/cmsBatch.py b/PhysicsTools/Heppy/scripts/cmsBatch.py new file mode 100755 index 0000000000000..108bc8d391f3d --- /dev/null +++ b/PhysicsTools/Heppy/scripts/cmsBatch.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python +# Colin +# batch mode for cmsRun, March 2009 + +import os, sys, imp, re, pprint, string, time,shutil,copy,pickle,math +from optparse import OptionParser + +# particle flow specific 
+from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager +import PhysicsTools.HeppyCore.utils.eostools as castortools + +# cms specific +import FWCore.ParameterSet.Config as cms +from IOMC.RandomEngine.RandomServiceHelper import RandomNumberServiceHelper + + + + +def batchScriptCCIN2P3(): + script = """!/usr/bin/env bash +#PBS -l platform=LINUX,u_sps_cmsf,M=2000MB,T=2000000 +# sets the queue +#PBS -q T +#PBS -eo +#PBS -me +#PBS -V + +source $HOME/.bash_profile + +echo '***********************' + +ulimit -v 3000000 + +# coming back to submission dir do setup the env +cd $PBS_O_WORKDIR +eval `scramv1 ru -sh` + + +# back to the worker +cd - + +# copy job dir here +cp -r $PBS_O_WORKDIR . + +# go inside +jobdir=`ls` +echo $jobdir + +cd $jobdir + +cat > sysinfo.sh < sysinfo.txt + +cmsRun run_cfg.py + +# copy job dir do disk +cd - +cp -r $jobdir $PBS_O_WORKDIR +""" + return script + + +def batchScriptCERN( remoteDir, index ): + '''prepare the LSF version of the batch script, to run on LSF''' + script = """#!/bin/bash +# sets the queue +#BSUB -q 8nm + +echo 'environment:' +echo +env +ulimit -v 3000000 +echo 'copying job dir to worker' +cd $CMSSW_BASE/src +eval `scramv1 ru -sh` +cd - +cp -rf $LS_SUBCWD . +ls +cd `find . -type d | grep /` +echo 'running' +%s run_cfg.py +if [ $? != 0 ]; then + echo wrong exit code! 
removing all root files + rm *.root + exit 1 +fi +echo 'sending the job directory back' +""" % prog + + if remoteDir != '': + remoteDir = castortools.eosToLFN(remoteDir) #remoteDir.replace('/eos/cms','') + script += """ +for file in *.root; do +newFileName=`echo $file | sed -r -e 's/\./_%s\./'` +fullFileName=%s/$newFileName +#this does cmsStage, but with retries +cmsStageWithFailover.py -f $file $fullFileName +#write the files as user readable but not writable +eos chmod 755 /eos/cms/$fullFileName +done +""" % (index, remoteDir) + script += 'rm *.root\n' + script += 'cp -rf * $LS_SUBCWD\n' + + return script + + +def batchScriptLocal( remoteDir, index ): + '''prepare a local version of the batch script, to run using nohup''' + + script = """#!/bin/bash +echo 'running' +%s run_cfg.py +if [ $? != 0 ]; then + echo wrong exit code! removing all root files + rm *.root + exit 1 +fi +echo 'sending the job directory back' +""" % prog + + if remoteDir != '': + remoteDir = castortools.eosToLFN(remoteDir) + script += """ +for file in *.root; do +newFileName=`echo $file | sed -r -e 's/\./_%s\./'` +cmsStageWithFailover.py -f $file $fullFileName +eos chmod 755 /eos/cms/$fullFileName +done +""" % (index, remoteDir) + script += 'rm *.root\n' + return script + + +class CmsBatchException( Exception): + '''Exception class for this script''' + + def __init__(self, value): + self.value = value + + def __str__(self): + return str( self.value) + + +class MyBatchManager( BatchManager ): + '''Batch manager specific to cmsRun processes.''' + + def PrepareJobUser(self, jobDir, value ): + '''Prepare one job. 
This function is called by the base class.''' + + process.source = fullSource.clone() + + #prepare the batch script + scriptFileName = jobDir+'/batchScript.sh' + scriptFile = open(scriptFileName,'w') + storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','') + mode = self.RunningMode(options.batch) + if mode == 'LXPLUS': + scriptFile.write( batchScriptCERN( storeDir, value) ) #here is the call to batchScriptCERN, i need to change value + elif mode == 'LOCAL': + scriptFile.write( batchScriptLocal( storeDir, value) ) #same as above but for batchScriptLocal + scriptFile.close() + os.system('chmod +x %s' % scriptFileName) + + #prepare the cfg + # replace the list of fileNames by a chunk of filenames: + if generator: + randSvc = RandomNumberServiceHelper(process.RandomNumberGeneratorService) + randSvc.populate() + else: + iFileMin = (value-1)*grouping + iFileMax = (value)*grouping + process.source.fileNames = fullSource.fileNames[iFileMin:iFileMax] + print process.source + cfgFile = open(jobDir+'/run_cfg.py','w') + cfgFile.write('import FWCore.ParameterSet.Config as cms\n\n') + cfgFile.write('import os,sys\n') + # need to import most of the config from the base directory containing all jobs + cfgFile.write("sys.path.append('%s')\n" % os.path.dirname(jobDir) ) + cfgFile.write('from base_cfg import *\n') + cfgFile.write('process.source = ' + process.source.dumpPython() + '\n') + if generator: + cfgFile.write('process.RandomNumberGeneratorService = ' + process.RandomNumberGeneratorService.dumpPython() + '\n') + cfgFile.close() + + +batchManager = MyBatchManager() + + +file = open('cmsBatch.txt', 'w') +file.write(string.join(sys.argv) + "\n") +file.close() + +batchManager.parser_.usage = """ +%prog [options] . + +Submits a number of jobs taking your_cfg.py as a template. your_cfg.py can either read events from input files, or produce them with a generator. In the later case, the seeds are of course updated for each job. 
+ +A local output directory is created locally. This directory contains a job directory for each job, and a Logger/ directory containing information on the software you are using. +By default: +- the name of the output directory is created automatically. +- the output root files end up in the job directories. + +Each job directory contains: +- the full python configuration for this job. You can run it interactively by doing: +cmsRun run_cfg.py +- the batch script to run the job. You can submit it again by calling the batch command yourself, see the -b option. +- while running interactively: nohup.out, where the job stderr and stdout are redirected. To check the status of a job running interactively, do: +tail nohup.out +- after running: + o the full nohup.out (your log) and your root files, in case you ran interactively + o the LSF directory, in case you ran on LSF + +Also see fwBatch.py, which is a layer on top of cmsBatch.py adapted to the organization of our samples on the CMST3. + +Examples: + +First do: +cd $CMSSW_BASE/src/CMGTools/Common/test + +to run on your local machine: +cmsBatch.py 1 testCMGTools_cfg.py -b 'nohup ./batchScript.sh&' + +to run on LSF (you must be logged on lxplus, not on your interactive machine, so that you have access to LSF) +cmsBatch.py 1 testCMGTools_cfg.py -b 'bsub -q 8nm < ./batchScript.sh' +""" +batchManager.parser_.add_option("-p", "--program", dest="prog", + help="program to run on your cfg file", + default="cmsRun") +## batchManager.parser_.add_option("-b", "--batch", dest="batch", +## help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. 
You can also use 'nohup < ./batchScript.sh &' to run locally.", +## default="bsub -q 8nh < .batchScript.sh") +batchManager.parser_.add_option("-c", "--command-args", dest="cmdargs", + help="command line arguments for the job", + default=None) +batchManager.parser_.add_option("--notagCVS", dest="tagPackages", + default=True,action="store_false", + help="tag the package on CVS (True)") + +(options,args) = batchManager.parser_.parse_args() +batchManager.ParseOptions() + +prog = options.prog +doCVSTag = options.tagPackages + +if len(args)!=2: + batchManager.parser_.print_help() + sys.exit(1) + +# testing that we run a sensible batch command. If not, exit. +runningMode = None +try: + runningMode = batchManager.RunningMode( options.batch ) +except CmsBatchException as err: + print err + sys.exit(1) + +grouping = int(args[0]) +nJobs = grouping +cfgFileName = args[1] + +print 'Loading cfg' + +pycfg_params = options.cmdargs +trueArgv = sys.argv +sys.argv = [cfgFileName] +if pycfg_params: + sys.argv.extend(pycfg_params.split(' ')) +print sys.argv + + +# load cfg script +handle = open(cfgFileName, 'r') +cfo = imp.load_source("pycfg", cfgFileName, handle) +process = cfo.process +handle.close() + +# Restore original sys.argv +sys.argv = trueArgv + + +# keep track of the original source +fullSource = process.source.clone() +generator = False + +try: + process.source.fileNames +except: + print 'No input file. This is a generator process.' 
+ generator = True + listOfValues = [i+1 for i in range( nJobs )] #Here is where the list of values is created +else: + print "Number of files in the source:",len(process.source.fileNames), ":" + pprint.pprint(process.source.fileNames) + nFiles = len(process.source.fileNames) + nJobs = nFiles / grouping + if (nJobs!=0 and (nFiles % grouping) > 0) or nJobs==0: + nJobs = nJobs + 1 + + print "number of jobs to be created: ", nJobs + listOfValues = [i+1 for i in range( nJobs )] #OR Here is where the list of values is created + #here i change from e.g 0-19 to 1-20 + +batchManager.PrepareJobs( listOfValues ) #PrepareJobs with listOfValues as param + +# preparing master cfg file + +cfgFile = open(batchManager.outputDir_+'/base_cfg.py','w') +cfgFile.write( process.dumpPython() + '\n') +cfgFile.close() + +# need to wait 5 seconds to give castor some time +# now on EOS, should be ok. reducing to 1 sec +waitingTime = 1 +if runningMode == 'LOCAL': + # of course, not the case when running with nohup + # because we will never have enough processes to saturate castor. + waitingTime = 0 +batchManager.SubmitJobs( waitingTime ) + + +# logging + +from PhysicsTools.HeppyCore.utils.logger import logger + +oldPwd = os.getcwd() +os.chdir(batchManager.outputDir_) +logDir = 'Logger' +os.system( 'mkdir ' + logDir ) +log = logger( logDir ) + +log.logCMSSW() +log.logJobs(nJobs) +#COLIN not so elegant... but tar is behaving in a strange way. +log.addFile( oldPwd + '/' + cfgFileName ) + +if not batchManager.options_.negate: + if batchManager.remoteOutputDir_ != "": + # we don't want to crush an existing log file on castor + #COLIN could protect the logger against that. 
+ log.stageOut( batchManager.remoteOutputDir_ ) + +os.chdir( oldPwd ) + + diff --git a/PhysicsTools/Heppy/scripts/cmsStageWithFailover.py b/PhysicsTools/Heppy/scripts/cmsStageWithFailover.py new file mode 100755 index 0000000000000..1d606655eaf53 --- /dev/null +++ b/PhysicsTools/Heppy/scripts/cmsStageWithFailover.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +#this script runs cmsStage multiple times in the case where it failes for some reason + +if __name__ == '__main__': + + import CMGTools.Production.eostools as eostools + eostools.setCAFPath() + + from cmsIO import * + from cmsStage import * + + import sys, time + + #this taken from the main of cmsStage + argv = sys.argv[1:] + (args, debug, force ) = parseOpts( argv ) + + if not os.path.isfile(args[0]): + print args[0], 'does not exist.' + sys.exit(1) + source = cmsFile( args[0], "rfio" ) + destination = cmsFile( args[1], "stageout" ) + checkArgs( source, destination, force ) + + #find the destination LFN + dest = args[1] + if eostools.isDirectory(dest): + dest = os.path.join(dest,os.path.basename(args[0])) + + sleep_lengths = [1,10,60,600,1800] + return_code = 0 + for i in xrange(5): + + #sleep for a while before running + time.sleep(sleep_lengths[i]) + + try: + #run cmsStage + print 'cmsStage %s [%d/5]' % (' '.join(argv) , i+1) + main(argv) + + except SystemExit, e: + print "cmsStage exited with code '%s'. Retrying... 
[%d/5]" % ( str(e), i+1 ) + return_code = e.code + + #sleep again before checking + time.sleep(3) + + if eostools.fileExists(dest) and eostools.isFile(dest): + if source.size() == destination.size(): + return_code = 0 + break + + sys.exit(return_code) diff --git a/PhysicsTools/HeppyCore/python/framework/chain.py b/PhysicsTools/HeppyCore/python/framework/chain.py index c1c8fd407345d..43117bbcba2c2 100644 --- a/PhysicsTools/HeppyCore/python/framework/chain.py +++ b/PhysicsTools/HeppyCore/python/framework/chain.py @@ -6,6 +6,19 @@ import pprint from ROOT import TChain, TFile, TTree, gSystem +def is_pfn(fn): + return not (is_lfn(fn) or is_rootfn(fn)) + +def is_lfn(fn): + return fn.startswith("/store") + +def is_rootfn(fn): + """ + To open files like root://, file:// which os.isfile won't find. + """ + return "://" in fn + + class Chain( object ): """Wrapper to TChain, with a python iterable interface. @@ -37,7 +50,10 @@ def __init__(self, input, tree_name=None): if len(self.files)==0: raise ValueError('no matching file name: '+input) else: # case of a list of files - if False in [ os.path.isfile(fnam) for fnam in self.files ]: + if False in [ + ((is_pfn(fnam) and os.path.isfile(fnam)) or + is_lfn(fnam)) or is_rootfn(fnam) + for fnam in self.files]: err = 'at least one input file does not exist\n' err += pprint.pformat(self.files) raise ValueError(err) @@ -93,13 +109,3 @@ def __getitem__(self, index): return self.chain -if __name__ == '__main__': - - import sys - - if len(sys.argv)!=3: - print 'usage: Chain.py ' - sys.exit(1) - tree_name = sys.argv[1] - pattern = sys.argv[2] - chain = Chain( tree_name, pattern ) diff --git a/PhysicsTools/HeppyCore/python/framework/chain_test.py b/PhysicsTools/HeppyCore/python/framework/chain_test.py index 9ceda3f40124b..ccc308f229fea 100644 --- a/PhysicsTools/HeppyCore/python/framework/chain_test.py +++ b/PhysicsTools/HeppyCore/python/framework/chain_test.py @@ -2,7 +2,7 @@ import os import shutil -from 
PhysicsTools.HeppyCore.framework.chain import Chain +from chain import Chain from PhysicsTools.HeppyCore.utils.testtree import create_tree testfname = 'test_tree.root' @@ -35,6 +35,11 @@ def test_load_2(self): chain = Chain(testfname.replace('.root', '*.root'), 'test_tree') self.assertEqual(len(chain), 200) os.remove(tmpfile) + + def test_load_3(self): + '''Test LFN/root-fn loading''' + chain = Chain(["root://{0}".format(os.path.abspath(testfname))], 'test_tree') + self.assertEqual(len(chain), 100) def test_iterate(self): '''Test iteration''' diff --git a/PhysicsTools/HeppyCore/python/framework/config.py b/PhysicsTools/HeppyCore/python/framework/config.py index d06d87f5f2eba..5e44114c0bd9d 100644 --- a/PhysicsTools/HeppyCore/python/framework/config.py +++ b/PhysicsTools/HeppyCore/python/framework/config.py @@ -75,11 +75,16 @@ def __init__(self, class_object, instance_label='1', self.class_object = class_object self.instance_label = instance_label - self.name = self.build_name() self.verbose = verbose - # self.cfg = CFG(**kwargs) super(Analyzer, self).__init__(**kwargs) + def __setattr__(self, name, value): + '''You may decide to copy an existing analyzer and change + its instance_label. 
In that case, one must stay consistent.''' + self.__dict__[name] = value + if name == 'instance_label': + self.name = self.build_name() + def build_name(self): class_name = '.'.join([self.class_object.__module__, self.class_object.__name__]) @@ -140,7 +145,7 @@ def __init__(self, name, files, tree_name=None, triggers=None, **kwargs): class DataComponent( Component ): def __init__(self, name, files, intLumi=None, triggers=[], json=None): - super(DataComponent, self).__init__(name, files, triggers) + super(DataComponent, self).__init__(name, files, triggers=triggers) self.isData = True self.intLumi = intLumi self.json = json diff --git a/PhysicsTools/HeppyCore/python/framework/config_test.py b/PhysicsTools/HeppyCore/python/framework/config_test.py index 91e38c908d693..6bbb3e4c0914b 100644 --- a/PhysicsTools/HeppyCore/python/framework/config_test.py +++ b/PhysicsTools/HeppyCore/python/framework/config_test.py @@ -1,8 +1,9 @@ import unittest import os import shutil +import copy -from PhysicsTools.HeppyCore.framework.config import * +from config import * class ConfigTestCase(unittest.TestCase): @@ -48,8 +49,19 @@ class Ana1(object): services = [], events_class = Events ) - - + def test_copy(self): + class Ana1(object): + pass + ana1 = Analyzer( + Ana1, + instance_label = 'inst1', + toto = '1', + ) + ana2 = copy.copy(ana1) + ana2.instance_label = 'inst2' + ana2.toto2 = '2' + self.assertEqual(ana2.name, '__main__.Ana1_inst2') + self.assertEqual(ana2.toto2, '2') if __name__ == '__main__': unittest.main() diff --git a/PhysicsTools/HeppyCore/python/framework/event.py b/PhysicsTools/HeppyCore/python/framework/event.py index 2fca9389f2d03..16dfb22da06eb 100644 --- a/PhysicsTools/HeppyCore/python/framework/event.py +++ b/PhysicsTools/HeppyCore/python/framework/event.py @@ -1,3 +1,4 @@ +import collections from ROOT import TChain class Event(object): @@ -16,7 +17,7 @@ class Event(object): #TODO: provide a clear interface for access control (put, get, del products) - we should 
class Events(object):
    '''Event source that reads no input.

    Its length is the largest integer the interpreter exposes, and indexing
    it with any value returns the object itself.
    '''

    def __init__(self, dummy, dummy_2=None):
        '''Accept two arguments and ignore both; nothing is opened or stored.'''
        pass

    def __len__(self):
        '''Return the nominal number of events (the platform maximum).'''
        # sys.maxint only exists on Python 2; sys.maxsize is the fallback so
        # that len() keeps working on interpreters without it.
        return getattr(sys, 'maxint', sys.maxsize)

    def __getitem__(self, index):
        '''Return this object itself, whatever the index.'''
        return self
- if iev>=9 and iev<990: - continue - else: - print ev.var1 diff --git a/PhysicsTools/HeppyCore/python/framework/eventstfile_test.py b/PhysicsTools/HeppyCore/python/framework/eventstfile_test.py new file mode 100644 index 0000000000000..450fd11657f25 --- /dev/null +++ b/PhysicsTools/HeppyCore/python/framework/eventstfile_test.py @@ -0,0 +1,18 @@ +import unittest + +from eventstfile import Events +from PhysicsTools.HeppyCore.utils.testtree import create_tree + +testfname = 'test_tree.root' + +class EventsTFileTestCase(unittest.TestCase): + + def test(self): + events = Events(testfname, 'test_tree') + event = events.to(2) + for iev, ev in enumerate(events): + self.assertEqual(iev, ev.var1) + +if __name__ == '__main__': + create_tree(testfname) + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/framework/heppy.py b/PhysicsTools/HeppyCore/python/framework/heppy_loop.py similarity index 71% rename from PhysicsTools/HeppyCore/python/framework/heppy.py rename to PhysicsTools/HeppyCore/python/framework/heppy_loop.py index 7edd93fc44f1c..9c122726d65e3 100755 --- a/PhysicsTools/HeppyCore/python/framework/heppy.py +++ b/PhysicsTools/HeppyCore/python/framework/heppy_loop.py @@ -153,7 +153,7 @@ def main( options, args ): # Propagate global options to _heppyGlobalOptions within this module # I have to import it explicitly, 'global' does not work since the # module is not set when executing the main - from PhysicsTools.HeppyCore.framework.heppy import _heppyGlobalOptions + from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions for opt in options.extraOptions: if "=" in opt: (key,val) = opt.split("=",1) @@ -177,7 +177,7 @@ def main( options, args ): shutil.copy( cfgFileName, outDir ) pool = Pool(processes=min(len(selComps),10)) ## workaround for a scoping problem in ipython+multiprocessing - import PhysicsTools.HeppyCore.framework.heppy as ML + import PhysicsTools.HeppyCore.framework.heppy_loop as ML for comp in selComps: print 'submitting', 
comp.name pool.apply_async( ML.runLoopAsync, [comp, outDir, 'PhysicsTools.HeppyCore.__cfg_to_run__', options], @@ -190,65 +190,3 @@ def main( options, args ): global loop loop = runLoop( comp, outDir, cfg.config, options ) - - -if __name__ == '__main__': - from optparse import OptionParser - - parser = OptionParser() - parser.usage = """ - %prog - For each component, start a Loop. - 'name' is whatever you want. - """ - - parser.add_option("-N", "--nevents", - dest="nevents", - type="int", - help="number of events to process", - default=None) - parser.add_option("-p", "--nprint", - dest="nprint", - help="number of events to print at the beginning", - default=5) - parser.add_option("-e", "--iEvent", - dest="iEvent", - help="jump to a given event. ignored in multiprocessing.", - default=None) - parser.add_option("-f", "--force", - dest="force", - action='store_true', - help="don't ask questions in case output directory already exists.", - default=False) - parser.add_option("-i", "--interactive", - dest="interactive", - action='store_true', - help="stay in the command line prompt instead of exiting", - default=False) - parser.add_option("-t", "--timereport", - dest="timeReport", - action='store_true', - help="Make a report of the time used by each analyzer", - default=False) - parser.add_option("-v", "--verbose", - dest="verbose", - action='store_true', - help="increase the verbosity of the output (from 'warning' to 'info' level)", - default=False) - parser.add_option("-q", "--quiet", - dest="quiet", - action='store_true', - help="do not print log messages to screen.", - default=False) - parser.add_option("-o", "--option", - dest="extraOptions", - type="string", - action="append", - default=[], - help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file") - - (options,args) = parser.parse_args() - - main(options, args) - if not options.interactive: - exit() # trigger exit also from ipython diff --git 
a/PhysicsTools/HeppyCore/python/framework/looper.py b/PhysicsTools/HeppyCore/python/framework/looper.py index 9e5022f6de213..a1fa68348ed6a 100644 --- a/PhysicsTools/HeppyCore/python/framework/looper.py +++ b/PhysicsTools/HeppyCore/python/framework/looper.py @@ -183,15 +183,15 @@ def loop(self): print 'Stopped loop following a UserWarning exception' info = self.logger.info - info('number of events processed: {nEv}'.format(nEv=iEv+1)) - info('') + warning = self.logger.warning + warning('number of events processed: {nEv}'.format(nEv=iEv+1)) + warning('') info( self.cfg_comp ) info('') for analyzer in self.analyzers: analyzer.endLoop(self.setup) if self.timeReport: allev = max([x['events'] for x in self.timeReport]) - warning = self.logger.warning warning("\n ---- TimeReport (all times in ms; first evt is skipped) ---- ") warning("%9s %9s %9s %9s %6s %s" % ("processed","all evts","time/proc", " time/all", " [%] ", "analyer")) warning("%9s %9s %9s %9s %6s %s" % ("---------","--------","---------", "---------", " -----", "-------------")) @@ -266,3 +266,4 @@ def write(self): looper = Looper( 'Loop', cfg.config,nPrint = 5) looper.loop() looper.write() + diff --git a/PhysicsTools/HeppyCore/python/framework/services/service.py b/PhysicsTools/HeppyCore/python/framework/services/service.py index 243003e339a68..9a2f2bafda3c8 100644 --- a/PhysicsTools/HeppyCore/python/framework/services/service.py +++ b/PhysicsTools/HeppyCore/python/framework/services/service.py @@ -1,5 +1,19 @@ class Service(object): - '''Basic service interface.''' + '''Basic service interface. + If you want your own service, you should respect this interface + so that your service can be used by the looper. 
+ ''' + + def __init__(self, cfg, comp, outdir): + ''' + cfg: framework.config.Service object containing whatever parameters + you need + comp: dummy parameter + outdir: output directory for your service (feel free not to use it) + + Please have a look at TFileService for more information + ''' + def start(self): '''Start the service. Called by the looper, not by the user. diff --git a/PhysicsTools/HeppyCore/python/framework/services/service_test.py b/PhysicsTools/HeppyCore/python/framework/services/service_test.py new file mode 100644 index 0000000000000..74a33df919898 --- /dev/null +++ b/PhysicsTools/HeppyCore/python/framework/services/service_test.py @@ -0,0 +1,26 @@ +import unittest +import os +import shutil + +from tfile import TFileService +import PhysicsTools.HeppyCore.framework.config as cfg + +class ServiceTestCase(unittest.TestCase): + + def test_tfile(self): + config = cfg.Service(TFileService, + 'myhists', + fname = 'histos.root', + option = 'recreate') + dummy = None + dirname = 'test_dir' + if os.path.exists(dirname): + shutil.rmtree(dirname) + os.mkdir(dirname) + fileservice = TFileService(config, dummy, dirname) + fileservice.start() + fileservice.stop() + shutil.rmtree(dirname) + +if __name__ == '__main__': + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/framework/services/tfile.py b/PhysicsTools/HeppyCore/python/framework/services/tfile.py index eb6deea7d74b3..29c3bf30dccd2 100644 --- a/PhysicsTools/HeppyCore/python/framework/services/tfile.py +++ b/PhysicsTools/HeppyCore/python/framework/services/tfile.py @@ -2,7 +2,7 @@ from ROOT import TFile class TFileService(Service): - '''TFile service. + """TFile service. The file attribute is a TFile that can be used in several analyzers. The file is closed when the service stops. 
@@ -14,9 +14,24 @@ class TFileService(Service): fname='histograms.root', option='recreate' ) - - ''' + """ def __init__(self, cfg, comp, outdir): + """ + cfg must contain: + - fname: file name + - option: TFile options, e.g. recreate + + outdir is the output directory for the TFile + + comp is a dummy parameter here. + It is needed because the looper creates services and analyzers + in the same way, providing the configuration (cfg), + the component currently processed (comp), + and the output directory. + + Other implementations of the TFileService could + make use of the component information, eg. the component name. + """ fname = '/'.join([outdir, cfg.fname]) self.file = TFile(fname, cfg.option) @@ -24,7 +39,3 @@ def stop(self): self.file.Write() self.file.Close() -if __name__ == '__main__': - fileservice = TFileService('test.root', 'recreate') - fileservice.start() - fileservice.stop() diff --git a/PhysicsTools/HeppyCore/python/statistics/average_test.py b/PhysicsTools/HeppyCore/python/statistics/average_test.py index ea3d9928130a7..ac3f6782dc002 100644 --- a/PhysicsTools/HeppyCore/python/statistics/average_test.py +++ b/PhysicsTools/HeppyCore/python/statistics/average_test.py @@ -1,6 +1,6 @@ import unittest -from PhysicsTools.HeppyCore.statistics.average import Average +from average import Average class AverageTestCase(unittest.TestCase): diff --git a/PhysicsTools/HeppyCore/python/statistics/counter.py b/PhysicsTools/HeppyCore/python/statistics/counter.py index e437a4f86caf2..b461f5b3c0b44 100644 --- a/PhysicsTools/HeppyCore/python/statistics/counter.py +++ b/PhysicsTools/HeppyCore/python/statistics/counter.py @@ -1,4 +1,4 @@ -# Copyright (C) 2014 Colin Bernet + # Copyright (C) 2014 Colin Bernet # https://github.com/cbernet/heppy/blob/master/LICENSE import pickle @@ -14,19 +14,16 @@ def register(self, level): self.add( level, [level, 0] ) def inc(self, level, nentries=1): - '''Call this function to create a level for this counter, - or to increment an existing 
level. + '''increment an existing level ''' if level not in self.dico: raise ValueError('level', level, 'has not been registered') - # self.add( level, [level, nentries]) else: self[level][1] += nentries def __add__(self, other): '''Add two counters (+).''' size = max( len(self), len(other)) - # import pdb; pdb.set_trace() for i in range(0, size): if i>=len(other): # this line exists only in this counter, leave it as is @@ -35,7 +32,6 @@ def __add__(self, other): self.register( other[i][0]) self.inc( other[i][0], other[i][1] ) else: - # exists in both if self[i][0] != other[i][0]: err = ['cannot add these counters:', str(self), str(other)] raise ValueError('\n'.join(err)) @@ -111,44 +107,3 @@ def __str__(self): def __getitem__(self, name): return self.counter(name) -if __name__ == '__main__': - - c = Counter('Test') - c.register('a') - c.register('b') - c.inc('a') - print c - - cs = Counters() - cs.addCounter('test') - cs.counter('test').register('a') - cs.counter('test').register('b') - cs.addCounter('test2') - cs.counter('test2').register('a') - cs.counter('test').inc('a') - cs.counter('test').inc('a') - cs.counter('test').inc('b') - cs.counter('test2').inc('a') - - print cs - - cs.write('.') - print 'loading ...' 
- file = open('test.pck') - lcs = pickle.load(file) - print lcs - - c1 = cs.counter('test2') - - print 'test addition, adding test and test2' - import copy - c2 = cs.counter('test') - c1 += c2 - print c1 - - print 'test addition : incompatible' - c = Counter('Test3') - c.register('b') - c.inc('b') - c1 += c - print c1 diff --git a/PhysicsTools/HeppyCore/python/statistics/counter_test.py b/PhysicsTools/HeppyCore/python/statistics/counter_test.py new file mode 100644 index 0000000000000..56683b82f67a2 --- /dev/null +++ b/PhysicsTools/HeppyCore/python/statistics/counter_test.py @@ -0,0 +1,60 @@ +import unittest +import os +import shutil + +from counter import Counter + +class CounterTestCase(unittest.TestCase): + + def test_simple(self): + c = Counter('Test') + c.register('a') + c.register('b') + c.inc('a') + self.assertEqual(c['a'], ['a', 1]) + self.assertEqual(c['b'], ['b', 0]) + c.inc('a') + self.assertEqual(c['a'], ['a', 2]) + + def test_add(self): + c = Counter('Test') + c.register('a') + c.register('b') + c.inc('a') + d = Counter('Test') + d.register('a') + d.register('b') + d.inc('a') + d.inc('b') + d += c + self.assertEqual(d['a'], ['a', 2]) + self.assertEqual(d['b'], ['b', 1]) + + def test_bad_add(self): + c = Counter('Test') + c.register('a') + c.register('b') + c.inc('a') + d = Counter('Test') + d.register('b') + self.assertRaises(ValueError, d.__iadd__, c) + + def test_write(self): + c = Counter('Test') + c.register('a') + c.register('b') + c.inc('a') + dirname = 'test_dir' + if os.path.exists(dirname): + shutil.rmtree(dirname) + os.mkdir(dirname) + c.write(dirname) + shutil.rmtree(dirname) + + + + + + +if __name__ == '__main__': + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/statistics/tree.py b/PhysicsTools/HeppyCore/python/statistics/tree.py index 6f3e084e09266..8e72c7da7a9c8 100644 --- a/PhysicsTools/HeppyCore/python/statistics/tree.py +++ b/PhysicsTools/HeppyCore/python/statistics/tree.py @@ -130,20 +130,3 @@ def vfill(self, varName, 
def __eq__(self, other):
    '''Return True when other carries the same val and err as self.

    An operand without val/err attributes yields NotImplemented, so Python
    falls back to its default comparison instead of raising AttributeError.
    '''
    try:
        return self.val == other.val and self.err == other.err
    except AttributeError:
        return NotImplemented

def __ne__(self, other):
    '''Return the negation of __eq__.

    Python 2 does not derive != from ==, so without this method two equal
    objects would still compare as different with the != operator.
    '''
    outcome = self.__eq__(other)
    if outcome is NotImplemented:
        return outcome
    return not outcome
unittest +import math +from value import Value + +class ValueTestCase(unittest.TestCase): + + def test(self): + val1 = Value(1.,0.02) + val2 = Value(2.,0.02) + val3 = val1 / val2 + # should test the value and the error after each operation. + # I'll write the tests when I have some time + + def test_equal(self): + val1 = Value(1.,0.02) + val2 = Value(1.,0.02) + self.assertEqual(val1, val2) + +if __name__ == '__main__': + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/utils/batchmanager.py b/PhysicsTools/HeppyCore/python/utils/batchmanager.py index c154b6c00923b..082e5a0ea1c01 100644 --- a/PhysicsTools/HeppyCore/python/utils/batchmanager.py +++ b/PhysicsTools/HeppyCore/python/utils/batchmanager.py @@ -9,6 +9,7 @@ import pprint import time +import eostools as castortools class BatchManager: """ @@ -19,8 +20,8 @@ class BatchManager: # constructor # self is this - # parse batch manager options - def __init__(self): + # parse batch manager options + def __init__(self): self.DefineOptions() @@ -29,14 +30,14 @@ def DefineOptions(self): # how to add more doc to the help? self.parser_ = OptionParser() self.parser_.add_option("-o", "--output-dir", dest="outputDir", - help="Name of the local output directory for your jobs. This directory will be created automatically.", - default=None) + help="Name of the local output directory for your jobs. This directory will be created automatically.", + default=None) self.parser_.add_option("-r", "--remote-copy", dest="remoteCopy", - help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory is tarred and compressed into Logger.tgz, and sent to the remote output directory as well. 
Afterwards, use logger.py to access the information contained in Logger.tgz. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Logs will be sent back to the submision directory. NOTE: so far this option has been implemented and validated to work only for a remote copy to PSI", - default=None) + help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy", + default=None) self.parser_.add_option("-f", "--force", action="store_true", dest="force", default=False, - help="Don't ask any questions, just over-write") + help="Don't ask any questions, just over-write") # this opt can be removed self.parser_.add_option("-n", "--negate", action="store_true", dest="negate", default=False, @@ -44,20 +45,14 @@ def DefineOptions(self): self.parser_.add_option("-b", "--batch", dest="batch", help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. 
You can also use 'nohup < ./batchScript.sh &' to run locally.", default="bsub -q 8nh < ./batchScript.sh") - self.parser_.add_option("-p", "--parametric", action="store_true", - dest="parametric", default=False, - help="submit jobs parametrically, implemented for IC so far") - - def ParseOptions(self): + def ParseOptions(self): (self.options_,self.args_) = self.parser_.parse_args() if self.options_.remoteCopy == None: self.remoteOutputDir_ = "" - else: + else: # removing possible trailing slash - import CMGTools.Production.eostools as castortools self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/') - if "psi.ch" in self.remoteOutputDir_: # T3 @ PSI: # overwriting protection to be improved if self.remoteOutputDir_.startswith("/pnfs/psi.ch"): @@ -65,7 +60,7 @@ def ParseOptions(self): if ld_lib_path != "None": os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:"+ld_lib_path # to solve gfal conflict with CMSSW os.system("gfal-mkdir srm://t3se01.psi.ch/"+self.remoteOutputDir_) - outputDir = self.options_.outputDir.rstrip("/").split("/")[-1] # to for instance direct output to /afs/cern.ch/work/u/user/outputDir + outputDir = self.options_.outputDir if outputDir==None: today = datetime.today() outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M") @@ -81,9 +76,9 @@ def ParseOptions(self): if not castortools.isLFN( self.remoteOutputDir_ ): print 'When providing an output directory, you must give its LFN, starting by /store. 
You gave:' print self.remoteOutputDir_ - sys.exit(1) + sys.exit(1) self.remoteOutputDir_ = castortools.lfnToEOS( self.remoteOutputDir_ ) - dirExist = castortools.isDirectory( self.remoteOutputDir_ ) + dirExist = castortools.isDirectory( self.remoteOutputDir_ ) # nsls = 'nsls %s > /dev/null' % self.remoteOutputDir_ # dirExist = os.system( nsls ) if dirExist is False: @@ -98,20 +93,21 @@ def ParseOptions(self): if self.options_.negate is False and self.options_.force is False: #COLIN need to reimplement protectedRemove in eostools raise ValueError( ' '.join(['directory ', self.remoteOutputDir_, ' already exists.'])) - # if not castortools.protectedRemove( self.remoteOutputDir_, '.*root'): - # the user does not want to delete the root files + # if not castortools.protectedRemove( self.remoteOutputDir_, '.*root'): + # the user does not want to delete the root files + self.remoteOutputFile_ = "" self.ManageOutputDir() return (self.options_, self.args_) - + def PrepareJobs(self, listOfValues, listOfDirNames=None): print 'PREPARING JOBS ======== ' self.listOfJobs_ = [] if listOfDirNames is None: - for value in listOfValues: - self.PrepareJob( value ) + for value in listOfValues: + self.PrepareJob( value ) else: for value, name in zip( listOfValues, listOfDirNames): self.PrepareJob( value, name ) @@ -122,10 +118,10 @@ def PrepareJobs(self, listOfValues, listOfDirNames=None): # create output dir, if necessary def ManageOutputDir( self ): - + #if the output dir is not specified, generate a name - #else - #test if the directory exists + #else + #test if the directory exists #if yes, returns outputDir = self.options_.outputDir @@ -133,107 +129,77 @@ def ManageOutputDir( self ): if outputDir==None: today = datetime.today() outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S") - print 'output directory not specified, using %s' % outputDir - + print 'output directory not specified, using %s' % outputDir + self.outputDir_ = os.path.abspath(outputDir) if( 
os.path.isdir(self.outputDir_) == True ): input = '' if not self.options_.force: while input != 'y' and input != 'n': - input = raw_input( 'The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n]' ) + input = raw_input( 'The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ' ) if input == 'n': sys.exit(1) else: os.system( 'rm -rf ' + self.outputDir_) - + self.mkdir( self.outputDir_ ) - + def PrepareJob( self, value, dirname=None): '''Prepare a job for a given value. calls PrepareJobUser, which should be overloaded by the user. ''' - print 'PrepareJob : %s' % value + print 'PrepareJob : %s' % value dname = dirname if dname is None: dname = 'Job_{value}'.format( value=value ) jobDir = '/'.join( [self.outputDir_, dname]) - print '\t',jobDir + print '\t',jobDir self.mkdir( jobDir ) self.listOfJobs_.append( jobDir ) self.PrepareJobUser( jobDir, value ) - + def PrepareJobUser(self, value ): '''Hook allowing user to define how one of his jobs should be prepared.''' print '\to be customized' - + def SubmitJobs( self, waitingTimeInSec=0 ): '''Submit all jobs. Possibly wait between each job''' - + if(self.options_.negate): print '*NOT* SUBMITTING JOBS - exit ' return print 'SUBMITTING JOBS ======== ' - - mode = self.RunningMode(self.options_.batch) - - # If at IC write all the job directories to a file then submit a parameteric - # job that depends on the file number. 
This is required to circumvent the 2000 - # individual job limit at IC - if mode=="IC" and self.options_.parametric: - - jobDirsFile = os.path.join(self.outputDir_,"jobDirectories.txt") - with open(jobDirsFile, 'w') as f: - for jobDir in self.listOfJobs_: - print>>f,jobDir - - readLine = "readarray JOBDIR < "+jobDirsFile+"\n" - - submitScript = os.path.join(self.outputDir_,"parametricSubmit.sh") - with open(submitScript,'w') as batchScript: - batchScript.write("#!/bin/bash\n") - batchScript.write("#$ -e /dev/null -o /dev/null \n") - batchScript.write("cd "+self.outputDir_+"\n") - batchScript.write(readLine) - batchScript.write("cd ${JOBDIR[${SGE_TASK_ID}-1]}\n") - batchScript.write( "./batchScript.sh > BATCH_outputLog.txt 2> BATCH_errorLog.txt" ) - - #Find the queue - splitBatchOptions = self.options_.batch.split() - if '-q' in splitBatchOptions: queue = splitBatchOptions[splitBatchOptions.index('-q')+1] - else: queue = "hepshort.q" - - os.system("qsub -q "+queue+" -t 1-"+str(len(self.listOfJobs_))+" "+submitScript) - - else: - #continue as before, submitting one job per directory - - for jobDir in self.listOfJobs_: - root = os.getcwd() - # run it - print 'processing ', jobDir - os.chdir( jobDir ) - self.SubmitJob( jobDir ) - # and come back - os.chdir(root) - print 'waiting %s seconds...' % waitingTimeInSec - time.sleep( waitingTimeInSec ) - print 'done.' + for jobDir in self.listOfJobs_: + root = os.getcwd() + # run it + print 'processing ', jobDir + os.chdir( jobDir ) + self.SubmitJob( jobDir ) + # and come back + os.chdir(root) + print 'waiting %s seconds...' % waitingTimeInSec + time.sleep( waitingTimeInSec ) + print 'done.' 
def SubmitJob( self, jobDir ): '''Hook for job submission.''' - print 'submitting (to be customized): ', jobDir + print 'submitting (to be customized): ', jobDir os.system( self.options_.batch ) + def SubmitJobArray( self, numbOfJobs = 1 ): + '''Hook for array job submission.''' + print 'Submitting array with %s jobs' % numbOfJobs + def CheckBatchScript( self, batchScript ): if batchScript == '': return - + if( os.path.isfile(batchScript)== False ): print 'file ',batchScript,' does not exist' sys.exit(3) @@ -266,56 +232,54 @@ def mkdir( self, dirname ): if( ret != 0 ): print 'please remove or rename directory: ', dirname sys.exit(4) - + def RunningMode(self, batch): - '''Returns "LXPLUS", "PSI", "LOCAL", or None, - + + '''Return "LXPUS", "PSI", "NAF", "LOCAL", or None, + "LXPLUS" : batch command is bsub, and logged on lxplus "PSI" : batch command is qsub, and logged to t3uiXX - "IC" : batch command is qsub, and logged to hep.ph.ic.ac.uk + "NAF" : batch command is qsub, and logged on naf + "IC" : batch command is qsub, and logged on hep.ph.ic.ac.uk "LOCAL" : batch command is nohup. 
+ In all other cases, a CmsBatchException is raised ''' - + hostName = os.environ['HOSTNAME'] + onLxplus = hostName.startswith('lxplus') - onPSI = hostName.startswith('t3ui' ) - onPISA = re.match('.*gridui.*',hostName) or re.match('.*faiwn.*',hostName) - onPADOVA = ( hostName.startswith('t2-ui') and re.match('.*pd.infn.*',hostName) ) or ( hostName.startswith('t2-cld') and re.match('.*lnl.infn.*',hostName) ) - onIC = 'hep.ph.ic.ac.uk' in hostName + onPSI = hostName.startswith('t3ui') + onNAF = hostName.startswith('naf') + batchCmd = batch.split()[0] - + if batchCmd == 'bsub': - if not (onLxplus or onPISA or onPADOVA) : + if not onLxplus: err = 'Cannot run %s on %s' % (batchCmd, hostName) raise ValueError( err ) - elif onPISA : - print 'running on LSF pisa : %s from %s' % (batchCmd, hostName) - return 'PISA' - elif onPADOVA: - print 'running on LSF padova: %s from %s' % (batchCmd, hostName) - return 'PADOVA' else: - print 'running on LSF lxplus: %s from %s' % (batchCmd, hostName) + print 'running on LSF : %s from %s' % (batchCmd, hostName) return 'LXPLUS' - elif batchCmd == "qsub": - #if not onPSI: - # err = 'Cannot run %s on %s' % (batchCmd, hostName) - # raise ValueError( err ) - if onIC: + elif batchCmd == "qsub": + if onPSI: + print 'running on SGE : %s from %s' % (batchCmd, hostName) + return 'PSI' + elif onNAF: + print 'running on NAF : %s from %s' % (batchCmd, hostName) + return 'NAF' + elif onIC: print 'running on IC : %s from %s' % (batchCmd, hostName) return 'IC' - else: - if onPSI: - print 'running on SGE : %s from %s' % (batchCmd, hostName) - return 'PSI' + err = 'Cannot run %s on %s' % (batchCmd, hostName) + raise ValueError( err ) elif batchCmd == 'nohup' or batchCmd == './batchScript.sh': print 'running locally : %s on %s' % (batchCmd, hostName) return 'LOCAL' else: err = 'unknown batch command: X%sX' % batchCmd - raise ValueError( err ) + raise ValueError( err ) diff --git a/PhysicsTools/HeppyCore/python/utils/castorBaseDir.py 
b/PhysicsTools/HeppyCore/python/utils/castorBaseDir.py new file mode 100755 index 0000000000000..79ba7e2126f0c --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/castorBaseDir.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import os, sys +import eostools as castortools + +def getUserAndArea(user): + """Factor out the magic user hack for use in other classes""" + + area = 'user' + + tokens = user.split('_') + if tokens and len(tokens) > 1: + user = tokens[0] + area = tokens[1] + return user, area + +def castorBaseDir( user=os.environ['USER'], area = None): + """Gets the top level directory to use for writing for 'user'""" + + if area is None: + user, area = getUserAndArea(user) + + d = 'root://eoscms.cern.ch//eos/cms/store/cmst3/%s/%s/CMG' % (area,user) + exists = castortools.isDirectory( castortools.lfnToCastor(d) ) + if exists: + return d + else: + msg = "The directory '%s' does not exist. Please check the username and area (user/group). You may need to create the directory yourself." 
% d + print >> sys.stderr, msg + raise NameError(msg) + +def myCastorBaseDir(): + """Gets the top level directory to use for writing for the current user""" + return castorBaseDir() diff --git a/PhysicsTools/HeppyCore/python/utils/dataset.py b/PhysicsTools/HeppyCore/python/utils/dataset.py new file mode 100644 index 0000000000000..ae2d2f018eb7a --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/dataset.py @@ -0,0 +1,507 @@ +#!/usr/bin/env python + +import os +import pprint +import re +import pickle +import sys + +from castorBaseDir import castorBaseDir +import eostools as castortools +import fnmatch + +class IntegrityCheckError(Exception): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class BaseDataset( object ): + + ### def __init__(self, name, user, pattern='.*root', run_range=None): + def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None): + self.name = name + self.user = user + self.pattern = pattern + self.run_range = run_range + ### MM + self.dbsInstance = dbsInstance + ### MM + self.primaryDatasetEntries = -1 + self.report = None + self.buildListOfFiles( self.pattern ) + self.extractFileSizes() + self.buildListOfBadFiles() + self.primaryDatasetEntries = self.getPrimaryDatasetEntries() + + def buildListOfFiles( self, pattern ): + self.files = [] + + def extractFileSizes(self): + '''Get the file size for each file, + from the eos ls -l command.''' + self.filesAndSizes = {} + + def buildListOfBadFiles(self): + self.good_files = [] + self.bad_files = {} + + def printInfo(self): + print 'sample : ' + self.name + print 'user : ' + self.user + + def getPrimaryDatasetEntries(self): + return self.primaryDatasetEntries + + def printFiles(self, abspath=True, info=True): + # import pdb; pdb.set_trace() + if self.files == None: + self.buildListOfFiles(self.pattern) + for file in self.files: + status = 'OK' + if self.bad_files.has_key(file): + status = self.bad_files[file] + elif file 
not in self.good_files: + status = 'UNKNOWN' + fileNameToPrint = file + if abspath == False: + fileNameToPrint = os.path.basename(file) + if info: + size=self.filesAndSizes.get(file,'UNKNOWN').rjust(10) + # if size is not None: + # size = size.rjust(10) + print status.ljust(10), size, \ + '\t', fileNameToPrint + else: + print fileNameToPrint + print 'PrimaryDatasetEntries: %d' % self.primaryDatasetEntries + + def listOfFiles(self): + '''Returns all files, even the bad ones.''' + return self.files + + def listOfGoodFiles(self): + '''Returns all files flagged as good in the integrity + check text output, or not present in this file, are + considered as good.''' + self.good_files = [] + for file in self.files: + if not self.bad_files.has_key(file): + self.good_files.append( file ) + return self.good_files + + def listOfGoodFilesWithPrescale(self, prescale): + """Takes the list of good files and selects a random sample + from them according to the prescale factor. + E.g. a prescale of 10 will select 1 in 10 files.""" + + good_files = self.listOfGoodFiles() + if prescale < 2: + return self.good_files + + #the number of files to select from the dataset + num_files = int( (len(good_files)/(1.0*prescale)) + 0.5) + if num_files < 1: + num_files = 1 + if num_files > len(good_files): + num_files = len(good_files) + + #pick unique good files randomly + import random + subset = set() + while len(subset) < num_files: + #pick a random file from the list + choice = random.choice(good_files) + slen = len(subset) + #add to the set + subset.add(choice) + #if this was a unique file remove so we don't get + #very slow corner cases where prescale is small + if len(subset) > slen: + good_files.remove(choice) + assert len(subset)==num_files,'The number of files does not match' + + return [f for f in subset] + +class CMSDataset( BaseDataset ): + + def __init__(self, name, run_range = None): + super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range) + + def 
buildListOfFilesDBS(self, pattern, begin=-1, end=-1): + print 'buildListOfFilesDBS',begin,end + sampleName = self.name.rstrip('/') + query, qwhat = sampleName, "dataset" + if "#" in sampleName: qwhat = "block" + if self.run_range is not None and self.run_range != (-1,-1): + if self.run_range[0] == self.run_range[1]: + query += " run=%s" % self.run_range[0] + else: + print "WARNING: queries with run ranges are slow in DAS" + query += " run between [%s,%s]" % ( self.run_range[0],self.run_range[1] ) + dbs='das_client.py --query="file %s=%s"'%(qwhat,query) + if begin >= 0: + dbs += ' --index %d' % begin + if end >= 0: + dbs += ' --limit %d' % (end-begin+1) + else: + dbs += ' --limit 0' + print 'dbs\t: %s' % dbs + dbsOut = os.popen(dbs) + files = [] + for line in dbsOut: + if line.find('/store')==-1: + continue + line = line.rstrip() + # print 'line',line + files.append(line) + return files + + def buildListOfFiles(self, pattern='.*root'): + runs = (-1,-1) + if self.run_range is not None: + runs = self.run_range + num_files=self.findPrimaryDatasetNumFiles(self.name.rstrip('/'), + runs[0],runs[1]) + limit = 10000 + if num_files > limit: + num_steps = int(num_files/limit)+1 + self.files = [] + for i in xrange(num_steps): + DBSFiles=self.buildListOfFilesDBS(pattern, + i*limit, + ((i+1)*limit)-1) + self.files.extend(DBSFiles) + else: + self.files = self.buildListOfFilesDBS(pattern) + + @staticmethod + def findPrimaryDatasetEntries(dataset, runmin, runmax): + + query, qwhat = dataset, "dataset" + if "#" in dataset: qwhat = "block" + if runmin >0 or runmax > 0: + if runmin == runmax: + query = "%s run=%d" % (query,runmin) + else: + print "WARNING: queries with run ranges are slow in DAS" + query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999) + dbs='das_client.py --query="summary %s=%s"'%(qwhat,query) + dbsOut = os.popen(dbs).readlines() + + entries = [] + for line in dbsOut: + line = line.replace('\n','') + if "nevents" in 
line: + entries.append(int(line.split(":")[1])) + if entries: + return sum(entries) + return -1 + + @staticmethod + def findPrimaryDatasetNumFiles(dataset, runmin, runmax): + + query, qwhat = dataset, "dataset" + if "#" in dataset: qwhat = "block" + if runmin >0 or runmax > 0: + if runmin == runmax: + query = "%s run=%d" % (query,runmin) + else: + print "WARNING: queries with run ranges are slow in DAS" + query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999) + dbs='das_client.py --query="summary %s=%s"'%(qwhat,query) + dbsOut = os.popen(dbs).readlines() + + entries = [] + for line in dbsOut: + line = line.replace('\n','') + if "nfiles" in line: + entries.append(int(line.split(":")[1])) + if entries: + return sum(entries) + return -1 + + def getPrimaryDatasetEntries(self): + runmin = -1 + runmax = -1 + if self.run_range is not None: + runmin = self.run_range[0] + runmax = self.run_range[1] + return self.findPrimaryDatasetEntries(self.name, runmin, runmax) + +class LocalDataset( BaseDataset ): + + def __init__(self, name, basedir, pattern): + self.basedir = basedir + super(LocalDataset, self).__init__( name, 'LOCAL', pattern) + + def buildListOfFiles(self, pattern='.*root'): + pat = re.compile( pattern ) + sampleName = self.name.rstrip('/') + self.dir = ''.join( [os.path.abspath(self.basedir), + sampleName ] ) + self.files = [] + for file in sorted(os.listdir( self.dir )): + if pat.match( file ) is not None: + self.files.append( '/'.join([self.dir, file]) ) + # print file + +class EOSDataset(BaseDataset): + '''A dataset located in any given eos directory''' + + def __init__(self, directory, pattern): + self.castorDir = directory + if not castortools.isEOSDir(self.castorDir): + raise ValueError('directory should be a directory on EOS.') + name = directory + super(EOSDataset, self).__init__( name, 'EOS', pattern) + + def buildListOfFiles(self, pattern='.*root'): + self.files = castortools.matchingFiles( 
self.castorDir, pattern ) + + +class Dataset( BaseDataset ): + + def __init__(self, name, user, pattern='.*root'): + self.lfnDir = castorBaseDir(user) + name + self.castorDir = castortools.lfnToCastor( self.lfnDir ) + self.maskExists = False + self.report = None + super(Dataset, self).__init__(name, user, pattern) + + def buildListOfFiles(self, pattern='.*root'): + '''fills list of files, taking all root files matching the pattern in the castor dir''' + self.files = castortools.matchingFiles( self.castorDir, pattern ) + + def buildListOfBadFiles(self): + '''fills the list of bad files from the IntegrityCheck log. + + When the integrity check file is not available, + files are considered as good.''' + mask = "IntegrityCheck" + + self.bad_files = {} + self.good_files = [] + + file_mask = castortools.matchingFiles(self.castorDir, '^%s_.*\.txt$' % mask) + if file_mask: + from CMGTools.Production.edmIntegrityCheck import PublishToFileSystem + p = PublishToFileSystem(mask) + report = p.get(self.castorDir) + if report is not None and report: + self.maskExists = True + self.report = report + dup = report.get('ValidDuplicates',{}) + for name, status in report['Files'].iteritems(): + # print name, status + if not status[0]: + self.bad_files[name] = 'MarkedBad' + elif dup.has_key(name): + self.bad_files[name] = 'ValidDup' + else: + self.good_files.append( name ) + else: + raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" ) + + def extractFileSizes(self): + '''Get the file size for each file, from the eos ls -l command.''' + # EOS command does not work in tier3 + lsout = castortools.runXRDCommand(self.castorDir,'dirlist')[0] + lsout = lsout.split('\n') + self.filesAndSizes = {} + for entry in lsout: + values = entry.split() + if( len(values) != 5): + continue + # using full abs path as a key. 
+ file = '/'.join([self.lfnDir, values[4].split("/")[-1]]) + size = values[1] + self.filesAndSizes[file] = size + + def printInfo(self): + print 'sample : ' + self.name + print 'LFN : ' + self.lfnDir + print 'Castor path : ' + self.castorDir + + def getPrimaryDatasetEntries(self): + if self.report is not None and self.report: + return int(self.report.get('PrimaryDatasetEntries',-1)) + return -1 + + +### MM +class PrivateDataset ( BaseDataset ): + + def __init__(self, name, dbsInstance=None): + super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance) + + def buildListOfFilesDBS(self, name, dbsInstance): + entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1) + files = [] + dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries) + dbsOut = os.popen(dbs) + for line in dbsOut: + if line.find('/store')==-1: + continue + line = line.rstrip() + # print 'line',line + files.append(line) + #return ['root://eoscms//eos/cms%s' % f for f in files] + return files + + def buildListOfFiles(self, pattern='.*root'): + self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance) + + + @staticmethod + def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax): + + query, qwhat = dataset, "dataset" + if "#" in dataset: qwhat = "block" + if runmin >0 or runmax > 0: + if runmin == runmax: + query = "%s run=%d" % (query,runmin) + else: + print "WARNING: queries with run ranges are slow in DAS" + query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999) + dbs='das_client.py --query="summary %s=%s instance=prod/%s"'%(qwhat, query, dbsInstance) + dbsOut = os.popen(dbs).readlines() + + entries = [] + for line in dbsOut: + line = line.replace('\n','') + if "nevents" in line: + entries.append(int(line.split(":")[1])) + if entries: + return sum(entries) + return -1 + + + @staticmethod + def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, 
runmax): + + query, qwhat = dataset, "dataset" + if "#" in dataset: qwhat = "block" + if runmin >0 or runmax > 0: + if runmin == runmax: + query = "%s run=%d" % (query,runmin) + else: + print "WARNING: queries with run ranges are slow in DAS" + query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999) + dbs='das_client.py --query="summary %s=%s instance=prod/%s"'%(qwhat, query, dbsInstance) + dbsOut = os.popen(dbs).readlines() + + entries = [] + for line in dbsOut: + line = line.replace('\n','') + if "nfiles" in line: + entries.append(int(line.split(":")[1])) + if entries: + return sum(entries) + return -1 + + def getPrimaryDatasetEntries(self): + runmin = -1 + runmax = -1 + if self.run_range is not None: + runmin = self.run_range[0] + runmax = self.run_range[1] + return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax) +### MM + +def getDatasetFromCache( cachename ) : + cachedir = '/'.join( [os.environ['HOME'],'.cmgdataset']) + pckfile = open( cachedir + "/" + cachename ) + dataset = pickle.load(pckfile) + return dataset + +def writeDatasetToCache( cachename, dataset ): + cachedir = '/'.join( [os.environ['HOME'],'.cmgdataset']) + if not os.path.exists(cachedir): + os.mkdir(cachedir) + pckfile = open( cachedir + "/" + cachename, 'w') + pickle.dump(dataset, pckfile) + +def createDataset( user, dataset, pattern, readcache=False, + basedir = None, run_range = None): + + + def cacheFileName(data, user, pattern): + return '{user}%{name}%{pattern}.pck'.format( user = user, name = data.replace('/','_'), pattern = pattern) + + def writeCache(dataset): + writeDatasetToCache( cacheFileName(dataset.name, dataset.user, dataset.pattern), dataset ) + + def readCache(data, user, pattern): + return getDatasetFromCache( cacheFileName(data, user, pattern) ) + + if readcache: + try: + data = readCache(dataset, user, pattern) + except IOError: + readcache = False + if not readcache: + if user == 'CMS': + data = 
CMSDataset( dataset , run_range = run_range) + info = False + elif user == 'LOCAL': + data = LocalDataset( dataset, basedir, pattern) + info = False + else: + data = Dataset( dataset, user, pattern) + writeCache(data) +## if user == 'CMS': +## data = CMSDataset( dataset ) +## elif user == 'LOCAL': +## if basedir is None: +## basedir = os.environ['CMGLOCALBASEDIR'] +## data = LocalDataset( dataset, basedir, pattern ) +## else: +## data = Dataset( user, dataset, pattern ) + return data + +### MM +def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False): + + cachedir = '/'.join( [os.environ['HOME'],'.cmgdataset']) + + def cacheFileName(data, user, dbsInstance, pattern): + cf = data.replace('/','_') + name = '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format( + dir = cachedir, + user = user, + dbsInstance = dbsInstance, + name = cf, + pattern = pattern) + return name + + def writeCache(dataset): + if not os.path.exists(cachedir): + os.mkdir(cachedir) + cachename = cacheFileName(dataset.name, + dataset.user, + dataset.dbsInstance, + dataset.pattern) + pckfile = open( cachename, 'w') + pickle.dump(dataset, pckfile) + + def readCache(data, user, dbsInstance, pattern): + cachename = cacheFileName(data, user, dbsInstance, pattern) + + pckfile = open( cachename) + dataset = pickle.load(pckfile) + #print 'reading cache' + return dataset + + if readcache: + try: + data = readCache(dataset, user, dbsInstance, pattern) + except IOError: + readcache = False + if not readcache: + if user == 'PRIVATE': + data = PrivateDataset( dataset, dbsInstance ) + info = False + writeCache(data) + return data +### MM diff --git a/PhysicsTools/HeppyCore/python/utils/dataset_test.py b/PhysicsTools/HeppyCore/python/utils/dataset_test.py new file mode 100644 index 0000000000000..688f0a8c0b09c --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/dataset_test.py @@ -0,0 +1,48 @@ +from dataset import * + +import unittest +import os +import shutil + +BASE_DIR = 'datasets' + 
+def create_dataset(name, number_of_files, basedir=BASE_DIR): + if not os.path.isdir(basedir): + os.mkdir(basedir) + old = os.getcwd() + os.chdir(basedir) + if os.path.isdir(name): + shutil.rmtree(name) + os.mkdir(name) + os.chdir(name) + for i in range(number_of_files): + os.system('touch file_{i:d}.root'.format(i=i)) + os.chdir(old) + +class TestDataset(unittest.TestCase): + + def test_local(self): + n_files = 10 + create_dataset('ds1', n_files) + ds1 = LocalDataset('/ds1', 'datasets', '.*root') + self.assertEqual( len(ds1.listOfGoodFiles()), n_files) + # shutil.rmtree('datasets') + + def test_eos(self): + ds1 = EOSDataset('/eos/cms/store/cmst3/user/cbern/EOSTests/ds1', + '.*root') + self.assertEqual(len(ds1.listOfGoodFiles()), 10) + + def test_eos_fail(self): + self.assertRaises( ValueError, + EOSDataset, 'not_existing_path', '.*root') + # should test that we fail when a plain file is provided + # instead ofa directory.. but eostools not set up for that yet. + + +if __name__ == '__main__': + import os + import sys + if not os.environ.get('CMSSW_BASE', False): + sys.exit(1) + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/utils/deltar.py b/PhysicsTools/HeppyCore/python/utils/deltar.py index 1b54b46314867..0c2eb32eb0a7f 100644 --- a/PhysicsTools/HeppyCore/python/utils/deltar.py +++ b/PhysicsTools/HeppyCore/python/utils/deltar.py @@ -205,14 +205,3 @@ def matchObjectCollection2 ( objects, matchCollection, deltaRMax = 0.3 ): -if __name__ == '__main__': - - import sys - args = sys.argv[1:] - fargs = map( float, args ) - - print 'dR2 = ', deltaR2( *fargs ) - print 'dR = ', deltaR( *fargs ) - - - diff --git a/PhysicsTools/HeppyCore/python/utils/diclist.py b/PhysicsTools/HeppyCore/python/utils/diclist.py index 149efb03fa792..4f1759d4a279e 100644 --- a/PhysicsTools/HeppyCore/python/utils/diclist.py +++ b/PhysicsTools/HeppyCore/python/utils/diclist.py @@ -2,49 +2,43 @@ # https://github.com/cbernet/heppy/blob/master/LICENSE class diclist( list ): + '''list with 
an internal dictionary for indexing, + allowing to keep dictionary elements ordered. + keys can be everything except an integer. + ''' def __init__(self): super( diclist, self).__init__() + # internal dictionary, will contain key -> index in list self.dico = {} def add( self, key, value ): + if isinstance(key, (int, long)): + raise ValueError("key cannot be an integer") if key in self.dico: raise ValueError("key '{key}' already exists".format(key=key) ) self.dico[key] = len(self) self.append(value) def __getitem__(self, index): - '''These functions are quite risky...cannot use an integer as key...''' + '''index can be a dictionary key, or an integer specifying + the rank of the value to be accessed + ''' try: - # index = int( index ) + # if index is an integer (the rank), use the list. return super(diclist, self).__getitem__(index) except TypeError, ValueError: + # else it's the dictionary key. + # use the internal dictionary to get the index, + # and return the corresponding value from the list return super(diclist, self).__getitem__( self.dico[index] ) def __setitem__(self, index, value): '''These functions are quite risky...''' try: - # why did I add this cast? it's casting string to int... 
- # index = int( index ) return super(diclist, self).__setitem__(index, value) except TypeError,ValueError: return super(diclist, self).__setitem__( self.dico[index], value ) -if __name__ == '__main__': - dl = diclist() - dl.add(0, 1) - dl.add(2, 2) - dl.add(1, 3) - print dl - print dl[0], dl[1] - - dl = diclist() - dl.add('0', 1) - dl.add('2', 2) - dl.add('1', 3) - - print dl - print dl['0'], dl['1'] - diff --git a/PhysicsTools/HeppyCore/python/utils/diclist_test.py b/PhysicsTools/HeppyCore/python/utils/diclist_test.py new file mode 100644 index 0000000000000..893f7ce9843bd --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/diclist_test.py @@ -0,0 +1,29 @@ +import unittest + +from diclist import diclist + +class DiclistTestCase(unittest.TestCase): + + def test_string_key(self): + dl = diclist() + dl.add('a', 1) + dl.add('b', 2) + dl.add('c', 3) + self.assertEqual([1,2,3], [value for value in dl] ) + self.assertEqual(dl['c'], 3) + + def test_bad_int_key(self): + dl = diclist() + self.assertRaises(ValueError, dl.add, 1, 'a') + self.assertRaises(ValueError, dl.add, 1L, 'a') + + def test_float_key(self): + dl = diclist() + dl.add(1., 'a') + dl.add(2., 'b') + self.assertRaises(IndexError, dl.__getitem__, 2) + self.assertEqual(dl[2.], 'b') + + +if __name__ == '__main__': + unittest.main() diff --git a/PhysicsTools/HeppyCore/python/utils/eostools.py b/PhysicsTools/HeppyCore/python/utils/eostools.py new file mode 100644 index 0000000000000..0a9e9d3cf5948 --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/eostools.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python +""" +A module to manipulate files on EOS or on the local file system. Intended to have the same interface as castortools.py. 
+""" +import sys +import os +import re +import pprint +import shutil + +def setCAFPath(): + """Hack to get the CAF scripts on the PYTHONPATH""" + caf = '/afs/cern.ch/cms/caf/python' + + if caf not in sys.path: + sys.path.append(caf) +setCAFPath() +import cmsIO + +def runXRDCommand(path, cmd, *args): + """Run an xrd command. + + !!! Will, what is happening in case of problem? + ??? At some point, should return a list of lines instead of a string.""" + + lfn = eosToLFN(path) + #print "lfn:", lfn, cmd + tokens = cmsIO.splitPFN(lfnToPFN(lfn)) + + command = ['xrd', tokens[1], cmd, tokens[2]] + command.extend(args) + runner = cmsIO.cmsFileManip() + # print ' '.join(command) + return runner.runCommand(command) + +def runEOSCommand(path, cmd, *args): + """Run an eos command. + + !!! Will, when the EOS command fails, it passes silently... + I think we should really try and raise an exception in case of problems. + should be possible as the return code is provided in the tuple returned by runner.""" + + lfn = eosToLFN(path) + pfn = lfnToPFN(lfn) + tokens = cmsIO.splitPFN(pfn) + + #obviously, this is not nice + command = ['/afs/cern.ch/project/eos/installation/pro/bin/eos.select', cmd] + command.extend(args) + command.append(tokens[2]) + runner = cmsIO.cmsFileManip() + return runner.runCommand(command) + +def isLFN( path ): + """Tests whether this path is a CMS LFN (name starts with /store...)""" + # return re.match('^/store.*', path ) is not None + return path.startswith('/store') + +def isEOS( path ): + """Tests whether this path is a CMS EOS (name starts with /eos...)""" + return path.startswith('/eos') or path.startswith('root://eoscms.cern.ch//eos/cms') + +def eosToLFN( path ): + """Converts a EOS PFN to an LFN. + + Just strip out /eos/cms from path. + If this string is not found, return path. + ??? 
Shouldn't we raise an exception instead?""" + return path.replace('root://eoscms.cern.ch/', '').replace('/eos/cms','') + +#also define an alias for backwards compatibility +castorToLFN = eosToLFN + +def lfnToPFN( path, tfcProt = 'rfio'): + """Converts an LFN to a PFN. For example: + /store/cmst3/user/cbern/CMG/TauPlusX/Run2011A-03Oct2011-v1/AOD/V2/PAT_CMG_V2_4_0/H2TAUTAU_Nov21 + -> + root://eoscms//eos/cms/store/cmst3/user/cbern/CMG/TauPlusX/Run2011A-03Oct2011-v1/AOD/V2/PAT_CMG_V2_4_0/H2TAUTAU_Nov21?svcClass=cmst3&stageHost=castorcms + + This function only checks path, and does not access the storage system. + If the path is in /store/cmst3, it assumes that the CMST3 svcClass is to be used. + Otherwise, is uses the default one. + + ??? what is tfcprot? """ + + if path.startswith("/store/"): + path = path.replace("/store/","root://eoscms.cern.ch//eos/cms/store/") + if path.startswith("/pnfs/psi.ch/cms/trivcat/"): + path = path.replace("/pnfs/psi.ch/cms/trivcat/","root://t3se01.psi.ch//") + #print "path to cmsFile():", path + entity = cmsIO.cmsFile( path, tfcProt ) +# tokens = cmsIO.splitPFN(entity.pfn) + pfn = '%s://%s//%s/' % (entity.protocol,entity.host,entity.path) + + pfn = entity.pfn + if tfcProt == 'rfio' and \ + entity.path.startswith("/eos/cms/") and \ + str(entity.stat()).startswith("Error 3011: Unable to stat"): + + pfn.replace("/eos/cms","/castor/cern.ch/cms") + pfn.replace("eoscms","castorcms") + return pfn + + +def lfnToEOS( path ): + """Converts LFN to EOS. + + If path is not an LFN in the first place, return path. + ??? shouldn't we raise an exception?""" + if isLFN(path): + pfn = 'root://eoscms.cern.ch//eos/cms/' + path + return pfn.replace('//store','/store') + else: + return path + +#also define an alias for backwards compatibility +lfnToCastor = lfnToEOS + +def isEOSDir( path ): + """Returns True if path is either: + /store/... + or + /eos/cms/store/... + or + root://eoscms.cern.ch//eos/cms/ + + Otherwise, returns False. + + WARNING!! 
This function does not check for path existence, + and returns true also for plain files. + !!! Will, is my summary correct? + """ + if os.path.exists( path ): + # path does not exist + # COLIN: I think this condition could be removed, + # as it duplicates the following one. + return False + if not path.startswith('/eos') and not path.startswith('/store') and not path.startswith('root://eoscms.cern.ch//eos/cms/'): + # neither an EOS PFN or a LFN. + return False + # at this stage, we must have an EOS PFN or an LFN + pfn = lfnToPFN(eosToLFN(path)) + tokens = cmsIO.splitPFN(pfn) + return tokens and tokens[1].lower().startswith('eos') + +#also define an alias for backwards compatibility +isCastorDir = isEOSDir + + +def isEOSFile( path, tfcProt = 'rfio'): + """Returns True if path is a file or directory stored on EOS (checks for path existence) + ??? This function does not behave well if passed a non EOS path... + returns lots of error messages like: +>>> eostools.isEOSFile('/store/asdfasfd') +Command (['ls', '/', 's', 't', 'o', 'r', 'e', '/', 'a', 's', 'd', 'f', 'a', 's', 'f', 'd', '/store']) failed with return code: 2 +ls: s: No such file or directory +ls: t: No such file or directory +ls: o: No such file or directory +ls: r: No such file or directory +ls: e: No such file or directory +ls: a: No such file or directory +ls: s: No such file or directory +ls: d: No such file or directory +ls: f: No such file or directory +ls: a: No such file or directory +ls: s: No such file or directory +ls: f: No such file or directory +ls: d: No such file or directory +ls: /store: No such file or directory + +ls: s: No such file or directory +ls: t: No such file or directory +ls: o: No such file or directory +ls: r: No such file or directory +ls: e: No such file or directory +ls: a: No such file or directory +ls: s: No such file or directory +ls: d: No such file or directory +ls: f: No such file or directory +ls: a: No such file or directory +ls: s: No such file or directory +ls: f: 
No such file or directory +ls: d: No such file or directory +ls: /store: No such file or directory + +False + """ + _, _, ret = runEOSCommand( path, 'ls') + return ret == 0 + +#also define an alias for backwards compatibility +isCastorFile = isEOSFile + + +def fileExists( path ): + """Returns true if path is a file or directory stored locally, or on EOS. + + This function checks for the file or directory existence.""" + + eos = isEOSDir(path) + result = False + if eos: + # print 'eos', path + result = isEOSFile(path) + else: + # print 'not eos', path + #check locally + result = os.path.exists(path) + # print result + return result + + +def eosDirSize(path): + '''Returns the size of a directory on EOS in GB.''' + lfn = eosToLFN(path) + res = runEOSCommand(lfn, 'find', '--size') + output = res[0].split('\n') + size = 0 + for file in output: + try: + size += float(file.split('=')[2]) + except IndexError: + pass + return size/1024/1024/1024 + + +def createEOSDir( path ): + """Makes a directory in EOS + + ???Will, I'm quite worried by the fact that if this path already exists, and is + a file, everything will 'work'. But then we have a file, and not a directory, + while we expect a dir...""" + lfn = eosToLFN(path) + if not isEOSFile(lfn): + # if not isDirectory(lfn): + runEOSCommand(lfn,'mkdir','-p') + # entity = cmsIO.cmsFile( lfn,"stageout") + # entity.mkdir([]) + # # print 'created ', path + if isDirectory(path): + return path + else: + raise OSError('cannot create directory '+ path) + +#also define an alias for backwards compatibility +createCastorDir = createEOSDir + +def mkdir(path): + """Create a directory, either on EOS or locally""" + # print 'mkdir', path + if isEOS( path ) or isLFN(path): + createEOSDir(path) + else: + # recursive directory creation (like mkdir -p) + os.makedirs(path) + return path + +def isDirectory(path): + """Returns True if path is a directory on EOS. + + Tests for file existence. 
+ This function returns False for EOS files, and crashes with local paths + + ???Will, this function also seems to work for paths like: + /eos/cms/... + ??? I think that it should work also for local files, see isFile.""" + + out, _, _ = runXRDCommand(path,'existdir') + return 'The directory exists' in out + +def isFile(path): + """Returns True if a path is a file. + + Tests for file existence. + Returns False for directories. + Works on EOS and local paths. + + ???This function works with local files, so not the same as isDirectory... + isFile and isDirectory should behave the same. + """ + + if not path.startswith('/eos') and not path.startswith('/store'): + if( os.path.isfile(path) ): + return True + else: + return False + else: + out, _, _ = runXRDCommand(path,'existfile') + return 'The file exists' in out + +def chmod(path, mode): + """Does chmod on a file or directory""" + # + return runEOSCommand(path, 'chmod', '-r', str(mode)) + + +def listFiles(path, rec = False, full_info = False): + """Provides a list of the specified directory + """ + # -- listing on the local filesystem -- + if os.path.isdir( path ): + if not rec: + # not recursive + return [ '/'.join([path,file]) for file in os.listdir( path )] + else: + # recursive, directories are put in the list first, + # followed by the list of all files in the directory tree + result = [] + allFiles = [] + for root,dirs,files in os.walk(path): + result.extend( [ '/'.join([root,dir]) for dir in dirs] ) + allFiles.extend( [ '/'.join([root,file]) for file in files] ) + result.extend(allFiles) + return result + # -- listing on EOS -- + cmd = 'dirlist' + if rec: + cmd = 'dirlistrec' + files, _, _ = runXRDCommand(path, cmd) + result = [] + for line in files.split('\n'): + tokens = [t for t in line.split() if t] + if tokens: + #convert to an LFN + # result.append(tuple(tokens)) + #COLIN need same interface for eos and local fs + if full_info: + result.append( tokens) + else: + result.append( tokens[4] ) + return result 
+ +def which(cmd): + command = ['which', cmd] + runner = cmsIO.cmsFileManip() + out, _, _ = runner.runCommand(command) + + lines = [line for line in out.split('\n') if line] + if len(lines) == 1: + return lines[0] + elif len(lines) == 2: + return lines[1] + else: + return lines + +def ls(path, rec = False): + """Provides a simple list of the specified directory, works on EOS and locally""" + return [eosToLFN(t) for t in listFiles(path, rec)] + +def ls_EOS(path, rec = False): + """Provides a simple list of the specified directory, works on EOS only, but is faster than the xrd version""" + if rec: + stdout, _, ret = runEOSCommand(path,'find','-f') + return [eosToLFN(line) for line in stdout.split('\n') if line] + else: + stdout, _, ret = runEOSCommand(path,'ls') + lfn = eosToLFN(path) + return [os.path.join(lfn,line) for line in stdout.split('\n') if line] + +def rm(path, rec=False): + """rm, works on EOS and locally. + + Colin: should implement a -f mode and a confirmation when deleting dirs recursively.""" + # print 'rm ', path + path = lfnToEOS(path) + if isEOS(path): + if rec: + runEOSCommand(path, 'rm', '-r') + else: + runEOSCommand(path,'rm') + elif os.path.exists(path): + if not rec: + os.remove( path ) + else: + shutil.rmtree(path) + else: + raise ValueError(path + ' is not EOS and not local... should not happen!') + +def remove( files, rec = False): + """Remove a list of files and directories, possibly recursively + + Colin: Is that obsolete? why not use rm?""" + for path in files: + lfn = eosToLFN(path) + if not rec: + rm(path) + else: + #this should be used with care + file_list = ls(path, rec = True) + file_list.append(lfn) + + #order the files in depth order - i.e. 
remove the deepest files first + files_rec = sorted([(len([ff for ff in f.split('/') if ff]), f) for f in file_list if f and f.startswith(lfn)], reverse = True) + + for f in files_rec: + rm(f[1]) + +def cat(path): + """cat, works on EOS and locally""" + path = lfnToEOS(path) + if isEOS(path): + #print "the file to cat is:", path + out, err, _ = runXRDCommand(path,'cat') + lines = [] + if out: + pattern = re.compile('cat returned [0-9]+') + for line in out.split('\n'): + match = pattern.search(line) + if line and match is not None: + lines.append(line.replace(match.group(0),'')) + break + else: + lines.append(line) + if err: + print >> sys.stderr, out + print >> sys.stderr, err + allLines = '\n'.join(lines) + if allLines and not allLines.endswith('\n'): + allLines += '\n' + return allLines + else: + content = file(path).read() + if content and not content.endswith('\n'): + content += '\n' + return content + +def xrdcp(src, dest): + """Does a copy of files using xrd. + + Colin: implement a generic cp interface as done for rm, ls, etc?""" + + recursive = False + + #first the src file + pfn_src = src + if os.path.exists(src): + #local + pfn_src = src + if os.path.isdir(src): + recursive = True + elif fileExists(src): + src = eosToLFN(src) + pfn_src = lfnToPFN(src) + if isDirectory(src): + recursive = True + else: + raise ValueError(src + ' does not exist.') + + #now the dest + pfn_dest = dest + if isEOSDir(dest): + dest = eosToLFN(dest) + pfn_dest = lfnToPFN(dest) + if isDirectory(dest): + tokens = cmsIO.splitPFN(pfn_dest) + pfn_dest = '%s://%s//%s/' % (tokens[0],tokens[1],tokens[2]) + elif os.path.exists(dest): + pfn_dest = dest + + command = ['xrdcp'] + if recursive: + # print 'recursive' + topDir = src.rstrip('/').split('/')[-1] + if topDir != '.': + dest = '/'.join([dest, topDir]) + # print 'mkdir ' + dest + mkdir( dest ) + files = listFiles(src, rec=True) + # pprint.pprint( [file[4] for file in files] ) + for srcFile in files: + # srcFile = file[4] + pfnSrcFile = 
srcFile + if isEOSDir(srcFile): + srcFile = eosToLFN(srcFile) + pfnSrcFile = lfnToPFN(srcFile) + destFile = srcFile.replace( src, '' ) + destFile = '/'.join([dest,destFile]) + pfnDestFile = destFile + if isEOSDir(destFile): + lfnDestFile = eosToLFN(destFile) + pfnDestFile = lfnToPFN(lfnDestFile) + # print 'srcFile', pfnSrcFile + # print 'destFile', pfnDestFile + if isFile(srcFile): + _xrdcpSingleFile( pfnSrcFile, pfnDestFile ) + else: + mkdir(destFile) + else: + _xrdcpSingleFile( pfn_src, pfn_dest ) + + +def _xrdcpSingleFile( pfn_src, pfn_dest): + """Copies a single file using xrd.""" + + command = ['xrdcp'] + command.append(pfn_src) + command.append(pfn_dest) + # print ' '.join(command) + run = True + if run: + runner = cmsIO.cmsFileManip() + out, err, ret = runner.runCommand(command) + if err: + print >> sys.stderr, out + print >> sys.stderr, err + return ret + +def move(src, dest): + """Move filename1 to filename2 locally to the same server""" + + src = eosToLFN(src) + dest = eosToLFN(dest) + + runXRDCommand(src,'mv', lfnToEOS(dest)) + +def matchingFiles( path, regexp): + """Return a list of files matching a regexp""" + + # print path, regexp + pattern = re.compile( regexp ) + #files = ls_EOS(path) + files = ls(path) + # print files + return [f for f in files if pattern.match(os.path.basename(f)) is not None] + +def datasetNotEmpty( path, regexp ): + pattern = re.compile( regexp ) + files = ls_EOS(path) + + for f in files: + if pattern.match( os.path.basename(f) ) is not None: + return 1 + return 0 + +def cmsStage( absDestDir, files, force): + """Runs cmsStage with LFNs if possible""" + + destIsEOSDir = isEOSDir(absDestDir) + if destIsEOSDir: + createEOSDir( absDestDir ) + + for fname in files: + command = ['cmsStage'] + if force: + command.append('-f') + command.append(eosToLFN(fname)) + command.append(eosToLFN(absDestDir)) + print ' '.join(command) + runner = cmsIO.cmsFileManip() + runner.runCommand(command) diff --git 
a/PhysicsTools/HeppyCore/python/utils/logger.py b/PhysicsTools/HeppyCore/python/utils/logger.py new file mode 100644 index 0000000000000..fcdbdaddb5634 --- /dev/null +++ b/PhysicsTools/HeppyCore/python/utils/logger.py @@ -0,0 +1,130 @@ +from optparse import OptionParser +import sys,os, re, subprocess, datetime + +import eostools as castortools + +class logger: + '''COLIN: do something cleaner with tagPackage''' + def __init__(self, dirLocalOrTgzDirOnCastor): + + self.dirLocal = None + self.tgzDirOnCastor = None + dirLocalOrTgzDirOnCastor = dirLocalOrTgzDirOnCastor.rstrip('/') + + if self.isDirLocal( dirLocalOrTgzDirOnCastor ): + self.dirLocal = dirLocalOrTgzDirOnCastor + elif self.isTgzDirOnCastor( dirLocalOrTgzDirOnCastor ): + self.tgzDirOnCastor = dirLocalOrTgzDirOnCastor + else: + raise ValueError( dirLocalOrTgzDirOnCastor + ' is neither a tgz directory on castor (provide a LFN!) nor a local directory') + + + def isDirLocal(self, file ): + if os.path.isdir( file ): + return True + else: + return False + + def isTgzDirOnEOS(self, file ): + '''Checks if file is a .tgz file in an eos dir''' + if not castortools.isCastorDir( file ): + file = castortools.castorToLFN(file) + + if castortools.isLFN( file ): + tgzPattern = re.compile('.*\.tgz$') + m = tgzPattern.match( file ) + if m: + return True + else: + return False + else: + return False + + isTgzDirOnCastor = isTgzDirOnEOS + + def dump(self): + print 'local dir :', self.dirLocal + print 'castor archive :',self.tgzDirOnCastor + + def addFile(self, file): + # if self.dirLocal == None: + # self.stageIn() + # os.system( 'cp %s %s' % (file, self.dirLocal) ) + # self.stageOut( self.tgzDirOnCastor ) + if self.dirLocal != None: + os.system( 'cp %s %s' % (file, self.dirLocal) ) + + def logCMSSW(self): + showtagsLog = 'logger_showtags.txt' + diffLog = 'logger_diff.txt' + # os.system('showtags > ' + showtagsLog) + self.showtags(showtagsLog) + self.gitdiff(diffLog) + self.addFile(showtagsLog) + self.addFile(diffLog) + + def 
logJobs(self, n): + nJobs = 'logger_jobs.txt' + out = file(nJobs,'w') + out.write('NJobs: %i\n' % n) + out.close() + self.addFile(nJobs) + + def gitdiff(self, log): + oldPwd = os.getcwd() + os.chdir( os.getenv('CMSSW_BASE') + '/src/' ) + diffCmd = 'git diff -p --stat --color=never > %s/%s 2> /dev/null' % (oldPwd, log) + print diffCmd + os.system( diffCmd ) + os.chdir( oldPwd ) + + def showtags(self, log): + oldPwd = os.getcwd() + os.chdir( os.getenv('CMSSW_BASE') + '/src/' ) + cmd = 'echo "Test Release based on: $CMSSW_VERSION" > %s/%s 2> /dev/null' % (oldPwd, log) + os.system( cmd ) + cmd = 'echo "Base Release in: $CMSSW_RELEASE_BASE" >> %s/%s 2> /dev/null' % (oldPwd, log) + os.system( cmd ) + cmd = 'echo "Your Test release in: $CMSSW_BASE" >> %s/%s 2> /dev/null' % (oldPwd, log) + os.system( cmd ) + cmd = 'git status --porcelain -b | head -n 1 >> %s/%s 2> /dev/null' % (oldPwd, log) + os.system( cmd ) + cmd = 'git log -n 100 --format="%%T %%ai %%s %%d" >> %s/%s 2> /dev/null' % (oldPwd, log) + os.system( cmd ) + os.chdir( oldPwd ) + + def stageIn(self): + if self.tgzDirOnCastor != None: + # castortools.xrdcp( '.', [self.tgzDirOnCastor] ) + cmsStage = 'cmsStage -f ' + self.tgzDirOnCastor + ' .' + print cmsStage + os.system( cmsStage ) + tgzDir = os.path.basename( self.tgzDirOnCastor ) + print tgzDir + os.system('tar -zxvf ' + tgzDir) + os.system('rm ' + tgzDir ) + (root, ext) = os.path.splitext(tgzDir) + self.dirLocal = root + else: + print 'cannot stage in, the log had not been staged out' + + def stageOut(self, castorDir): + + castorDir = castortools.eosToLFN( castorDir ) + if not castortools.isLFN( castorDir ): + print 'cannot stage out, you need to provide an LFN as a destination directory, beginning with /store .' 
+ return False + + if self.dirLocal != None: + tgzDir = self.dirLocal + '.tgz' + tgzCmd = 'tar -zcvf ' + tgzDir + ' ' + self.dirLocal + print tgzCmd + os.system( tgzCmd) + cmsStage = 'cmsStage -f %s %s' % (tgzDir, castorDir ) + print cmsStage + os.system( cmsStage ) + os.system('rm ' + tgzDir ) + self.tgzDirOnCastor = castorDir + '/' + tgzDir + else: + print 'cannot stage out, the log is not staged in' + diff --git a/PhysicsTools/HeppyCore/scripts/heppy b/PhysicsTools/HeppyCore/scripts/heppy index c5283f9858ab5..67a30372d8041 100755 --- a/PhysicsTools/HeppyCore/scripts/heppy +++ b/PhysicsTools/HeppyCore/scripts/heppy @@ -1,2 +1,4 @@ #!/usr/bin/env bash -ipython --pdb -noconfirm_exit -- $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/heppy.py "$@" +ln -sf $CMSSW_BASE/src/PhysicsTools/HeppyCore/scripts/heppy_loop.py tmp_heppy.py +ipython --pdb -- tmp_heppy.py "$@" +rm tmp_heppy.py diff --git a/PhysicsTools/HeppyCore/scripts/heppy_batch.py b/PhysicsTools/HeppyCore/scripts/heppy_batch.py index 1eec3c3f1ead3..e65747c3a123e 100755 --- a/PhysicsTools/HeppyCore/scripts/heppy_batch.py +++ b/PhysicsTools/HeppyCore/scripts/heppy_batch.py @@ -9,7 +9,7 @@ import math from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager -from PhysicsTools.HeppyCore.framework.heppy import split +from PhysicsTools.HeppyCore.framework.heppy_loop import split def batchScriptPADOVA( index, jobDir='./'): '''prepare the LSF version of the batch script, to run on LSF''' @@ -248,7 +248,8 @@ def batchScriptLocal( remoteDir, index ): script = """#!/bin/bash echo 'running' -python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck echo +python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck +echo echo 'sending the job directory back' mv Loop/* ./ """ @@ -306,6 +307,7 @@ def PrepareJobUser(self, jobDir, value ): cfgFileName = args[0] handle = open(cfgFileName, 'r') + # import pdb; pdb.set_trace() cfo = 
imp.load_source("pycfg", cfgFileName, handle) config = cfo.config handle.close() diff --git a/PhysicsTools/HeppyCore/scripts/heppy_loop.py b/PhysicsTools/HeppyCore/scripts/heppy_loop.py new file mode 100755 index 0000000000000..51f79bd6e8a40 --- /dev/null +++ b/PhysicsTools/HeppyCore/scripts/heppy_loop.py @@ -0,0 +1,61 @@ +if __name__ == '__main__': + from optparse import OptionParser + from PhysicsTools.HeppyCore.framework.heppy_loop import * + + parser = OptionParser() + parser.usage = """ + %prog + For each component, start a Loop. + 'name' is whatever you want. + """ + + parser.add_option("-N", "--nevents", + dest="nevents", + type="int", + help="number of events to process", + default=None) + parser.add_option("-p", "--nprint", + dest="nprint", + help="number of events to print at the beginning", + default=5) + parser.add_option("-e", "--iEvent", + dest="iEvent", + help="jump to a given event. ignored in multiprocessing.", + default=None) + parser.add_option("-f", "--force", + dest="force", + action='store_true', + help="don't ask questions in case output directory already exists.", + default=False) + parser.add_option("-i", "--interactive", + dest="interactive", + action='store_true', + help="stay in the command line prompt instead of exiting", + default=False) + parser.add_option("-t", "--timereport", + dest="timeReport", + action='store_true', + help="Make a report of the time used by each analyzer", + default=False) + parser.add_option("-v", "--verbose", + dest="verbose", + action='store_true', + help="increase the verbosity of the output (from 'warning' to 'info' level)", + default=False) + parser.add_option("-q", "--quiet", + dest="quiet", + action='store_true', + help="do not print log messages to screen.", + default=False) + parser.add_option("-o", "--option", + dest="extraOptions", + type="string", + action="append", + default=[], + help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file") + 
+ (options,args) = parser.parse_args() + + main(options, args) + if not options.interactive: + exit() # trigger exit also from ipython