CMS 3D CMS Logo

MassReplace.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import FWCore.ParameterSet.Config as cms
3 
5  """Visitor that travels within a cms.Sequence, looks for a parameter and replace its value
6  It will climb down within PSets, VPSets and VInputTags to find its target"""
7  def __init__(self,paramSearch,paramReplace,verbose=False,moduleLabelOnly=False,skipLabelTest=False):
8  self._paramSearch = self.standardizeInputTagFmt(paramSearch)
9  self._paramReplace = self.standardizeInputTagFmt(paramReplace)
10  self._moduleName = ''
11  self._verbose=verbose
12  self._moduleLabelOnly=moduleLabelOnly
13  self._skipLabelTest=skipLabelTest
14  def doIt(self,pset,base):
15  if isinstance(pset, cms._Parameterizable):
16  for name in pset.parameterNames_():
17  # if I use pset.parameters_().items() I get copies of the parameter values
18  # so I can't modify the nested pset
19  value = getattr(pset,name)
20  type = value.pythonTypeName()
21  if type == 'cms.PSet':
22  self.doIt(value,base+"."+name)
23  elif type == 'cms.VPSet':
24  for (i,ps) in enumerate(value): self.doIt(ps, "%s.%s[%d]"%(base,name,i) )
25  elif type == 'cms.VInputTag':
26  for (i,n) in enumerate(value):
27  # VInputTag can be declared as a list of strings, so ensure that n is formatted correctly
28  n = self.standardizeInputTagFmt(n)
29  if (n == self._paramSearch):
30  if self._verbose:print("Replace %s.%s[%d] %s ==> %s " % (base, name, i, self._paramSearch, self._paramReplace))
31  value[i] = self._paramReplace
32  elif self._moduleLabelOnly and n.moduleLabel == self._paramSearch.moduleLabel:
33  nrep = n; nrep.moduleLabel = self._paramReplace.moduleLabel
34  if self._verbose:print("Replace %s.%s[%d] %s ==> %s " % (base, name, i, n, nrep))
35  value[i] = nrep
36  elif type.endswith('.InputTag'):
37  if value == self._paramSearch:
38  if self._verbose:print("Replace %s.%s %s ==> %s " % (base, name, self._paramSearch, self._paramReplace))
39  from copy import deepcopy
40  if 'untracked' in type:
41  setattr(pset, name, cms.untracked.InputTag(self._paramReplace.getModuleLabel(),
42  self._paramReplace.getProductInstanceLabel(),
43  self._paramReplace.getProcessName()))
44  else:
45  setattr(pset, name, deepcopy(self._paramReplace) )
46  elif self._moduleLabelOnly and value.moduleLabel == self._paramSearch.moduleLabel:
47  from copy import deepcopy
48  repl = deepcopy(getattr(pset, name))
49  repl.moduleLabel = self._paramReplace.moduleLabel
50  setattr(pset, name, repl)
51  if self._verbose:print("Replace %s.%s %s ==> %s " % (base, name, value, repl))
52 
53 
54  @staticmethod
55  def standardizeInputTagFmt(inputTag):
56  ''' helper function to ensure that the InputTag is defined as cms.InputTag(str) and not as a plain str '''
57  if not isinstance(inputTag, cms.InputTag):
58  return cms.InputTag(inputTag)
59  return inputTag
60 
61  def enter(self,visitee):
62  label = ''
63  if (not self._skipLabelTest):
64  if hasattr(visitee,"hasLabel_") and visitee.hasLabel_():
65  label = visitee.label_()
66  else: label = '<Module not in a Process>'
67  else:
68  label = '<Module label not tested>'
69  self.doIt(visitee, label)
70  def leave(self,visitee):
71  pass
72 
73 def massSearchReplaceAnyInputTag(sequence, oldInputTag, newInputTag,verbose=False,moduleLabelOnly=False,skipLabelTest=False) :
74  """Replace InputTag oldInputTag with newInputTag, at any level of nesting within PSets, VPSets, VInputTags..."""
75  sequence.visit(MassSearchReplaceAnyInputTagVisitor(oldInputTag,newInputTag,verbose=verbose,moduleLabelOnly=moduleLabelOnly,skipLabelTest=skipLabelTest))
76 
77 def massReplaceInputTag(process,old="rawDataCollector",new="rawDataRepacker",verbose=False,moduleLabelOnly=False,skipLabelTest=False):
78  for s in process.paths_().keys():
79  massSearchReplaceAnyInputTag(getattr(process,s), old, new, verbose, moduleLabelOnly, skipLabelTest)
80  for s in process.endpaths_().keys():
81  massSearchReplaceAnyInputTag(getattr(process,s), old, new, verbose, moduleLabelOnly, skipLabelTest)
82  if process.schedule_() is not None:
83  for task in process.schedule_()._tasks:
84  massSearchReplaceAnyInputTag(task, old, new, verbose, moduleLabelOnly, skipLabelTest)
85  return(process)
86 
88  """Visitor that travels within a cms.Sequence, looks for a parameter and returns a list of modules that have it"""
89  def __init__(self,paramName,paramSearch):
90  self._paramName = paramName
91  self._paramSearch = paramSearch
92  self._modules = []
93  def enter(self,visitee):
94  if (hasattr(visitee,self._paramName)):
95  if getattr(visitee,self._paramName) == self._paramSearch:
96  self._modules.append(visitee)
97  def leave(self,visitee):
98  pass
99  def modules(self):
100  return self._modules
101 
103  """Visitor that travels within a cms.Sequence, looks for a parameter and replaces its value"""
104  def __init__(self,paramName,paramSearch,paramValue,verbose=False):
105  self._paramName = paramName
106  self._paramValue = paramValue
107  self._paramSearch = paramSearch
108  self._verbose = verbose
109  def enter(self,visitee):
110  if (hasattr(visitee,self._paramName)):
111  if getattr(visitee,self._paramName) == self._paramSearch:
112  if self._verbose:print("Replaced %s.%s: %s => %s" % (visitee,self._paramName,getattr(visitee,self._paramName),self._paramValue))
113  setattr(visitee,self._paramName,self._paramValue)
114  def leave(self,visitee):
115  pass
116 
117 def massSearchReplaceParam(sequence,paramName,paramOldValue,paramValue,verbose=False):
118  sequence.visit(MassSearchReplaceParamVisitor(paramName,paramOldValue,paramValue,verbose))
119 
120 def massReplaceParameter(process,name="label",old="rawDataCollector",new="rawDataRepacker",verbose=False):
121  for s in process.paths_().keys():
122  massSearchReplaceParam(getattr(process,s),name,old,new,verbose)
123  for s in process.endpaths_().keys():
124  massSearchReplaceParam(getattr(process,s),name,old,new,verbose)
125  if process.schedule_() is not None:
126  for task in process.schedule_()._tasks:
127  massSearchReplaceParam(task, name, old, new, verbose)
128  return(process)
129 
130 if __name__=="__main__":
131  import unittest
132  class TestModuleCommand(unittest.TestCase):
133 
135  p = cms.Process("test")
136  p.a = cms.EDProducer("a", src=cms.InputTag("gen"))
137  p.b = cms.EDProducer("ab", src=cms.InputTag("a"))
138  p.c = cms.EDProducer("ac", src=cms.InputTag("b"),
139  nested = cms.PSet(src = cms.InputTag("b"), src2 = cms.InputTag("c")),
140  nestedv = cms.VPSet(cms.PSet(src = cms.InputTag("b")), cms.PSet(src = cms.InputTag("d"))),
141  vec = cms.VInputTag(cms.InputTag("a"), cms.InputTag("b"), cms.InputTag("c"), cms.InputTag("d"))
142  )
143  p.s = cms.Sequence(p.a*p.b*p.c)
144  massSearchReplaceAnyInputTag(p.s, cms.InputTag("b"), cms.InputTag("new"))
145  self.assertNotEqual(cms.InputTag("new"), p.b.src)
146  self.assertEqual(cms.InputTag("new"), p.c.src)
147  self.assertEqual(cms.InputTag("new"), p.c.nested.src)
148  self.assertEqual(cms.InputTag("new"), p.c.nested.src)
149  self.assertNotEqual(cms.InputTag("new"), p.c.nested.src2)
150  self.assertEqual(cms.InputTag("new"), p.c.nestedv[0].src)
151  self.assertNotEqual(cms.InputTag("new"), p.c.nestedv[1].src)
152  self.assertNotEqual(cms.InputTag("new"), p.c.vec[0])
153  self.assertEqual(cms.InputTag("new"), p.c.vec[1])
154  self.assertNotEqual(cms.InputTag("new"), p.c.vec[2])
155  self.assertNotEqual(cms.InputTag("new"), p.c.vec[3])
156 
158  process1 = cms.Process("test")
159  massReplaceInputTag(process1, "a", "b", False, False, False)
160  self.assertEqual(process1.dumpPython(),
161 """import FWCore.ParameterSet.Config as cms
162 
163 process = cms.Process("test")
164 
165 """)
166  p = cms.Process("test")
167  p.a = cms.EDProducer("a", src=cms.InputTag("gen"))
168  p.b = cms.EDProducer("ab", src=cms.InputTag("a"))
169  p.c = cms.EDProducer("ac", src=cms.InputTag("b"),
170  nested = cms.PSet(src = cms.InputTag("a"), src2 = cms.InputTag("c")),
171  nestedv = cms.VPSet(cms.PSet(src = cms.InputTag("a")), cms.PSet(src = cms.InputTag("d"))),
172  vec = cms.VInputTag(cms.InputTag("a"), cms.InputTag("b"), cms.InputTag("c"), cms.InputTag("d"))
173  )
174  p.d = cms.EDProducer("ab", src=cms.InputTag("a"))
175  p.e = cms.EDProducer("ab", src=cms.InputTag("a"))
176  p.f = cms.EDProducer("ab", src=cms.InputTag("a"))
177  p.g = cms.EDProducer("ab", src=cms.InputTag("a"))
178  p.h = cms.EDProducer("ab", src=cms.InputTag("a"))
179  p.i = cms.EDProducer("ab", src=cms.InputTag("a"))
180  p.s1 = cms.Sequence(p.a*p.b*p.c)
181  p.path1 = cms.Path(p.s1)
182  p.s2 = cms.Sequence(p.d)
183  p.path2 = cms.Path(p.e)
184  p.s3 = cms.Sequence(p.f)
185  p.endpath1 = cms.EndPath(p.s3)
186  p.endpath2 = cms.EndPath(p.g)
187  p.t1 = cms.Task(p.h)
188  p.t2 = cms.Task(p.i)
189  p.schedule = cms.Schedule()
190  p.schedule.associate(p.t1, p.t2)
191  massReplaceInputTag(p, "a", "b", False, False, False)
192  self.assertEqual(cms.InputTag("b"), p.b.src)
193  self.assertEqual(cms.InputTag("b"), p.c.vec[0])
194  self.assertEqual(cms.InputTag("c"), p.c.vec[2])
195  self.assertEqual(cms.InputTag("a"), p.d.src)
196  self.assertEqual(cms.InputTag("b"), p.e.src)
197  self.assertEqual(cms.InputTag("b"), p.f.src)
198  self.assertEqual(cms.InputTag("b"), p.g.src)
199  self.assertEqual(cms.InputTag("b"), p.h.src)
200  self.assertEqual(cms.InputTag("b"), p.i.src)
201 
203  p = cms.Process("test")
204  p.a = cms.EDProducer("a", src=cms.InputTag("gen"))
205  p.b = cms.EDProducer("ab", src=cms.InputTag("a"))
206  p.c = cms.EDProducer("ac", src=cms.InputTag("b"),
207  nested = cms.PSet(src = cms.InputTag("c"))
208  )
209  p.s = cms.Sequence(p.a*p.b*p.c)
210  massSearchReplaceParam(p.s,"src",cms.InputTag("b"),"a")
211  self.assertEqual(cms.InputTag("a"),p.c.src)
212  self.assertNotEqual(cms.InputTag("a"),p.c.nested.src)
213 
215  process1 = cms.Process("test")
216  massReplaceParameter(process1, "src", cms.InputTag("a"), "b", False)
217  self.assertEqual(process1.dumpPython(),
218 """import FWCore.ParameterSet.Config as cms
219 
220 process = cms.Process("test")
221 
222 """)
223  p = cms.Process("test")
224  p.a = cms.EDProducer("a", src=cms.InputTag("gen"))
225  p.b = cms.EDProducer("ab", src=cms.InputTag("a"))
226  p.c = cms.EDProducer("ac", src=cms.InputTag("b"),
227  nested = cms.PSet(src = cms.InputTag("a"), src2 = cms.InputTag("c")),
228  nestedv = cms.VPSet(cms.PSet(src = cms.InputTag("a")), cms.PSet(src = cms.InputTag("d"))),
229  vec = cms.VInputTag(cms.InputTag("a"), cms.InputTag("b"), cms.InputTag("c"), cms.InputTag("d"))
230  )
231  p.d = cms.EDProducer("ab", src=cms.InputTag("a"))
232  p.e = cms.EDProducer("ab", src=cms.InputTag("a"))
233  p.f = cms.EDProducer("ab", src=cms.InputTag("a"))
234  p.g = cms.EDProducer("ab", src=cms.InputTag("a"))
235  p.h = cms.EDProducer("ab", src=cms.InputTag("a"))
236  p.i = cms.EDProducer("ab", src=cms.InputTag("a"))
237  p.s1 = cms.Sequence(p.a*p.b*p.c)
238  p.path1 = cms.Path(p.s1)
239  p.s2 = cms.Sequence(p.d)
240  p.path2 = cms.Path(p.e)
241  p.s3 = cms.Sequence(p.f)
242  p.endpath1 = cms.EndPath(p.s3)
243  p.endpath2 = cms.EndPath(p.g)
244  p.t1 = cms.Task(p.h)
245  p.t2 = cms.Task(p.i)
246  p.schedule = cms.Schedule()
247  p.schedule.associate(p.t1, p.t2)
248  massReplaceParameter(p, "src",cms.InputTag("a"), "b", False)
249  self.assertEqual(cms.InputTag("gen"), p.a.src)
250  self.assertEqual(cms.InputTag("b"), p.b.src)
251  self.assertEqual(cms.InputTag("a"), p.c.vec[0])
252  self.assertEqual(cms.InputTag("c"), p.c.vec[2])
253  self.assertEqual(cms.InputTag("a"), p.d.src)
254  self.assertEqual(cms.InputTag("b"), p.e.src)
255  self.assertEqual(cms.InputTag("b"), p.f.src)
256  self.assertEqual(cms.InputTag("b"), p.g.src)
257  self.assertEqual(cms.InputTag("b"), p.h.src)
258  self.assertEqual(cms.InputTag("b"), p.i.src)
259  unittest.main()
def __init__(self, paramName, paramSearch, paramValue, verbose=False)
Definition: MassReplace.py:104
def massSearchReplaceAnyInputTag(sequence, oldInputTag, newInputTag, verbose=False, moduleLabelOnly=False, skipLabelTest=False)
Definition: MassReplace.py:73
def massSearchReplaceParam(sequence, paramName, paramOldValue, paramValue, verbose=False)
Definition: MassReplace.py:117
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
return((rh^lh)&mask)
def testMassSearchReplaceAnyInputTag(self)
Definition: MassReplace.py:134
def massReplaceInputTag(process, old="rawDataCollector", new="rawDataRepacker", verbose=False, moduleLabelOnly=False, skipLabelTest=False)
Definition: MassReplace.py:77
def __init__(self, paramName, paramSearch)
Definition: MassReplace.py:89
def massReplaceParameter(process, name="label", old="rawDataCollector", new="rawDataRepacker", verbose=False)
Definition: MassReplace.py:120
def __init__(self, paramSearch, paramReplace, verbose=False, moduleLabelOnly=False, skipLabelTest=False)
Definition: MassReplace.py:7