CMS example with ServiceX 3 client #225

Merged: 23 commits, merged on Oct 29, 2024

Changes from 8 commits
245 changes: 98 additions & 147 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.ipynb

Large diffs are not rendered by default.

59 changes: 38 additions & 21 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.py
@@ -22,14 +22,11 @@

import awkward as ak
import cabinetry
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import mplhep
import numpy as np
import pyhf
import uproot
from servicex import ServiceXDataset

from coffea import processor
from coffea.nanoevents.schemas.base import BaseSchema
@@ -54,6 +51,9 @@
# ServiceX behavior: ignore cache with repeated queries
IGNORE_CACHE = False

# ServiceX behavior: choose query language
USE_SERVICEX_UPROOT_RAW = True

# %% [markdown]
# ## Introduction
#
@@ -147,12 +147,14 @@
# <span style="color:darkgreen">**Systematic uncertainty added:**</span> scale factor variation, applied already at the event selection stage. Imagine that this could be a calculation requiring many different variables that are no longer needed downstream, so it makes sense to do it here.

# %%
def get_lepton_query(source: ObjectStream) -> ObjectStream:
"""Performs event selection: require events with exactly four leptons.
def get_lepton_query():
"""Performs event selection with func_adl transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
return source.Where(lambda event: event.lep_n == 4).Select(
from servicex import query as q
return q.FuncADL_Uproot().FromTree('mini')\
.Where(lambda event: event.lep_n == 4).Select(
lambda e: {
"lep_pt": e.lep_pt,
"lep_eta": e.lep_eta,
@@ -179,23 +181,35 @@
        }
    )

def get_lepton_query_uproot_raw():
"""Performs event selection with uproot-raw transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
from servicex import query as q
return q.UprootRaw([{'treename': 'mini',
'expressions': ['lep_pt', 'lep_eta', 'lep_phi', 'lep_energy', 'lep_charge',
'lep_typeid', 'mcWeight', 'scaleFactor', 'scaleFactorUP', 'scaleFactorDOWN'],
'aliases': { 'lep_typeid': 'lep_type', 'lep_energy': 'lep_E',
'scaleFactor': 'scaleFactor_ELE*scaleFactor_MUON*scaleFactor_LepTRIGGER*scaleFactor_PILEUP',
'scaleFactorUP': 'scaleFactor*1.1',
'scaleFactorDOWN': 'scaleFactor*0.9' }
}])
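
For orientation, the 'aliases' in the uproot-raw query define derived columns that are evaluated server-side from branches of the 'mini' tree. Below is a minimal local sketch of the same scale-factor computation, assuming a hypothetical file path; the branch names are the ones multiplied in the 'scaleFactor' alias above:

import uproot

# read the four per-event scale-factor branches that the 'scaleFactor'
# alias multiplies together (file path is hypothetical)
arrays = uproot.open("open_data_sample.root")["mini"].arrays(
    ["scaleFactor_ELE", "scaleFactor_MUON",
     "scaleFactor_LepTRIGGER", "scaleFactor_PILEUP"]
)
scale_factor = (arrays["scaleFactor_ELE"] * arrays["scaleFactor_MUON"]
                * arrays["scaleFactor_LepTRIGGER"] * arrays["scaleFactor_PILEUP"])
scale_factor_up = scale_factor * 1.1    # 'scaleFactorUP' alias: +10% variation
scale_factor_down = scale_factor * 0.9  # 'scaleFactorDOWN' alias: -10% variation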

# %% [markdown]
# # Caching the queried datasets with `ServiceX`
#
# Using the queries created above, we use `ServiceX` to read the ATLAS Open Data files and build cached files containing only the specific event information dictated by the query.

# %%
# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "mini", backend_name="uproot")

# tell low-level infrastructure not to contact ServiceX yet, only to
# return the qastle string it would have sent
dummy_ds.return_qastle = True
import servicex


Check failure on line 205 in analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.py (GitHub Actions / linter): Ruff (E402): analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.py:205:1: E402 Module level import not at top of file

ponyisi marked this conversation as resolved.

# create the query
lepton_query = get_lepton_query(dummy_ds)
query = lepton_query.value()
if USE_SERVICEX_UPROOT_RAW:
    query = get_lepton_query_uproot_raw()
else:
    query = get_lepton_query()


# now we query the files and create a fileset dictionary containing the
# URLs pointing to the queried files
@@ -204,13 +218,15 @@

fileset = {}

for ds_name in input_files.keys():
    ds = ServiceXDataset(input_files[ds_name], backend_name="uproot", ignore_cache=IGNORE_CACHE)
    files = ds.get_data_rootfiles_uri(query, as_signed_url=True, title=ds_name)
bundle = { 'General': { 'Delivery': 'URLs' },
           'Sample': [ { 'Name': ds_name,
                         'Query': query,
                         'Dataset': servicex.dataset.FileList(input_files[ds_name]),
                         'IgnoreLocalCache': IGNORE_CACHE } for ds_name in input_files.keys() ]
         }

    fileset[ds_name] = {"files": [f.url for f in files],
                        "metadata": {"dataset_name": ds_name}
                        }
results = servicex.deliver(bundle)
fileset = { _: {"files": results[_], "metadata": {"dataset_name": _}} for _ in results }
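
For orientation, servicex.deliver returns a mapping from each sample 'Name' to the locations of the delivered files, so the fileset built above looks roughly like the sketch below (the sample name and file path are hypothetical):

# hypothetical illustration of the fileset structure built above
example_fileset = {
    "ggH125_ZZ4lep": {
        "files": ["/tmp/servicex/ggH125_ZZ4lep/part0.root"],
        "metadata": {"dataset_name": "ggH125_ZZ4lep"},
    },
}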

print(f"execution took {time.time() - t0:.2f} seconds")

@@ -383,7 +399,8 @@
executor = processor.FuturesExecutor(workers=NUM_CORES)
run = processor.Runner(executor=executor, savemetrics=True, metadata_cache={},
                       chunksize=CHUNKSIZE, schema=BaseSchema)
all_histograms, metrics = run(fileset, "servicex", processor_instance=HZZAnalysis())
# The trees returned by ServiceX will have different names depending on the query language used
all_histograms, metrics = run(fileset, "mini" if USE_SERVICEX_UPROOT_RAW else "servicex", processor_instance=HZZAnalysis())
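
A quick sanity check, assuming uproot and a populated fileset: opening one delivered file shows which tree name to expect, since the uproot-raw transformer keeps the original 'mini' tree name while the func_adl transformer writes its output to a tree named 'servicex'.

import uproot

# inspect the first delivered file of the first sample (names depend on
# the actual fileset contents)
first_sample = next(iter(fileset))
with uproot.open(fileset[first_sample]["files"][0]) as f:
    print(f.keys())  # e.g. ['mini;1'] or ['servicex;1']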

print(f"execution took {time.time() - t0:.2f} seconds")
