Workflow Example: Higgs to ZZ with each Z going to two muons
The workflow reads a file, gets the Events then finds the generator information, and if the generator information has a higgs (a pdg_Id of 25) they look for groups of 2 or 4 muons (which is what the higgs can decay into).
ROOT version
Graphical Version
This incorporates the idea of streaming so each 'square' will receive an input the moment the 'upstream' system has it. The filter's left hand input is passed out the bottom only if the right hand side input says 'pass' or 'fail' for each item. The filter will only fetch on the left hand side once the right hand side has made a decision. The 'Return Last Value' will wait to send its value once its input has sent an 'end of list' message. In that way, the right hand histogram fills only if the 'muon list' has only 4 items in it.
Declarative
file("pythiaHZZ4mu.root")
.do(_.get("Events")
.do(_.particles.assign( _.get("edmHepMCProduct_source__PROD.obj") )
.select( _.particles.select(_.pdg_id() == 25 ) )
.do(
_.muons.assign( _.particles.select( (abs(_.pdg_id()) == 13)._and_( _.status()==1 )) ),
combine(_.muons.select(_.charge()>0),
_.muons.select(_.charge()<0) )
.do( histogram("Hist2muMass", "test 2-mu mass", 100, 60., 120.)
.fill( (_[0].momentum() + _[1].momentum() ).m() )
),
_.muons.select( _.size() == 4 )
.do(
histogram("Hist4muMass", "test 4-mu mass", 100, 170., 210.)
.fill(_.sum( _.momentum() ).m() )
)
)
)
The '_.' is a stand in for the object defined at the 'previous' scope. So
file("....").do(_.get("Events")
the '_.get(....)' is a python 'get' call to the file object. This code doesn't actually execute, instead it forms an expression tree which can then be optimized and finally executed.
Streaming Execution Engine
The following is a graphical representation of the workflow built using the a prototype version of a streaming execution engine.
The actual code is below
import engine as e
import roottools as rt
import ROOT
f = ROOT.TFile("pythiaHZZ4mu.root")
events = rt.EventTree(f.Get("Events"))
br = events.branch("edmHepMCProduct_source__PROD.obj")
#used with Accumulator to fill a plot
class Plotter(object):
def __init__(self,histogram):
self.__hist = histogram
def reset(self):
self.__hist.Reset()
def accumulate(self,x):
self.__hist.Fill(x)
pass
def value(self):
self.__hist.Draw()
return self.__hist
#used with accumulator to saved streamed info to a list
class Saver(object):
def __init__(self):
self.__values = []
def reset(self):
self.__values = []
pass
def accumulate(self,x):
self.__values.append(x)
pass
def value(self):
return self.__values
#used with accumulator to count items being streamed
class Counter(object):
def __init__(self):
self.__count = 0
def reset(self):
self.__count = 0
pass
def accumulate(self,x):
self.__count += 1
pass
def value(self):
return self.__count
#setup the root histograms
hpt = ROOT.TH1D( "hpt", "pt", 100, 0., 500. )
hzmass = ROOT.TH1D( "Hist2muMass", "test 2-mu mass", 100, 60., 120. )
hhiggsmass = ROOT.TH1D( "Hist4muMass", "test 4-mu mass", 100, 170., 210. ) ;
source = e.WrapperActiveSource(events)
mcevent = e.StreamingWrapperModule(lambda x: br().GetEvent())
source.connect(mcevent)
particle = e.GeneratorWrapperModule(lambda x: rt.range(x.particles_begin(),x.particles_end()))
mcevent.connect(particle)
#find muons
muon = e.FilteringWrapperModule(lambda x: abs(x.pdg_id()) == 13 and x.status() == 1)
particle.connect(muon)
mcHiggs = e.FilteringWrapperModule(lambda x: x.pdg_id() == 25)
particle.connect(mcHiggs)
numMCHiggs = e.AccumulatorWrapperModule(Counter())
mcHiggs.connect(numMCHiggs)
mu_plus = e.FilteringWrapperModule(lambda x:x.pdg_id()>0)
muon.connect(mu_plus)
mu_minus = e.FilteringWrapperModule(lambda x:x.pdg_id()<0)
muon.connect(mu_minus)
mu_plus_mom = e.StreamingWrapperModule(lambda x: x.momentum())
mu_plus.connect(mu_plus_mom)
mu_minus_mom = e.StreamingWrapperModule(lambda x: x.momentum())
mu_minus.connect(mu_minus_mom)
#save the mu+ and mu- since need entire lists when doing combinatorics
mu_pluses = e.AccumulatorWrapperModule(Saver())
mu_plus_mom.connect(mu_pluses)
mu_minuses = e.AccumulatorWrapperModule(Saver())
mu_minus_mom.connect(mu_minuses)
#now do not process any events without Higgs
haveMCHiggs = e.ConditionalModule()
numMCHiggs.connect(haveMCHiggs.if_)
mu_pluses.connect(haveMCHiggs.mu_pluses)
mu_minuses.connect(haveMCHiggs.mu_minuses)
zs = e.BinaryFunctionWrapperModule(lambda plus,minus: [(x+y) for x in plus for y in minus])
haveMCHiggs.mu_pluses().connect(zs.left)
haveMCHiggs.mu_minuses().connect(zs.right)
z = e.GeneratorWrapperModule(lambda x:x)
zs.connect(z)
z_mass = e.StreamingWrapperModule(lambda x: x.m())
z.connect(z_mass)
z_mass_plot = e.AccumulatorWrapperModule(Plotter(hzmass),2)
z_mass.connect(z_mass_plot)
all_muons=e.BinaryFunctionWrapperModule(lambda x,y: x+y)
haveMCHiggs.mu_pluses().connect(all_muons.left)
haveMCHiggs.mu_minuses().connect(all_muons.right)
only4muons=e.StreamingWrapperModule(lambda x: len(x)==4)
all_muons.connect(only4muons)
#only go on if have 4 muons total
ifOnlyHave4Muons=e.ConditionalModule()
only4muons.connect(ifOnlyHave4Muons.if_)
all_muons.connect(ifOnlyHave4Muons.all_muons)
higgs = e.StreamingWrapperModule(lambda x: sum(x,ROOT.CLHEP.HepLorentzVector()))
ifOnlyHave4Muons.all_muons().connect(higgs)
higgs_mass = e.StreamingWrapperModule(lambda x:x.m())
higgs.connect(higgs_mass)
higgs_mass_plot = e.AccumulatorWrapperModule(Plotter(hhiggsmass),2)
higgs_mass.connect(higgs_mass_plot)
source.do()
using only a few primitives (WrapperActiveSource, StreamingWrapperModule, GeneratorWrapperModule, FilteringWrapperModule, AccumulatorWrapperModule, ConditionalModule, and BinaryFunctionWrapperModule) plus
lambdas to denote the actual operations worked well. From this exercise I learned
- there is a need for both streamed objects and containers of objects to be moved between modules
- conditional execution (e.g., 'if') can be handled by blocking signals (the ConditionModule was actually written during this exercise to make the notation easier)
- the profile shows that most of the time is spend in the signalslot system since it gets called the most. Rewritting it in C++ could be a substantial speed improvement
NOTE: I rewrote the internals of the code so that the various modules actually wrote python code rather than do the work via the signalslot system. This means when the code is executing the analysis no overhead is accrude because of infrastructure. This implementation allowed the code to run faster than my hand written
PyROOT versions [see
AEExampleWorkflowsProfiled].
The automatically written code looks like
accumulate3.reset()
accumulate4.reset()
for item in source:
accumulate.reset()
accumulate1.reset()
accumulate2.reset()
streamedItem= streamer(item)
iterator= generator(streamedItem)
for generatedItem in iterator :
filteredItem= filter(generatedItem)
if filteredItem :
filteredItem1= filter1(generatedItem)
if filteredItem1 :
streamedItem1= streamer1(generatedItem)
accumulateMethod(streamedItem1)
filteredItem2= filter2(generatedItem)
if filteredItem2 :
streamedItem2= streamer2(generatedItem)
accumulateMethod1(streamedItem2)
filteredItem3= filter3(generatedItem)
if filteredItem3 :
accumulateMethod2(generatedItem)
accumulated = accumulate.value()
accumulated1 = accumulate1.value()
accumulated2 = accumulate2.value()
if bool(accumulated2) :
value = binaryFunction(accumulated,accumulated1)
iterator1= generator1(value)
for generatedItem1 in iterator1 :
streamedItem3= streamer3(generatedItem1)
accumulateMethod3(streamedItem3)
value1 = binaryFunction1(accumulated,accumulated1)
streamedItem4= streamer4(value1)
if bool(streamedItem4) :
streamedItem5= streamer5(value1)
streamedItem6= streamer6(streamedItem5)
accumulateMethod4(streamedItem6)
accumulated3 = accumulate3.value()
accumulated4 = accumulate4.value()
The variables
source
,
accumulate*
,
streamer*
,
generator*
,
binaryFunction*
are passed to the execution code via the python global dictionary and are merely the code which the various
*WrapperModules are wrapping.
--
ChrisDJones - 11 Aug 2006