CMS example with ServiceX 3 client #225

Merged: 23 commits, Oct 29, 2024

2 changes: 2 additions & 0 deletions .github/workflows/CI.yml
@@ -33,3 +33,5 @@ jobs:
with:
fetch-depth: 0
- uses: jpetrucciani/ruff-check@main
with:
flags: '--exclude *ipynb'
27 changes: 20 additions & 7 deletions README.md
@@ -3,6 +3,7 @@
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7274936.svg)](https://doi.org/10.5281/zenodo.7274936)
[![Documentation Status](https://readthedocs.org/projects/agc/badge/?version=latest)](https://agc.readthedocs.io/en/latest/?badge=latest)

**Interested in other AGC-related projects?** See [list below](#agc-implementations-and-related-projects).

The Analysis Grand Challenge (AGC) is about performing the last steps in an analysis pipeline at scale to test workflows envisioned for the HL-LHC.
This includes
@@ -45,16 +46,28 @@ We believe that the AGC can be useful in various contexts:
- realistic environment to prototype analysis workflows,
- functionality, integration & performance test for analysis facilities.

We are very interested in seeing (parts of) the AGC implemented in different ways!
Besides the implementation in this repository, have a look at

- a ROOT RDataFrame-based implementation: [root-project/analysis-grand-challenge](https://github.com/root-project/analysis-grand-challenge),
- a pure Julia implementation: [Moelf/LHC_AGC.jl](https://github.com/Moelf/LHC_AGC.jl).
- a columnflow implementation: [columnflow/agc_cms_ttbar](https://github.com/columnflow/agc_cms_ttbar).

We are very interested in seeing (parts of) the AGC implemented in different ways.
Please get in touch if you have investigated other approaches you would like to share!
There is no need to implement the full analysis task — it splits into pieces (for example the production of histograms) that can also be tackled individually.

## AGC implementations and related projects

Besides the implementation in this repository, have a look at the following implementations as well:

- ROOT RDataFrame-based implementation: [root-project/analysis-grand-challenge](https://github.com/root-project/analysis-grand-challenge)
- pure Julia implementation: [Moelf/LHC_AGC.jl](https://github.com/Moelf/LHC_AGC.jl)
- columnflow implementation: [columnflow/agc_cms_ttbar](https://github.com/columnflow/agc_cms_ttbar)

Additional related projects are listed below.
Are we missing anything in this list?
Please get in touch!

- AGC on REANA with Snakemake: [iris-hep/agc-reana](https://github.com/iris-hep/agc-reana)
- small demo of AGC with `dask-awkward` and `coffea` 2024: [iris-hep/calver-coffea-agc-demo](https://github.com/iris-hep/calver-coffea-agc-demo/)
- columnar analysis with ATLAS PHYSLITE Open Data: [iris-hep/agc-physlite](https://github.com/iris-hep/agc-physlite/)
- exploring automatic differentiation for physics analysis: [iris-hep/agc-autodiff](https://github.com/iris-hep/agc-autodiff/)
- AGC data processing with RNTuple files: [iris-hep/agc-rntuple](https://github.com/iris-hep/agc-rntuple)

## More details: what is being investigated in the AGC context

- New user interfaces: Complementary services that present the analyst with a notebook-based interface. Example software: Jupyter.
247 changes: 99 additions & 148 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.ipynb

Large diffs are not rendered by default.

61 changes: 39 additions & 22 deletions analyses/atlas-open-data-hzz/HZZ_analysis_pipeline.py
@@ -22,20 +22,19 @@

import awkward as ak
import cabinetry
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import mplhep
import numpy as np
import pyhf
import uproot
from servicex import ServiceXDataset

from coffea import processor
from coffea.nanoevents.schemas.base import BaseSchema
import utils
from utils import infofile # contains cross-section information

import servicex

import vector
vector.register_awkward()

@@ -54,6 +53,9 @@
# ServiceX behavior: ignore cache with repeated queries
IGNORE_CACHE = False

# ServiceX behavior: choose query language
USE_SERVICEX_UPROOT_RAW = True

# %% [markdown]
# ## Introduction
#
@@ -147,12 +149,14 @@
# <span style="color:darkgreen">**Systematic uncertainty added:**</span> scale factor variation, applied already at the event selection stage. Imagine that this could be a calculation that requires many different variables which are no longer needed downstream, so it makes sense to do it here.

# %%
def get_lepton_query(source: ObjectStream) -> ObjectStream:
"""Performs event selection: require events with exactly four leptons.
def get_lepton_query():
"""Performs event selection with func_adl transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
return source.Where(lambda event: event.lep_n == 4).Select(
from servicex import query as q
return q.FuncADL_Uproot().FromTree('mini')\
.Where(lambda event: event.lep_n == 4).Select(
lambda e: {
"lep_pt": e.lep_pt,
"lep_eta": e.lep_eta,
@@ -179,23 +183,33 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:
}
)

def get_lepton_query_uproot_raw():
"""Performs event selection with uproot-raw transformer: require events with exactly four leptons.
Also select all columns needed further downstream for processing &
histogram filling.
"""
from servicex import query as q
return q.UprootRaw([{'treename': 'mini',
'expressions': ['lep_pt', 'lep_eta', 'lep_phi', 'lep_energy', 'lep_charge',
'lep_typeid', 'mcWeight', 'scaleFactor', 'scaleFactorUP', 'scaleFactorDOWN'],
'aliases': { 'lep_typeid': 'lep_type', 'lep_energy': 'lep_E',
'scaleFactor': 'scaleFactor_ELE*scaleFactor_MUON*scaleFactor_LepTRIGGER*scaleFactor_PILEUP',
'scaleFactorUP': 'scaleFactor*1.1',
'scaleFactorDOWN': 'scaleFactor*0.9' }
}])

# %% [markdown]
# # Caching the queried datasets with `ServiceX`
#
# Using the queries defined above, we use `ServiceX` to read the ATLAS Open Data files and build cached files containing only the specific event information dictated by the query.

# %%
# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "mini", backend_name="uproot")

# tell low-level infrastructure not to contact ServiceX yet, only to
# return the qastle string it would have sent
dummy_ds.return_qastle = True

# create the query
lepton_query = get_lepton_query(dummy_ds)
query = lepton_query.value()
if USE_SERVICEX_UPROOT_RAW:
query = get_lepton_query_uproot_raw()
else:
query = get_lepton_query()


# now we query the files and create a fileset dictionary containing the
# URLs pointing to the queried files
@@ -204,13 +218,15 @@ def get_lepton_query(source: ObjectStream) -> ObjectStream:

fileset = {}

for ds_name in input_files.keys():
ds = ServiceXDataset(input_files[ds_name], backend_name="uproot", ignore_cache=IGNORE_CACHE)
files = ds.get_data_rootfiles_uri(query, as_signed_url=True, title=ds_name)
bundle = { 'General': { 'Delivery': 'URLs' },
'Sample': [ { 'Name': ds_name,
'Query': query,
'Dataset': servicex.dataset.FileList(input_files[ds_name]),
'IgnoreLocalCache': IGNORE_CACHE } for ds_name in input_files.keys() ]
}

fileset[ds_name] = {"files": [f.url for f in files],
"metadata": {"dataset_name": ds_name}
}
results = servicex.deliver(bundle)
fileset = { _: {"files": results[_], "metadata": {"dataset_name": _}} for _ in results }

print(f"execution took {time.time() - t0:.2f} seconds")

@@ -383,7 +399,8 @@ def postprocess(self, accumulator):
executor = processor.FuturesExecutor(workers=NUM_CORES)
run = processor.Runner(executor=executor, savemetrics=True, metadata_cache={},
chunksize=CHUNKSIZE, schema=BaseSchema)
all_histograms, metrics = run(fileset, "servicex", processor_instance=HZZAnalysis())
# The trees returned by ServiceX will have different names depending on the query language used
all_histograms, metrics = run(fileset, "mini" if USE_SERVICEX_UPROOT_RAW else "servicex", processor_instance=HZZAnalysis())

print(f"execution took {time.time() - t0:.2f} seconds")

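For readers following the migration, here is a minimal standalone sketch of the ServiceX 3 `deliver` workflow that the pipeline changes above adopt. It is illustrative only: the sample name and dataset path are placeholders and the query is trimmed to a single column; the real queries and fileset construction are those shown in the diff.

```python
import servicex
from servicex import query as q

# A query can be written in either language used above:
# ... as a func_adl query over the "mini" tree,
func_adl_query = (
    q.FuncADL_Uproot()
    .FromTree("mini")
    .Where(lambda e: e.lep_n == 4)
    .Select(lambda e: {"lep_pt": e.lep_pt})
)

# ... or as an uproot-raw query with expressions (and optional aliases).
uproot_raw_query = q.UprootRaw([{"treename": "mini", "expressions": ["lep_pt"]}])

# All samples go into a single request: a delivery mode plus one entry per sample.
bundle = {
    "General": {"Delivery": "URLs"},
    "Sample": [
        {
            "Name": "example_sample",  # placeholder sample name
            "Query": uproot_raw_query,
            "Dataset": servicex.dataset.FileList(
                ["root://eospublic.cern.ch//placeholder.root"]  # placeholder input file
            ),
            "IgnoreLocalCache": False,
        }
    ],
}

# deliver() runs the transformations and returns output file URLs keyed by sample name,
# which can then be arranged into a coffea-style fileset as in the notebook.
results = servicex.deliver(bundle)
fileset = {
    name: {"files": urls, "metadata": {"dataset_name": name}}
    for name, urls in results.items()
}
```
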
11 changes: 0 additions & 11 deletions analyses/atlas-open-data-hzz/utils/__init__.py
@@ -2,11 +2,8 @@
import os
import shutil

from coffea.processor import servicex
from func_adl import ObjectStream
import matplotlib.pyplot as plt
import numpy as np
from servicex.servicex import ServiceXDataset


def clean_up():
@@ -61,11 +58,3 @@ def save_figure(figname: str):

for filetype in ["pdf", "png"]:
fig.savefig(f"figures/{figname}.{filetype}")


def make_datasource(fileset:dict, name: str, query: ObjectStream):
"""Creates a ServiceX datasource for a particular ATLAS Open data file."""
datasets = [ServiceXDataset(fileset[name], backend_name="uproot")]
return servicex.DataSource(
query=query, metadata={"dataset_category": name}, datasets=datasets
)
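
For context on the removal above: `make_datasource` wrapped a `ServiceXDataset` into coffea's `servicex.DataSource` executor interface, which the ServiceX 3 client no longer requires. The URL lists returned by `servicex.deliver` go straight into an ordinary coffea fileset instead, as in the pipeline diff. A rough sketch of the replacement pattern, assuming `fileset` was built from the `deliver` results and `HZZAnalysis` is the processor class defined in the notebook:

```python
from coffea import processor
from coffea.nanoevents.schemas.base import BaseSchema

# fileset: {"sample_name": {"files": [<URLs from servicex.deliver>], "metadata": {...}}}
executor = processor.FuturesExecutor(workers=4)  # placeholder worker count
run = processor.Runner(
    executor=executor,
    schema=BaseSchema,
    savemetrics=True,
    metadata_cache={},
    chunksize=100_000,  # placeholder chunk size
)

# The tree name depends on the query language: "mini" for uproot-raw output,
# "servicex" for func_adl output (see the pipeline diff above).
all_histograms, metrics = run(fileset, "mini", processor_instance=HZZAnalysis())
```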