import typing as tp
import warnings
import awkward
from awkward_zipper.awkward_util import (
_check_equal_lengths,
_non_materializing_get_field,
_rewrap,
)
from awkward_zipper.kernels import (
children,
counts2nestedindex,
counts2offsets,
distinct_children_deep,
distinct_parent,
local2globalindex,
nestedindex,
)
from awkward_zipper.layouts.base import BaseLayoutBuilder
class NanoAOD(BaseLayoutBuilder):
    """NanoAOD layout builder

    The NanoAOD layout is built from all branches found in the supplied file, based on
    the naming pattern of the branches. The following additional arrays are constructed:

    - Any branches named ``n{name}`` are assumed to be counts branches and converted to offsets ``o{name}``
    - Any local index branches with names matching ``{source}_{target}Idx*`` are converted to global indexes for the event chunk (postfix ``G``)
    - Any `nested_items` are constructed, if the necessary branches are available
    - Any `special_items` are constructed, if the necessary branches are available

    From those arrays, NanoAOD collections are formed as collections of branches grouped by name, where:

    - one branch exists named ``name`` and no branches start with ``name_``, interpreted as a single flat array;
    - one branch exists named ``name``, one named ``n{name}``, and no branches start with ``name_``, interpreted as a single jagged array;
    - no branch exists named ``{name}`` and many branches start with ``name_*``, interpreted as a flat table; or
    - one branch exists named ``n{name}`` and many branches start with ``name_*``, interpreted as a jagged table.

    Collections are assigned mixin types according to the `mixins` mapping.
    All collections are then zipped into one `base.NanoEvents` record and returned.

    There is a class-level variable ``warn_missing_crossrefs`` which will alter the behavior of
    NanoAOD. If warn_missing_crossrefs is true then when a missing global index cross-ref
    target is encountered a warning will be issued. Regardless, the cross-reference is dropped.

    The same holds for ``error_missing_event_ids``. If error_missing_event_ids is true, then when the 'run', 'event',
    or 'luminosityBlock' fields are missing, an exception will be thrown; if it is false, just a warning will be issued.
    """

    warn_missing_crossrefs = True  # If True, issues a warning when a missing global index cross-ref target is encountered
    error_missing_event_ids = True  # If True, raises an exception when 'run', 'event', or 'luminosityBlock' fields are missing

    event_ids: tp.ClassVar = ["run", "luminosityBlock", "event"]
    """List of NanoAOD event IDs"""

    mixins: tp.ClassVar = {
        "CaloMET": "MissingET",
        "ChsMET": "MissingET",
        "GenMET": "MissingET",
        "MET": "MissingET",
        "METFixEE2017": "MissingET",
        "PuppiMET": "MissingET",
        "RawMET": "MissingET",
        "RawPuppiMET": "MissingET",
        "TkMET": "MissingET",
        # pseudo-lorentz: pt, eta, phi, mass=0
        "IsoTrack": "PtEtaPhiMCollection",
        "SoftActivityJet": "PtEtaPhiMCollection",
        "TrigObj": "PtEtaPhiMCollection",
        # True lorentz: pt, eta, phi, mass
        "FatJet": "FatJet",
        "GenDressedLepton": "PtEtaPhiMCollection",
        "GenIsolatedPhoton": "PtEtaPhiMCollection",
        "GenJet": "PtEtaPhiMCollection",
        "GenJetAK8": "PtEtaPhiMCollection",
        "Jet": "Jet",
        "LHEPart": "PtEtaPhiMCollection",
        "SubGenJetAK8": "PtEtaPhiMCollection",
        "SubJet": "PtEtaPhiMCollection",
        # Candidate: lorentz + charge
        "Electron": "Electron",
        "LowPtElectron": "LowPtElectron",
        "Muon": "Muon",
        "Photon": "Photon",
        "FsrPhoton": "FsrPhoton",
        "Tau": "Tau",
        "GenVisTau": "GenVisTau",
        # special
        "GenPart": "GenParticle",
        "PV": "Vertex",
        "SV": "SecondaryVertex",
    }
    """Default configuration for mixin types, based on the collection name.

    The types are implemented in the `coffea.nanoevents.methods.nanoaod` module.
    """

    all_cross_references: tp.ClassVar = {
        "Electron_genPartIdx": "GenPart",
        "Electron_jetIdx": "Jet",
        "Electron_photonIdx": "Photon",
        "LowPtElectron_electronIdx": "Electron",
        "LowPtElectron_genPartIdx": "GenPart",
        "LowPtElectron_photonIdx": "Photon",
        "FatJet_genJetAK8Idx": "GenJetAK8",
        "FatJet_subJetIdx1": "SubJet",
        "FatJet_subJetIdx2": "SubJet",
        "FsrPhoton_muonIdx": "Muon",
        "GenPart_genPartIdxMother": "GenPart",
        "GenVisTau_genPartIdxMother": "GenPart",
        "Jet_electronIdx1": "Electron",
        "Jet_electronIdx2": "Electron",
        "Jet_genJetIdx": "GenJet",
        "Jet_muonIdx1": "Muon",
        "Jet_muonIdx2": "Muon",
        "Muon_fsrPhotonIdx": "FsrPhoton",
        "Muon_genPartIdx": "GenPart",
        "Muon_jetIdx": "Jet",
        "Photon_electronIdx": "Electron",
        "Photon_genPartIdx": "GenPart",
        "Photon_jetIdx": "Jet",
        "Tau_genPartIdx": "GenPart",
        "Tau_jetIdx": "Jet",
    }
    """Cross-references, where an index is to be interpreted with respect to another collection

    Each such cross-reference will be converted to a global indexer, so that arbitrarily sliced events
    can still resolve the indirection back the parent events
    """

    nested_items: tp.ClassVar = {
        "FatJet_subJetIdxG": ["FatJet_subJetIdx1G", "FatJet_subJetIdx2G"],
        "Jet_muonIdxG": ["Jet_muonIdx1G", "Jet_muonIdx2G"],
        "Jet_electronIdxG": ["Jet_electronIdx1G", "Jet_electronIdx2G"],
    }
    """Nested collections, where nesting is accomplished by a fixed-length set of indexers"""

    nested_index_items: tp.ClassVar = {
        "Jet_pFCandsIdxG": ("Jet_nConstituents", "JetPFCands"),
        "FatJet_pFCandsIdxG": ("FatJet_nConstituents", "FatJetPFCands"),
        "GenJet_pFCandsIdxG": ("GenJet_nConstituents", "GenJetCands"),
        "GenFatJet_pFCandsIdxG": ("GenJetAK8_nConstituents", "GenFatJetCands"),
    }
    """Nested collections, where nesting is accomplished by assuming the target can be unflattened according to a source counts"""

    # NOTE: insertion order matters here; ``__call__`` treats every entry
    # except the first ("GenPart_distinctParentIdxG") as doubly-jagged when
    # re-wrapping buffers.
    special_items: tp.ClassVar = {
        "GenPart_distinctParentIdxG": (
            distinct_parent,
            ("GenPart_genPartIdxMotherG", "GenPart_pdgId"),
        ),
        "GenPart_childrenIdxG": (
            children,
            (
                "nGenPart",
                "GenPart_genPartIdxMotherG",
            ),
        ),
        "GenPart_distinctChildrenIdxG": (
            children,
            (
                "nGenPart",
                "GenPart_distinctParentIdxG",
            ),
        ),
        "GenPart_distinctChildrenDeepIdxG": (
            distinct_children_deep,
            (
                "nGenPart",
                "GenPart_genPartIdxMotherG",
                "GenPart_pdgId",
            ),
        ),
    }
    """Special arrays, where the callable and input arrays are specified in the value"""

    def __init__(self, version="latest"):
        """Build the layout builder for a given NanoAOD version.

        Parameters
        ----------
        version : str, optional
            Either ``"latest"`` or a stringified integer NanoAOD major
            version. Cross-references that did not exist in older versions
            are pruned from ``self.cross_references``.
        """
        self._version = version
        # Start from the full cross-reference table and prune entries that
        # are absent in older NanoAOD versions.
        self.cross_references = dict(self.all_cross_references)
        if version != "latest":
            # FatJet -> GenJetAK8 matching index first appeared in v7
            if int(version) < 7:
                del self.cross_references["FatJet_genJetAK8Idx"]
            # FsrPhoton <-> Muon matching indices first appeared in v6
            if int(version) < 6:
                del self.cross_references["FsrPhoton_muonIdx"]
                del self.cross_references["Muon_fsrPhotonIdx"]

    @classmethod
    def v7(cls):
        """Build the NanoEvents assuming NanoAODv7

        For example, one can use ``NanoEventsFactory.from_root("file.root", schemaclass=NanoAOD.v7)``
        to ensure NanoAODv7 compatibility.

        Returns
        -------
        out: NanoAOD
            Schema assuming NanoAODv7
        """
        return cls(version="7")

    @classmethod
    def v6(cls):
        """Build the NanoEvents assuming NanoAODv6

        Returns
        -------
        out: NanoAOD
            Schema assuming NanoAODv6
        """
        return cls(version="6")

    @classmethod
    def v5(cls):
        """Build the NanoEvents assuming NanoAODv5

        Returns
        -------
        out: NanoAOD
            Schema assuming NanoAODv5
        """
        return cls(version="5")

    def __call__(self, array: awkward.Array) -> awkward.Array:
        """Restructure a flat record of NanoAOD branches into nested NanoEvents.

        Parameters
        ----------
        array : awkward.Array
            Record array with one field per flat NanoAOD branch.

        Returns
        -------
        awkward.Array
            The zipped ``NanoEvents`` record of collections.

        Raises
        ------
        RuntimeError
            If any of ``self.event_ids`` is missing and
            ``error_missing_event_ids`` is True.
        """
        fields = set(array.fields)

        def _get_collection_fields(name, collection):
            # all field names in `collection` that start with `name`
            return set(filter(lambda f: f.startswith(name), collection))

        # branches that start with "n"
        counter_fields = _get_collection_fields("n", fields)
        # parse into high-level records (collections, list collections, and singletons)
        collections = {k.split("_", maxsplit=1)[0] for k in fields - counter_fields}

        # handles collections with underscore in their names
        def _special_collections(collections, fields):
            additional_collections = []
            collections_to_remove = set()
            for name in collections:
                # check collections that have only one instance in each event
                if "n" + name not in fields:
                    collection_counts = _get_collection_fields("n" + name + "_", fields)
                    # if however, the fields of this collection have multiple instances in each event
                    # then we use these fields as collections instead
                    # Example: 'nProton_multiRP' and 'nProton_singleRP' fields are present but no 'nProton' field
                    # Then we add new 'Proton_multiRP' and 'Proton_singleRP' collections and delete 'Proton' collection
                    if len(collection_counts) > 0:
                        additional_collections += [
                            counts.removeprefix("n")
                            for counts in collection_counts
                            if counts.count("_") == 1
                        ]
                        collections_to_remove.add(name)
            collections.update(additional_collections)
            collections -= collections_to_remove
            return collections

        collections = _special_collections(collections, fields)

        # check if data or simulation
        is_data = "GenPart" not in collections

        new_fields = {}

        # # Create offsets virtual arrays
        # for name in counter_fields:
        #     arr = _non_materializing_get_field(array, name)
        #     new_fields[name.replace("n", "o", 1)] = counts2offsets(arr)

        # Check the presence of the event_ids
        missing_event_ids = [
            event_id for event_id in self.event_ids if event_id not in fields
        ]
        if len(missing_event_ids) > 0:
            if self.error_missing_event_ids:
                msg = f"There are missing event ID fields: {missing_event_ids} \n\n\
The event ID fields {self.event_ids} are necessary to perform sub-run identification \
(e.g. for corrections and sub-dividing data during different detector conditions),\
to cross-validate MC and Data (i.e. matching events for comparison), and to generate event displays. \
It's advised to never drop these branches from the dataformat.\n\n\
This error can be demoted to a warning by setting the class level variable error_missing_event_ids to False."
                raise RuntimeError(msg)
            warnings.warn(
                f"Missing event_ids : {missing_event_ids}",
                RuntimeWarning,
                stacklevel=2,
            )

        # Create global index virtual arrays for indirection
        # NOTE: iterate the version-filtered ``self.cross_references`` built in
        # ``__init__`` (not ``self.all_cross_references``), so the v5/v6
        # pruning actually takes effect.
        for indexer, target in self.cross_references.items():
            if target.startswith("Gen") and is_data:
                continue
            if indexer not in fields:
                if self.warn_missing_crossrefs:
                    warnings.warn(
                        f"Missing cross-reference index for {indexer} => {target}",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                continue
            if "n" + target not in fields:
                if self.warn_missing_crossrefs:
                    warnings.warn(
                        f"Missing cross-reference target for {indexer} => {target}",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                continue
            # convert nWhatever to a global index
            # this used to be transforms.counts2offsets_form + transforms.local2global_form in coffea
            arr_indexer = _non_materializing_get_field(array, indexer)
            arr_target = _non_materializing_get_field(array, "n" + target)
            new_fields[indexer + "G"] = local2globalindex(arr_indexer, arr_target)

        # Create nested indexer from Idx1, Idx2, ... arrays
        for name, indexers in self.nested_items.items():
            if all(idx in new_fields for idx in indexers):
                new_fields[name] = nestedindex(
                    [_non_materializing_get_field(new_fields, idx) for idx in indexers]
                )

        # Create nested indexer from n* counts arrays
        for name, (local_counts, target) in self.nested_index_items.items():
            if local_counts in fields and "n" + target in fields:
                arr_local_counts = _non_materializing_get_field(array, local_counts)
                arr_target = _non_materializing_get_field(array, "n" + target)
                # this used to be transforms.counts2nestedindex_form + transforms.local2global_form in coffea
                new_fields[name] = counts2nestedindex(arr_local_counts, arr_target)

        # Create any special arrays
        for name, (fcn, args) in self.special_items.items():
            if all((k in new_fields or k in fields) for k in args):
                # pull each argument from `new_fields` if already derived, else from `array`
                input_arrays = ()
                for k in args:
                    input_arrays += (
                        _non_materializing_get_field(
                            new_fields if k in new_fields else array, k
                        ),
                    )
                new_fields[name] = fcn(*input_arrays)

        output = {}
        for name in collections:
            name_with_underscore = name + "_"
            mixin = self.mixins.get(name, "NanoCollection")
            if "n" + name in fields and name not in fields:
                # jagged table: counts branch plus per-object branches
                content = {}
                # buffers in `array`
                for field in _get_collection_fields(name_with_underscore, fields):
                    arr = _non_materializing_get_field(array, field)
                    *_, buffers = awkward.to_buffers(arr)
                    assert {"node0-offsets", "node1-data"} == set(buffers)
                    # take flat data
                    content[field.removeprefix(name_with_underscore)] = (
                        awkward.contents.NumpyArray(
                            buffers["node1-data"],
                            parameters=arr.layout.parameters,
                        )
                    )
                # new buffers in `new_fields`
                for field in _get_collection_fields(name_with_underscore, new_fields):
                    arr = _non_materializing_get_field(new_fields, field)
                    parameters = arr.layout.parameters
                    *_, buffers = awkward.to_buffers(arr)
                    # all derived nested/special arrays except the first
                    # special item are doubly jagged
                    if field in self.nested_items | self.nested_index_items | dict(
                        list(self.special_items.items())[1:]
                    ):
                        # doubly-jagged case
                        assert {"node0-offsets", "node1-offsets", "node2-data"} == set(
                            buffers
                        )
                        # take singly jagged array
                        content[field.removeprefix(name_with_underscore)] = (
                            awkward.contents.ListOffsetArray(
                                offsets=awkward.index.Index(buffers["node1-offsets"]),
                                content=awkward.contents.NumpyArray(
                                    buffers["node2-data"]
                                ),
                                parameters=parameters,
                            )
                        )
                    else:
                        assert {"node0-offsets", "node1-data"} == set(buffers)
                        # take flat data
                        content[field.removeprefix(name_with_underscore)] = (
                            awkward.contents.NumpyArray(
                                buffers["node1-data"], parameters=parameters
                            )
                        )
                _content = (*content.values(),)
                _fields = (*content.keys(),)
                _length = _check_equal_lengths(_content)
                # combine contents in a RecordArray
                record = awkward.contents.RecordArray(
                    _content, _fields, length=_length, parameters={}
                )
                # update parameters
                counts = _non_materializing_get_field(array, "n" + name)
                record.parameters.update(
                    {
                        "collection_name": name,
                        "__record__": mixin,
                        "__doc__": counts.layout.parameters.get("__doc__"),
                    }
                )
                # wrap as jagged array
                offsets = counts2offsets(counts)
                offsets = awkward.index.Index(offsets)
                output[name] = awkward.contents.ListOffsetArray(
                    offsets=offsets, content=record
                )
            elif ("n" + name) in fields:
                # list singleton (can use branch's own offsets)
                arr = _non_materializing_get_field(array, name)
                output[name] = awkward.to_layout(arr)
                output[name].parameters.update(
                    {"__array__": mixin, "collection_name": name}
                )
            elif name in fields:
                # singleton
                arr = _non_materializing_get_field(array, name)
                output[name] = awkward.to_layout(arr)
            else:
                # simple collection
                content = {}
                for field in _get_collection_fields(name_with_underscore, fields):
                    arr = _non_materializing_get_field(array, field)
                    *_, buffers = awkward.to_buffers(arr)
                    assert {"node0-data"} == set(buffers)
                    # take flat data
                    content[field.removeprefix(name_with_underscore)] = (
                        awkward.contents.NumpyArray(
                            buffers["node0-data"],
                            # forward parameters
                            parameters=arr.layout.parameters,
                        )
                    )
                _content = (*content.values(),)
                _fields = (*content.keys(),)
                _length = _check_equal_lengths(_content)
                output[name] = awkward.contents.RecordArray(
                    _content, _fields, length=_length, parameters={}
                )
                # update parameters
                output[name].parameters.update(
                    {
                        "collection_name": name,
                        "__record__": mixin,
                    }
                )

        # final nanoevents (most outer) zip
        _content = (*output.values(),)
        _fields = (*output.keys(),)
        _length = awkward.num(array, axis=0)
        nanoevents = awkward.Array(
            awkward.contents.RecordArray(_content, _fields, length=_length),
            behavior=self.behavior(),
        )
        # fix virtual array shape generators by re-running from buffers:
        nanoevents = awkward.with_name(
            _rewrap(nanoevents),
            name="NanoEvents",
        )
        # add additional parameters
        nanoevents.layout.parameters.update(
            {
                "metadata": {"version": self._version},
                **array.layout.parameters,
            }
        )
        # add ref to itself in attrs
        nanoevents.attrs["@original_array"] = nanoevents
        return nanoevents

    @classmethod
    def behavior(cls):
        """Behaviors necessary to implement this schema (dict)"""
        from awkward_zipper.behaviors import nanoaod

        return nanoaod.behavior
class PFNanoAOD(NanoAOD):
    """PFNano schema builder

    PFNano is an extended NanoAOD format that includes PF candidates and secondary vertices
    More info at https://github.com/cms-jet/PFNano
    """

    # Extend the base mixin table with the PF-candidate / associated-SV collections.
    mixins: tp.ClassVar = NanoAOD.mixins | {
        "JetSVs": "AssociatedSV",
        "FatJetSVs": "AssociatedSV",
        "GenJetSVs": "AssociatedSV",
        "GenFatJetSVs": "AssociatedSV",
        "JetPFCands": "AssociatedPFCand",
        "FatJetPFCands": "AssociatedPFCand",
        "GenJetCands": "AssociatedPFCand",
        "GenFatJetCands": "AssociatedPFCand",
        "PFCands": "PFCand",
        "GenCands": "PFCand",
    }

    # Extend the base cross-reference table with the PFNano association indices.
    all_cross_references: tp.ClassVar = NanoAOD.all_cross_references | {
        "FatJetPFCands_jetIdx": "FatJet",  # breaks pattern
        "FatJetPFCands_pFCandsIdx": "PFCands",
        "FatJetSVs_jetIdx": "FatJet",  # breaks pattern
        "FatJetSVs_sVIdx": "SV",
        "FatJet_electronIdx3SJ": "Electron",
        "FatJet_muonIdx3SJ": "Muon",
        "GenFatJetCands_jetIdx": "GenJetAK8",  # breaks pattern
        "GenFatJetCands_pFCandsIdx": "GenCands",  # breaks pattern
        "GenFatJetSVs_jetIdx": "GenJetAK8",  # breaks pattern
        "GenFatJetSVs_sVIdx": "SV",
        "GenJetCands_jetIdx": "GenJet",  # breaks pattern
        "GenJetCands_pFCandsIdx": "GenCands",  # breaks pattern
        "GenJetSVs_jetIdx": "GenJet",  # breaks pattern
        "GenJetSVs_sVIdx": "SV",
        "JetPFCands_jetIdx": "Jet",
        "JetPFCands_pFCandsIdx": "PFCands",
        "JetSVs_jetIdx": "Jet",
        "JetSVs_sVIdx": "SV",
        "SubJet_subGenJetAK8Idx": "SubGenJetAK8",
    }
class ScoutingNanoAOD(NanoAOD):
    """ScoutingNano schema builder

    ScoutingNano is a NanoAOD format that includes Scouting objects
    """

    # Extend the base mixin table with the Scouting object collections.
    mixins: tp.ClassVar = NanoAOD.mixins | {
        "ScoutingJet": "Jet",
        "ScoutingFatJet": "FatJet",
        "ScoutingMET": "MissingET",
        "ScoutingMuonNoVtxDisplacedVertex": "Vertex",
        "ScoutingMuonVtxDisplacedVertex": "Vertex",
        "ScoutingPrimaryVertex": "Vertex",
        "ScoutingElectron": "Electron",
        "ScoutingPhoton": "Photon",
        "ScoutingMuonNoVtx": "Muon",
        "ScoutingMuonVtx": "Muon",
    }

    # Scouting adds no new cross-references; copy the base table unchanged.
    all_cross_references: tp.ClassVar = dict(NanoAOD.all_cross_references)