Workflow Example: Higgs to ZZ with each Z going to two muons

The workflow reads a file, gets the Events, and then finds the generator information; if the generator information contains a Higgs (a pdg_id of 25), it looks for groups of 2 or 4 muons (which is what the Higgs can decay into).

ROOT version

Graphical Version

This incorporates the idea of streaming: each 'square' receives an input the moment the 'upstream' system has it. The right-hand input of a filter delivers a 'pass' or 'fail' decision for each item; the filter's left-hand input is passed out the bottom only if that decision is 'pass', and the filter only fetches on the left-hand side once the right-hand side has made its decision. The 'Return Last Value' waits to send its value until its input has sent an 'end of list' message. In that way, the right-hand histogram fills only if the 'muon list' has exactly 4 items in it.
Workflow for 2 and 4 mu higgs from MC.png
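
The gating and 'end of list' behaviour described above can be sketched in a few lines of Python. This is only an illustration of the semantics; the class names GatedFilter and ReturnLastValue are invented for the sketch and are not part of the prototype.

#Illustration only: GatedFilter forwards each data item (left input) downstream
#only when the matching decision (right input) is 'pass'; ReturnLastValue holds
#its latest input and emits it only when an 'end of list' message arrives.
class GatedFilter(object):
    def __init__(self):
        self.downstream = []
    def connect(self, node):
        self.downstream.append(node)
    def receive(self, item, passed):
        #the left-hand item is forwarded only after the right-hand side
        #has delivered its pass/fail decision for that item
        if passed:
            for node in self.downstream:
                node.receive(item)

class ReturnLastValue(object):
    def __init__(self):
        self.downstream = []
        self.last = None
    def connect(self, node):
        self.downstream.append(node)
    def receive(self, item):
        self.last = item
    def end_of_list(self):
        for node in self.downstream:
            node.receive(self.last)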

Declarative

file("pythiaHZZ4mu.root")
     .do(_.get("Events")
                .do(_.particles.assign( _.get("edmHepMCProduct_source__PROD.obj") )
                .select( _.particles.select(_.pdg_id() == 25 ) )
                .do(
            _.muons.assign( _.particles.select(  (abs(_.pdg_id()) == 13)._and_( _.status()==1 )) ),
            combine(_.muons.select(_.charge()>0),
                                      _.muons.select(_.charge()<0) )
                      .do( histogram("Hist2muMass", "test 2-mu mass", 100,  60., 120.)
                                                              .fill( (_[0].momentum() + _[1].momentum() ).m() )
                                    ),
                   _.muons.select( _.size() == 4 )
                                     .do(
                          histogram("Hist4muMass", "test 4-mu mass", 100, 170., 210.)
                  .fill(_.sum( _.momentum() ).m() )
                                  )
                )
      )

The '_.' is a stand-in for the object defined at the 'previous' scope. So in
   file("....").do(_.get("Events")
the '_.get(....)' is a Python 'get' call on the file object. This code doesn't actually execute; instead it forms an expression tree which can then be optimized and finally executed.
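
As an illustration of how such an expression tree can be built, a placeholder object can record every attribute access and call instead of performing it. This is only a sketch; the Placeholder class below is invented here and is not the prototype's implementation (operators such as '==' would be recorded similarly via __eq__ and friends, which are omitted).

#A placeholder that records operations instead of executing them; calling
#_evaluate later against a real object replays the recorded chain.
class Placeholder(object):
    def __init__(self, ops=None):
        self._ops = ops or []
    def __getattr__(self, name):
        return Placeholder(self._ops + [('getattr', name)])
    def __call__(self, *args):
        return Placeholder(self._ops + [('call', args)])
    def _evaluate(self, obj):
        for kind, payload in self._ops:
            obj = getattr(obj, payload) if kind == 'getattr' else obj(*payload)
        return obj

_ = Placeholder()
expr = _.get("Events")      #records the chain, nothing executes yet
#expr._evaluate(some_file)  #would call some_file.get("Events")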

Streaming Execution Engine

The following is a graphical representation of the workflow built using a prototype version of a streaming execution engine.
execution engine test.png

The actual code is below:
import engine as e
import roottools as rt
import ROOT
f = ROOT.TFile("pythiaHZZ4mu.root")
events = rt.EventTree(f.Get("Events"))
br = events.branch("edmHepMCProduct_source__PROD.obj")

#used with Accumulator to fill a plot
class Plotter(object):
    def __init__(self,histogram):
        self.__hist = histogram
    def reset(self):
        self.__hist.Reset()
    def accumulate(self,x):
        self.__hist.Fill(x)
    def value(self):
        self.__hist.Draw()
        return self.__hist

#used with Accumulator to save streamed info to a list
class Saver(object):
    def __init__(self):
        self.__values = []
    def reset(self):
        self.__values = []
    def accumulate(self,x):
        self.__values.append(x)
    def value(self):
        return self.__values

#used with accumulator to count items being streamed
class Counter(object):
    def __init__(self):
        self.__count = 0
    def reset(self):
        self.__count = 0
    def accumulate(self,x):
        self.__count += 1
    def value(self):
        return self.__count

#setup the root histograms
hpt = ROOT.TH1D( "hpt", "pt", 100,  0., 500. )
hzmass = ROOT.TH1D( "Hist2muMass", "test 2-mu mass", 100,  60., 120. )
hhiggsmass = ROOT.TH1D( "Hist4muMass", "test 4-mu mass", 100, 170., 210. )

source = e.WrapperActiveSource(events)

mcevent = e.StreamingWrapperModule(lambda x: br().GetEvent())
source.connect(mcevent)

particle = e.GeneratorWrapperModule(lambda x: rt.range(x.particles_begin(),x.particles_end()))
mcevent.connect(particle)

#find muons
muon = e.FilteringWrapperModule(lambda x: abs(x.pdg_id()) == 13 and x.status() == 1)
particle.connect(muon)

mcHiggs = e.FilteringWrapperModule(lambda x: x.pdg_id() == 25)
particle.connect(mcHiggs)

numMCHiggs = e.AccumulatorWrapperModule(Counter())
mcHiggs.connect(numMCHiggs)


mu_plus = e.FilteringWrapperModule(lambda x:x.pdg_id()>0)
muon.connect(mu_plus)

mu_minus = e.FilteringWrapperModule(lambda x:x.pdg_id()<0)
muon.connect(mu_minus)

mu_plus_mom = e.StreamingWrapperModule(lambda x: x.momentum())
mu_plus.connect(mu_plus_mom)

mu_minus_mom = e.StreamingWrapperModule(lambda x: x.momentum())
mu_minus.connect(mu_minus_mom)

#save the mu+ and mu- since the entire lists are needed when doing combinatorics
mu_pluses = e.AccumulatorWrapperModule(Saver())
mu_plus_mom.connect(mu_pluses)

mu_minuses = e.AccumulatorWrapperModule(Saver())
mu_minus_mom.connect(mu_minuses)

#now do not process any events without a Higgs
haveMCHiggs = e.ConditionalModule()
numMCHiggs.connect(haveMCHiggs.if_)
mu_pluses.connect(haveMCHiggs.mu_pluses)
mu_minuses.connect(haveMCHiggs.mu_minuses)

zs = e.BinaryFunctionWrapperModule(lambda plus,minus: [(x+y) for x in plus for y in minus])
haveMCHiggs.mu_pluses().connect(zs.left)
haveMCHiggs.mu_minuses().connect(zs.right)

z = e.GeneratorWrapperModule(lambda x:x)
zs.connect(z)

z_mass = e.StreamingWrapperModule(lambda x: x.m())
z.connect(z_mass)

z_mass_plot = e.AccumulatorWrapperModule(Plotter(hzmass),2)
z_mass.connect(z_mass_plot)

all_muons=e.BinaryFunctionWrapperModule(lambda x,y: x+y)
haveMCHiggs.mu_pluses().connect(all_muons.left)
haveMCHiggs.mu_minuses().connect(all_muons.right)

only4muons=e.StreamingWrapperModule(lambda x: len(x)==4)
all_muons.connect(only4muons)

#only go on if there are 4 muons total
ifOnlyHave4Muons=e.ConditionalModule()
only4muons.connect(ifOnlyHave4Muons.if_)
all_muons.connect(ifOnlyHave4Muons.all_muons)

higgs = e.StreamingWrapperModule(lambda x: sum(x,ROOT.CLHEP.HepLorentzVector()))
ifOnlyHave4Muons.all_muons().connect(higgs)

higgs_mass = e.StreamingWrapperModule(lambda x:x.m())
higgs.connect(higgs_mass)

higgs_mass_plot = e.AccumulatorWrapperModule(Plotter(hhiggsmass),2)
higgs_mass.connect(higgs_mass_plot)

source.do()

Using only a few primitives (WrapperActiveSource, StreamingWrapperModule, GeneratorWrapperModule, FilteringWrapperModule, AccumulatorWrapperModule, ConditionalModule, and BinaryFunctionWrapperModule) plus lambdas to denote the actual operations worked well. From this exercise I learned:

  1. there is a need for both streamed objects and containers of objects to be moved between modules
  2. conditional execution (e.g., 'if') can be handled by blocking signals (the ConditionalModule was actually written during this exercise to make the notation easier)
  3. the profile shows that most of the time is spent in the signal/slot system since it gets called the most; rewriting it in C++ could be a substantial speed improvement (see the sketch after this list)
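
A toy signal/slot dispatcher, invented here purely to illustrate point 3 (it is not the prototype's real implementation), shows why the dispatch machinery dominates: every streamed item costs one Python-level call per downstream connection.

class Signal(object):
    def __init__(self):
        self._slots = []
    def connect(self, slot):
        self._slots.append(slot)
    def emit(self, value):
        for slot in self._slots:   #one call per item per downstream connection
            slot(value)

values = Signal()
values.connect(lambda x: None)
for i in range(100000):            #100k streamed items -> 100k dispatches per slot
    values.emit(i)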

NOTE: I rewrote the internals of the code so that the various modules actually write Python code rather than do the work via the signal/slot system. This means that while the code is executing the analysis, no overhead is accrued because of the infrastructure. This implementation allowed the code to run faster than my hand-written PyROOT versions [see AEExampleWorkflowsProfiled].

The automatically generated code looks like:
accumulate3.reset()
accumulate4.reset()
for item in source:
  accumulate.reset()
  accumulate1.reset()
  accumulate2.reset()
  streamedItem= streamer(item)
  iterator= generator(streamedItem)
  for generatedItem in iterator :
    filteredItem= filter(generatedItem)
    if filteredItem :
      filteredItem1= filter1(generatedItem)
      if filteredItem1 :
        streamedItem1= streamer1(generatedItem)
        accumulateMethod(streamedItem1)
      filteredItem2= filter2(generatedItem)
      if filteredItem2 :
        streamedItem2= streamer2(generatedItem)
        accumulateMethod1(streamedItem2)
    filteredItem3= filter3(generatedItem)
    if filteredItem3 :
      accumulateMethod2(generatedItem)
  accumulated = accumulate.value()
  accumulated1 = accumulate1.value()
  accumulated2 = accumulate2.value()
  if bool(accumulated2) :
    value = binaryFunction(accumulated,accumulated1)
    iterator1= generator1(value)
    for generatedItem1 in iterator1 :
      streamedItem3= streamer3(generatedItem1)
      accumulateMethod3(streamedItem3)
    value1 = binaryFunction1(accumulated,accumulated1)
    streamedItem4= streamer4(value1)
    if bool(streamedItem4) :
      streamedItem5= streamer5(value1)
      streamedItem6= streamer6(streamedItem5)
      accumulateMethod4(streamedItem6)
accumulated3 = accumulate3.value()
accumulated4 = accumulate4.value()

The variables source, accumulate*, streamer*, generator*, binaryFunction* are passed to the execution code via the Python globals dictionary and are merely the callables which the various *WrapperModules are wrapping.
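
As a minimal sketch of that mechanism (the generated source and the names below are illustrative, not taken from the prototype), the generated code refers only to bare names, and the real callables are supplied through the globals dictionary handed to exec:

#The generated source mentions only names such as 'source', 'streamer' and
#'accumulateMethod'; the actual objects are injected via the globals dict.
generated = """
for item in source:
    streamedItem = streamer(item)
    accumulateMethod(streamedItem)
"""

results = []
exec_globals = {
    'source': range(5),                  #stands in for the event source
    'streamer': lambda x: x * x,         #stands in for a StreamingWrapperModule lambda
    'accumulateMethod': results.append,  #stands in for an accumulator's accumulate method
}
exec(compile(generated, '<generated>', 'exec'), exec_globals)
#results is now [0, 1, 4, 9, 16]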

-- ChrisDJones - 11 Aug 2006

Topic attachments
  Workflow_for_2_and_4_mu_higgs_from_MC.png   116 K   11 Aug 2006 - 14:45   UnknownUser
  execution_engine_test.png                    89 K   10 Oct 2006 - 17:10   UnknownUser   diagram of execution engine workflow