Source code for awkward_zipper.kernels

import functools

import awkward
import numba
import numpy as np


[docs] def ensure_array(arraylike): """ Converts arraylike to a Numpy array """ if isinstance(arraylike, (awkward.contents.Content | awkward.Array)): return awkward.to_numpy(arraylike) if isinstance(arraylike, awkward.index.Index): return arraylike.data return np.asarray(arraylike)
# function: tp.Callable[[],]
[docs] def dispatch_wrap(function): @functools.wraps(function) def _wrapper(*input_arrays, data=None, dtype=np.int64): """ Calls a function and passes it input_arrays as parameters. Returns a function result in eager case. In virtual case returns a result from a function wrapped in a Virtual Array. Args: input_arrays: function parameters. Awkward arrays and integers are accepted. This is a VirtualArray generator limitation. data: additional parameter for VirtualArray creation function: function to return dtype: additional parameter for VirtualArray creation Returns: function(input_arrays) or awkward.VirtualArray(generator=lambda: function(input_arrays)) """ # Virtual array if data is not None: return awkward._nplikes.virtual.VirtualArray( nplike=data._nplike, shape=(awkward._nplikes.shape.unknown_length,), dtype=dtype, generator=lambda: function( *( ( array if isinstance(array, int) else awkward.materialize(array) ) for array in input_arrays ) ), shape_generator=None, ) # concrete array return function(*input_arrays) return _wrapper
[docs] def local2globalindex(index, counts): """ Convert a jagged local index to a global index This is the same as local2global(index, counts2offsets(counts)) where local2global and counts2offsets are as in coffea.nanoevents.transforms Example usage: Index array [[], [1], [], [2, 3]] Counts array [8, 7, 4, 7] Output array will be: [[], [9], [], [21, 22]] (here 21=8+7+4+2) """ @dispatch_wrap def _local2globalindex(index, counts): offsets = counts2offsets(counts) # make sure that offsets is always a Numpy array offsets = ensure_array(offsets) index = index.mask[index >= 0] + offsets[:-1] index = index.mask[index < offsets[1:]] # guard against out of bounds # workaround ValueError: can not (unsafe) zip ListOffsetArrays with non-NumpyArray contents # index.type is N * var * int32? index = awkward.fill_none(index, -1) output = awkward.flatten(index) # make sure that output is always Numpy array return ensure_array(output) # Check if VirtualArray index_data = None if not all(awkward.to_layout(_).is_all_materialized for _ in (index, counts)): index_data = index.layout.content.data # resulting global index will have the same offsets as local index index_offsets = index.layout.offsets index_content = _local2globalindex(index, counts, data=index_data) # index_content shape would be index_data.shape index_content = awkward.contents.numpyarray.NumpyArray(index_content) # create new parameters for the final array parameters = awkward.parameters(index) parameters["__doc__"] = "global " + parameters["__doc__"] return awkward.Array( awkward.contents.ListOffsetArray( offsets=index_offsets, content=index_content, parameters=parameters, ) )
[docs] def nestedindex(indices): """ Concatenate a list of indices along a new axis Outputs a jagged array with same outer shape as index arrays Example usage: First index array [[0, 2, 4], [8, 6]] Second index array [[1, 3, 5], [-1, 7]] Output [[[0, 1], [2, 3], [4, 5]], [[8, -1], [6, 7]]] """ @dispatch_wrap def _nestedindex_content(*indices): # return awkward.concatenate([idx[:, None] for idx in indexers], axis=1) flat_indices = [] for idx in indices: # flatten the index flat_indices.append(awkward.Array(idx.layout.content)) n = len(flat_indices) out = np.empty(n * len(flat_indices[0]), dtype="int64") for i, idx in enumerate(flat_indices): # index arrays should all be same shape flat arrays out[i::n] = idx return out @dispatch_wrap def _get_nested_index_offsets(nested_index_content, n_indices): return np.arange(0, len(nested_index_content) + 1, n_indices, dtype=np.int64) def _combine_parameters(indices): parameters = {} for idx in indices: if "__doc__" in parameters: parameters["__doc__"] += " and " else: parameters["__doc__"] = "nested from " parameters["__doc__"] += awkward.parameters(idx)["__doc__"] return parameters if not all( isinstance( awkward.to_layout(index), awkward.contents.listoffsetarray.ListOffsetArray ) for index in indices ): raise RuntimeError # Check if VirtualArray index_data = None if not all(awkward.to_layout(_).is_all_materialized for _ in indices): index_data = indices[0].layout.content.data # store offsets to later reapply them to the arrays offsets_stored = indices[0].layout.offsets nested_index_content = _nestedindex_content( *indices, data=index_data, dtype=np.int64 ) nested_index_content = awkward.contents.NumpyArray(nested_index_content) nested_index_offsets = _get_nested_index_offsets( nested_index_content, len(indices), data=index_data, dtype=np.int64 ) # combine offsets and content nested_index = awkward.contents.ListOffsetArray( awkward.index.Index64(nested_index_offsets), nested_index_content, ) # combine the parameters parameters = _combine_parameters(indices) # reapply the offsets return awkward.Array( awkward.contents.ListOffsetArray( offsets_stored, nested_index, parameters=parameters, ) )
[docs] def counts2nestedindex(local_counts, target_offsets): """Turn jagged local counts into doubly-jagged global index into a target Outputs a jagged array with same axis-0 shape as counts axis-1 Example usage: Local counts [[4, 3, 2], [4, 2]] Target offsets [9, 6] Target output [[[0, 1, 2, 3], [4, 5, 6], [7, 8]], [[9, 10, 11, 12], [13, 14]]] """ @dispatch_wrap def _arange(array): return np.arange(array[-1], dtype=np.int64) @dispatch_wrap def _flatten(array): return awkward.flatten(array) if not isinstance( local_counts.layout, awkward.contents.listoffsetarray.ListOffsetArray ): raise RuntimeError if not isinstance(target_offsets.layout, awkward.contents.numpyarray.NumpyArray): raise RuntimeError # count offsets the same way as with counts2offsets in coffea.nanoevents.transforms offsets = counts2offsets(target_offsets) # store offsets to later reapply them to the arrays offsets_stored = local_counts.layout.offsets # Check if VirtualArray local_counts_data = local_counts_data_dtype = None if not all( awkward.to_layout(_).is_all_materialized for _ in (local_counts, target_offsets) ): local_counts_data = local_counts.layout.content.data local_counts_data_dtype = local_counts_data.dtype nested_index_content = _arange(offsets, data=local_counts_data) flat_counts = _flatten( local_counts, data=local_counts_data, dtype=local_counts_data_dtype ) nested_index_offsets = counts2offsets(flat_counts) # combine offsets and content out = awkward.contents.ListOffsetArray( awkward.index.Index64(nested_index_offsets), awkward.contents.NumpyArray(nested_index_content), ) # reapply the offsets return awkward.Array( awkward.contents.ListOffsetArray( offsets_stored, out, ) )
# TODO: add dispatch_wrapper decorator for this function too
[docs] def counts2offsets(counts): # Cumulative sum of counts def _counts2offsets(counts): # make sure that input is always Numpy array counts = ensure_array(counts) # awkward index default type is int64, so we use the same type for new arrays offsets = np.empty(len(counts) + 1, dtype=np.int64) offsets[0] = 0 np.cumsum(counts, out=offsets[1:]) return offsets # VirtualArray # if isinstance(counts.layout.data, awkward._nplikes.virtual.VirtualArray): if ( isinstance(counts, awkward._nplikes.virtual.VirtualArray) and not counts.is_materialized ): virtual_array = counts elif isinstance(counts, awkward.Array) and not counts.layout.is_all_materialized: virtual_array = counts.layout.data else: virtual_array = None if virtual_array is not None: return awkward._nplikes.virtual.VirtualArray( nplike=virtual_array._nplike, shape=(awkward._nplikes.shape.unknown_length,), dtype=np.int64, generator=lambda: _counts2offsets(virtual_array.materialize()), shape_generator=None, ) # concrete array return _counts2offsets(counts.layout.data)
@dispatch_wrap @numba.njit def _distinct_parent_kernel(allpart_parent, allpart_pdg): out = np.empty(len(allpart_pdg), dtype=np.int64) for i in range(len(allpart_pdg)): parent = allpart_parent[i] if parent < 0: out[i] = -1 continue thispdg = allpart_pdg[i] while parent >= 0 and allpart_pdg[parent] == thispdg: if parent >= len(allpart_pdg): msg = "parent index beyond length of array!" raise RuntimeError(msg) parent = allpart_parent[parent] out[i] = parent return out
[docs] def distinct_parent(parents, pdg): """Compute first parent with distinct PDG id Signature: globalparents,globalpdgs,!distinctParent Expects global indexes, flat arrays, which should be same length """ if not isinstance(pdg.layout, awkward.contents.listoffsetarray.ListOffsetArray): raise RuntimeError if not isinstance(parents.layout, awkward.contents.listoffsetarray.ListOffsetArray): raise RuntimeError # Check if VirtualArray parents_data = None if not all(awkward.to_layout(_).is_all_materialized for _ in (parents, pdg)): parents_data = parents.layout.content.data # store offsets to later reapply them result_offsets = parents.layout.offsets # calculate the contents result_content = _distinct_parent_kernel( awkward.Array(parents.layout.content), awkward.Array(pdg.layout.content), data=parents_data, ) return awkward.Array( awkward.contents.ListOffsetArray( result_offsets, awkward.contents.NumpyArray(result_content), ) )
@dispatch_wrap @numba.njit def _children_kernel_content(offsets_in, parentidx): content1_out = np.empty(len(parentidx), dtype=np.int64) offset1 = 0 for record_index in range(len(offsets_in) - 1): start_src, stop_src = offsets_in[record_index], offsets_in[record_index + 1] for index in range(start_src, stop_src): for possible_child in range(index, stop_src): if parentidx[possible_child] == index: if offset1 >= len(content1_out): msg = "offset1 went out of bounds!" raise RuntimeError(msg) content1_out[offset1] = possible_child offset1 = offset1 + 1 return content1_out[:offset1] @dispatch_wrap @numba.njit def _children_kernel_offsets(offsets_in, parentidx, content1_out): offsets1_out = np.empty(len(parentidx) + 1, dtype=np.int64) # content1_out = np.empty(len(parentidx), dtype=np.int64) offsets1_out[0] = 0 offset0 = 1 offset1 = 0 for record_index in range(len(offsets_in) - 1): start_src, stop_src = offsets_in[record_index], offsets_in[record_index + 1] for index in range(start_src, stop_src): for possible_child in range(index, stop_src): if parentidx[possible_child] == index: if offset1 >= len(content1_out): msg = "offset1 went out of bounds!" raise RuntimeError(msg) # content1_out[offset1] = possible_child offset1 = offset1 + 1 if offset0 >= len(offsets1_out): msg = "offset0 went out of bounds!" raise RuntimeError(msg) offsets1_out[offset0] = offset1 offset0 = offset0 + 1 return offsets1_out
[docs] def children(counts, globalparents): """Compute children Signature: offsets,globalparents,!children Output will be a jagged array with same outer shape as globalparents content """ if not isinstance( globalparents.layout, awkward.contents.listoffsetarray.ListOffsetArray ): raise RuntimeError offsets = counts2offsets(counts) # Check if VirtualArray globalparents_data = None if not all( awkward.to_layout(_).is_all_materialized for _ in (counts, globalparents) ): globalparents_data = globalparents.layout.content.data # store offsets to later reapply them result_offsets = globalparents.layout.offsets # Numba can't accept Virtual arrays directly, so wrap them with awkward ccontent = _children_kernel_content( awkward.Array(awkward.contents.NumpyArray(offsets)), awkward.Array(globalparents.layout.content), data=globalparents_data, ) ccontent = awkward.contents.NumpyArray(ccontent) coffsets = _children_kernel_offsets( awkward.Array(awkward.contents.NumpyArray(offsets)), awkward.Array(globalparents.layout.content), awkward.Array(ccontent), data=globalparents_data, ) out = awkward.contents.ListOffsetArray( awkward.index.Index64(coffsets), ccontent, ) # reapply the offsets return awkward.Array( awkward.contents.ListOffsetArray( result_offsets, out, ) )
@dispatch_wrap @numba.njit def _distinct_children_deep_kernel_content(offsets_in, global_parents, global_pdgs): # offsets_out = np.empty(len(global_parents) + 1, dtype=np.int64) content_out = np.empty(len(global_parents), dtype=np.int64) # offsets_out[0] = 0 # offset0 = 1 offset1 = 0 for record_index in range(len(offsets_in) - 1): start_src, stop_src = offsets_in[record_index], offsets_in[record_index + 1] for index in range(start_src, stop_src): this_pdg = global_pdgs[index] # only perform the deep lookup when this particle is not already part of a decay chain # otherwise, the same child indices would be repeated for every parent in the chain # which would require content_out to have a length that isa-priori unknown if ( global_parents[index] >= 0 and this_pdg != global_pdgs[global_parents[index]] ): # keep an index of parents with same pdg id parents = np.empty(stop_src - index, dtype=np.int64) parents[0] = index offset2 = 1 # keep an additional index with parents that have at least one child parents_with_children = np.empty(stop_src - index, dtype=np.int64) offset3 = 0 for possible_child in range(index, stop_src): possible_parent = global_parents[possible_child] possibe_child_pdg = global_pdgs[possible_child] # compare with seen parents for parent_index in range(offset2): # check if we found a new child if parents[parent_index] == possible_parent: # first, remember that the parent has at least one child if offset3 >= len(parents_with_children): msg = "offset3 went out of bounds!" raise RuntimeError(msg) parents_with_children[offset3] = possible_parent offset3 = offset3 + 1 # then, depending on the pdg id, add to parents or content if possibe_child_pdg == this_pdg: # has the same pdg id, add to parents if offset2 >= len(parents): msg = "offset2 went out of bounds!" raise RuntimeError(msg) parents[offset2] = possible_child offset2 = offset2 + 1 else: # has a different pdg id, add to content if offset1 >= len(content_out): msg = "offset1 went out of bounds!" raise RuntimeError(msg) content_out[offset1] = possible_child offset1 = offset1 + 1 break # add parents with same pdg id that have no children for parent_index in range(1, offset2): possible_child = parents[parent_index] if possible_child not in parents_with_children[:offset3]: if offset1 >= len(content_out): msg = "offset1 went out of bounds! pt2" raise RuntimeError(msg) content_out[offset1] = possible_child offset1 = offset1 + 1 return content_out[:offset1] @dispatch_wrap @numba.njit def _distinct_children_deep_kernel_offsets( offsets_in, global_parents, global_pdgs, content_out ): offsets_out = np.empty(len(global_parents) + 1, dtype=np.int64) offsets_out[0] = 0 offset0 = 1 offset1 = 0 for record_index in range(len(offsets_in) - 1): start_src, stop_src = offsets_in[record_index], offsets_in[record_index + 1] for index in range(start_src, stop_src): this_pdg = global_pdgs[index] # only perform the deep lookup when this particle is not already part of a decay chain # otherwise, the same child indices would be repeated for every parent in the chain # which would require content_out to have a length that isa-priori unknown if ( global_parents[index] >= 0 and this_pdg != global_pdgs[global_parents[index]] ): # keep an index of parents with same pdg id parents = np.empty(stop_src - index, dtype=np.int64) parents[0] = index offset2 = 1 # keep an additional index with parents that have at least one child parents_with_children = np.empty(stop_src - index, dtype=np.int64) offset3 = 0 for possible_child in range(index, stop_src): possible_parent = global_parents[possible_child] possibe_child_pdg = global_pdgs[possible_child] # compare with seen parents for parent_index in range(offset2): # check if we found a new child if parents[parent_index] == possible_parent: # first, remember that the parent has at least one child if offset3 >= len(parents_with_children): msg = "offset3 went out of bounds!" raise RuntimeError(msg) parents_with_children[offset3] = possible_parent offset3 = offset3 + 1 # then, depending on the pdg id, add to parents or content if possibe_child_pdg == this_pdg: # has the same pdg id, add to parents if offset2 >= len(parents): msg = "offset2 went out of bounds!" raise RuntimeError(msg) parents[offset2] = possible_child offset2 = offset2 + 1 else: # has a different pdg id, add to content if offset1 >= len(content_out): msg = "offset1 went out of bounds!" raise RuntimeError(msg) offset1 = offset1 + 1 break # add parents with same pdg id that have no children for parent_index in range(1, offset2): possible_child = parents[parent_index] if possible_child not in parents_with_children[:offset3]: if offset1 >= len(content_out): msg = "offset1 went out of bounds! pt2" raise RuntimeError(msg) offset1 = offset1 + 1 # finish this item by adding an offset if offset0 >= len(offsets_out): msg = "offset0 went out of bounds!" raise RuntimeError(msg) offsets_out[offset0] = offset1 offset0 = offset0 + 1 return offsets_out
[docs] def distinct_children_deep(counts, global_parents, global_pdgs): """Compute all distinct children, skipping children with same pdg id in between. Signature: offsets,global_parents,global_pdgs,!distinctChildrenDeep Expects global indexes, flat arrays, which should be same length """ if not isinstance( global_parents.layout, awkward.contents.listoffsetarray.ListOffsetArray ): raise RuntimeError if not isinstance( global_pdgs.layout, awkward.contents.listoffsetarray.ListOffsetArray ): raise RuntimeError offsets = counts2offsets(counts) # Check if VirtualArray global_parents_data = None if not all( awkward.to_layout(_).is_all_materialized for _ in (counts, global_parents, global_pdgs) ): global_parents_data = global_parents.layout.content.data # store offsets to later reapply them result_offsets = global_parents.layout.offsets ccontent = _distinct_children_deep_kernel_content( awkward.Array(awkward.contents.NumpyArray(offsets)), awkward.Array(global_parents.layout.content), awkward.Array(global_pdgs.layout.content), data=global_parents_data, ) ccontent = awkward.contents.NumpyArray(ccontent) coffsets = _distinct_children_deep_kernel_offsets( awkward.Array(awkward.contents.NumpyArray(offsets)), awkward.Array(global_parents.layout.content), awkward.Array(global_pdgs.layout.content), awkward.Array(ccontent), data=global_parents_data, ) out = awkward.contents.ListOffsetArray( awkward.index.Index64(coffsets), ccontent, ) # reapply the offsets return awkward.Array( awkward.contents.ListOffsetArray( result_offsets, out, ) )