CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Public Attributes | Private Attributes
DTWorkflow.DTWorkflow Class Reference
Inheritance diagram for DTWorkflow.DTWorkflow:
CLIHelper.CLIHelper CrabHelper.CrabHelper DTT0WireWorkflow.DTT0WireWorkflow DTTtrigWorkflow.DTttrigWorkflow DTVdriftWorkflow.DTvdriftWorkflow

Public Member Functions

def __init__ (self, options)
 
def add_local_calib_db (self, local=False)
 
def add_local_custom_db (self)
 
def add_local_t0_db (self, local=False)
 
def add_local_vdrift_db (self, local=False)
 
def add_preselection (self)
 
def add_raw_option (self)
 
def all (self)
 
def check (self)
 
def check_missing_options (self, requirements_dict)
 
def correction (self)
 
def dump (self)
 
def dump_options (self)
 
def get_config_name (self, command="")
 
def get_output_files (self, crabtask, output_path)
 
def load_options (self, config_file_path)
 
def load_options_command (self, command)
 
def local_path (self)
 
def outpath_workflow_mode_tag (self)
 
def prepare_common_dump (self, db_path)
 
def prepare_common_submit (self)
 
def prepare_common_write (self, do_hadd=True)
 
def prepare_workflow (self)
 
def pset_path (self)
 
def pset_template_base_bath (self)
 
def remote_out_path (self)
 
def result_path (self)
 
def run (self)
 
def runCMSSWtask (self, pset_path="")
 
def submit (self)
 
def tag (self)
 
def user (self)
 
def write (self)
 
def write_pset_file (self)
 
- Public Member Functions inherited from CLIHelper.CLIHelper
def add_parser_options (cls, parser)
 
def fill_required_options_dict (self)
 
def fill_required_options_prepare_dict (self)
 
def get_check_options_parser (cls)
 
def get_common_options_parser (cls)
 
def get_dump_options_parser (cls)
 
def get_input_db_options_parser (cls)
 
def get_local_input_db_options_parser (cls)
 
def get_submission_options_parser (cls)
 
def get_write_options_parser (cls)
 
- Public Member Functions inherited from CrabHelper.CrabHelper
def __init__ (self)
 
def cert_info (self)
 
def check_crabtask (self)
 
def crab (self)
 
def crab_config_filename (self)
 
def crab_config_filepath (self)
 
def crab_taskname (self)
 
def create_crab_config (self)
 
def fill_options_from_crab_config (self)
 
def submit_crab_task (self)
 
def voms_proxy_create (self, passphrase=None)
 
def voms_proxy_time_left (self)
 
def write_crabConfig (self)
 

Static Public Member Functions

def addPoolDBESSource (process, moduleName, record, tag, connect='sqlite_file:', label='')
 

Public Attributes

 digilabel
 
 files_reveived
 
 input_files
 
 options
 
 outpath_command_tag
 
 output_files
 
 process
 
 pset_name
 
 required_options_dict
 
 required_options_prepare_dict
 
 run_all_command
 
- Public Attributes inherited from CrabHelper.CrabHelper
 crab_config
 
 crabFunctions
 

Private Attributes

 _user
 

Detailed Description

This is the base class for all DTWorkflows and contains some
    common tasks 

Definition at line 18 of file DTWorkflow.py.

Constructor & Destructor Documentation

◆ __init__()

def DTWorkflow.DTWorkflow.__init__ (   self,
  options 
)

Definition at line 21 of file DTWorkflow.py.

21  def __init__(self, options):
22  self.options = options
23  super( DTWorkflow, self ).__init__()
24  self.digilabel = "muonDTDigis"
25  # dict to hold required variables. Can not be marked in argparse to allow
26  # loading of options from config
27  self.required_options_dict = {}
28  self.required_options_prepare_dict = {}
29  self.fill_required_options_dict()
30  self.fill_required_options_prepare_dict()
31  # These variables are determined in the derived classes
32  self.pset_name = ""
33  self.outpath_command_tag = ""
34  self.output_files = []
35  self.input_files = []
36 
37  self.run_all_command = False
38  self.files_reveived = False
39  self._user = ""
40  # change to working directory
41  os.chdir(self.options.working_dir)
42 
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)

Member Function Documentation

◆ add_local_calib_db()

def DTWorkflow.DTWorkflow.add_local_calib_db (   self,
  local = False 
)
Add a local calib database as input. Use the option local
    if the pset is processed locally and not with crab.

Definition at line 163 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.addPoolDBESSource(), mps_setup.append, DTWorkflow.DTWorkflow.input_files, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, DTWorkflow.DTWorkflow.process, and str.

Referenced by DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_submit(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_correction(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_submit().

163  def add_local_calib_db(self, local=False):
164  """ Add a local calib database as input. Use the option local is used
165  if the pset is processed locally and not with crab.
166  """
167  label = ''
168  if self.options.datasettype == "Cosmics":
169  label = 'cosmics'
170  if local:
171  connect = os.path.abspath(self.options.inputCalibDB)
172  else:
173  connect = os.path.basename(self.options.inputCalibDB)
174  self.addPoolDBESSource( process = self.process,
175  moduleName = 'calibDB',
176  record = 'DTTtrigRcd',
177  tag = 'ttrig',
178  connect = str("sqlite_file:%s" % connect),
179  label = label
180  )
181  self.input_files.append( os.path.abspath(self.options.inputCalibDB) )
182 
#define str(s)

◆ add_local_custom_db()

def DTWorkflow.DTWorkflow.add_local_custom_db (   self)

Definition at line 183 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.addPoolDBESSource(), DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, and DTWorkflow.DTWorkflow.process.

Referenced by DTWorkflow.DTWorkflow.prepare_common_submit(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction().

183  def add_local_custom_db(self):
184  for option in ('inputDBRcd', 'connectStrDBTag'):
185  if hasattr(self.options, option) and not getattr(self.options, option):
186  raise ValueError("Option %s needed for custom input db" % option)
187  self.addPoolDBESSource( process = self.process,
188  record = self.options.inputDBRcd,
189  tag = self.options.inputDBTag,
190  connect = self.options.connectStrDBTag,
191  moduleName = 'customDB%s' % self.options.inputDBRcd
192  )
193 

◆ add_local_t0_db()

def DTWorkflow.DTWorkflow.add_local_t0_db (   self,
  local = False 
)
Add a local t0 database as input. Use the option local
    if the pset is processed locally and not with crab.

Definition at line 133 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.addPoolDBESSource(), mps_setup.append, DTWorkflow.DTWorkflow.input_files, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, and DTWorkflow.DTWorkflow.process.

Referenced by DTWorkflow.DTWorkflow.prepare_common_submit().

133  def add_local_t0_db(self, local=False):
134  """ Add a local t0 database as input. Use the option local is used
135  if the pset is processed locally and not with crab.
136  """
137  if local:
138  connect = os.path.abspath(self.options.inputT0DB)
139  else:
140  connect = os.path.basename(self.options.inputT0DB)
141  self.addPoolDBESSource( process = self.process,
142  moduleName = 't0DB',
143  record = 'DTT0Rcd',
144  tag = 't0',
145  connect = 'sqlite_file:%s' % connect)
146  self.input_files.append(os.path.abspath(self.options.inputT0DB))
147 

◆ add_local_vdrift_db()

def DTWorkflow.DTWorkflow.add_local_vdrift_db (   self,
  local = False 
)
Add a local vdrift database as input. Use the option local
    if the pset is processed locally and not with crab.

Definition at line 148 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.addPoolDBESSource(), mps_setup.append, DTWorkflow.DTWorkflow.input_files, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, and DTWorkflow.DTWorkflow.process.

Referenced by DTWorkflow.DTWorkflow.prepare_common_submit(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_write(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction(), and DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_write().

148  def add_local_vdrift_db(self, local=False):
149  """ Add a local vdrift database as input. Use the option local is used
150  if the pset is processed locally and not with crab.
151  """
152  if local:
153  connect = os.path.abspath(self.options.inputVDriftDB)
154  else:
155  connect = os.path.basename(self.options.inputVDriftDB)
156  self.addPoolDBESSource( process = self.process,
157  moduleName = 'vDriftDB',
158  record = 'DTMtimeRcd',
159  tag = 'vDrift',
160  connect = 'sqlite_file:%s' % connect)
161  self.input_files.append( os.path.abspath(self.options.inputVDriftDB) )
162 

◆ add_preselection()

def DTWorkflow.DTWorkflow.add_preselection (   self)
Add preselection to the process object stored in workflow_object

Definition at line 120 of file DTWorkflow.py.

References svgfig.load(), DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, tools.prependPaths(), DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, and DTWorkflow.DTWorkflow.process.

Referenced by DTWorkflow.DTWorkflow.prepare_common_submit().

120  def add_preselection(self):
121  """ Add preselection to the process object stored in workflow_object"""
122  if not hasattr(self, "process"):
123  raise NameError("Process is not initalized in workflow object")
124  pathsequence = self.options.preselection.split(':')[0]
125  seqname = self.options.preselection.split(':')[1]
126  self.process.load(pathsequence)
127  tools.prependPaths(self.process, seqname)
128 
def prependPaths(process, seqname)
Definition: tools.py:66
def load(fileName)
Definition: svgfig.py:547

◆ add_raw_option()

def DTWorkflow.DTWorkflow.add_raw_option (   self)

Definition at line 129 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.digilabel, tools.prependPaths(), DTT0WireWorkflow.DTT0WireWorkflow.process, DTVdriftWorkflow.DTvdriftWorkflow.process, DTTtrigWorkflow.DTttrigWorkflow.process, and DTWorkflow.DTWorkflow.process.

Referenced by DTWorkflow.DTWorkflow.prepare_common_submit().

129  def add_raw_option(self):
130  getattr(self.process, self.digilabel).inputLabel = 'rawDataCollector'
131  tools.prependPaths(self.process,self.digilabel)
132 
def prependPaths(process, seqname)
Definition: tools.py:66

◆ addPoolDBESSource()

def DTWorkflow.DTWorkflow.addPoolDBESSource (   process,
  moduleName,
  record,
  tag,
  connect = 'sqlite_file:',
  label = '' 
)
static

Definition at line 248 of file DTWorkflow.py.

References str.

Referenced by DTWorkflow.DTWorkflow.add_local_calib_db(), DTWorkflow.DTWorkflow.add_local_custom_db(), DTWorkflow.DTWorkflow.add_local_t0_db(), DTWorkflow.DTWorkflow.add_local_vdrift_db(), and DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_submit().

248  label='',):
249 
250  from CondCore.CondDB.CondDB_cfi import CondDB
251 
252  calibDB = cms.ESSource("PoolDBESSource",
253  CondDB,
254  timetype = cms.string('runnumber'),
255  toGet = cms.VPSet(cms.PSet(
256  record = cms.string(record),
257  tag = cms.string(tag),
258  label = cms.untracked.string(label)
259  )),
260  )
261  calibDB.connect = cms.string( str(connect) )
262  #if authPath: calibDB.DBParameters.authenticationPath = authPath
263  if 'oracle:' in connect:
264  calibDB.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
265  setattr(process,moduleName,calibDB)
266  setattr(process,"es_prefer_" + moduleName,cms.ESPrefer('PoolDBESSource',
267  moduleName)
268  )
269 
#define str(s)

◆ all()

def DTWorkflow.DTWorkflow.all (   self)
Generalized function to perform several workflow mode commands in a chain.
    All commands must be specified in the self.all_commands list in the workflow-mode-specific
    prepare function of the child workflow objects.

Definition at line 94 of file DTWorkflow.py.

References DTT0WireWorkflow.DTT0WireWorkflow.all_commands, DTVdriftWorkflow.DTvdriftWorkflow.all_commands, DTTtrigWorkflow.DTttrigWorkflow.all_commands, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, splitter.FileObj.run, uploader.FileObj.run, uploadPayloads.FileObj.run, RPCRunIOV::RunIOV_Item.run, beamspot::BeamSpotContainer.run, TB06Reco.run, TB06RecoH2.run, RPCDQMObject.run, MultiEventFilter::Event.run, DTCalibrationWorker.DTCalibrationWorker.run(), L1Analysis::L1AnalysisEventDataFormat.run, TrackInfoProducerAlgorithm.run(), SurveyAlignmentAlgorithm.run(), EfficiencyPlotter.run, lumi::Lumi2DB::LumiSource.run, generateEDF.LumiInfo.run, WZInterestingEventSelector::event.run, MuonRecoTest.run, MatacqProducer::MatacqEventId.run, Point.run, BeamProfile2DBReader::TheBSfromDB.run, DTResolutionAnalysisTest.run, DTWorkflow.DTWorkflow.run(), HIPAlignmentAlgorithm.run(), options.ConnectionHLTMenu.run, BeamSpotRcdReader::theBSfromDB.run, DTDataIntegrityTest.run, BeamSpotOnlineRecordsReader::theBSOfromDB.run, DTBlockedROChannelsTest.run, CSCOverlapsAlignmentAlgorithm.run(), DTChamberEfficiencyTest.run, DTResolutionTest.run, ConstantStepOdeSolver.run(), DTEfficiencyTest.run, ME::Header.run, ApeSettingAlgorithm.run(), MuonAlignmentFromReference.run(), personalPlayback.Playback.run, DQMNet::CoreObject.run, MillePedeAlignmentAlgorithm.run(), DTLocalTriggerBaseTest.run, MatrixUtil.InputInfo.run, EcalDeadCellDeltaRFilter.run, QIE8Simulator.run(), HitEff.run, cond::RunInfo_t.run, AlignmentAlgorithmBase.run(), SiPixelErrorEstimation.run, personalPlayback.FrameworkJob.run, l1ct::Event.run, SiPixelTrackingRecHitsValid.run, dqm_interfaces.DirWalkerFile.run, JsonOutputProducer::JsonEvent.run, SummaryOutputProducer.run, SummaryOutputProducer::Event.run, and DTWorkflow.DTWorkflow.run_all_command.

94  def all(self):
95  """ generalized function to perform several workflow mode commands in chain.
96  All commands mus be specified in self.all_commands list in workflow mode specific
97  prepare function in child workflow objects.
98  """
99  self.run_all_command = True
100  for command in self.all_commands:
101  self.options.command = command
102  self.run()
103 
def all(container)
workaround iterator generators for ROOT classes
Definition: cmstools.py:25

◆ check()

def DTWorkflow.DTWorkflow.check (   self)
Function to check status of submitted tasks 

Definition at line 107 of file DTWorkflow.py.

References CrabHelper.CrabHelper.check_crabtask().

107  def check(self):
108  """ Function to check status of submitted tasks """
109  self.check_crabtask()
110 

◆ check_missing_options()

def DTWorkflow.DTWorkflow.check_missing_options (   self,
  requirements_dict 
)

Definition at line 43 of file DTWorkflow.py.

References join(), DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.run().

43  def check_missing_options(self, requirements_dict):
44  missing_options = []
45  # check if all required options exist
46  if self.options.command in requirements_dict:
47  for option in requirements_dict[self.options.command]:
48  if not (hasattr(self.options, option)
49  and ( (getattr(self.options,option))
50  or isinstance(getattr(self.options,option), bool) )):
51  missing_options.append(option)
52  if len(missing_options) > 0:
53  err = "The following CLI options are missing"
54  err += " for command %s: " % self.options.command
55  err += " ".join(missing_options)
56  raise ValueError(err)
57 
static std::string join(char **cmd)
Definition: RemoteFile.cc:19

◆ correction()

def DTWorkflow.DTWorkflow.correction (   self)

Definition at line 117 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.runCMSSWtask().

117  def correction(self):
118  self.runCMSSWtask()
119 

◆ dump()

def DTWorkflow.DTWorkflow.dump (   self)

Definition at line 114 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.runCMSSWtask().

114  def dump(self):
115  self.runCMSSWtask()
116 

◆ dump_options()

def DTWorkflow.DTWorkflow.dump_options (   self)

Definition at line 391 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.get_config_name(), DTWorkflow.DTWorkflow.local_path(), DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.run().

391  def dump_options(self):
392  with open(os.path.join(self.local_path, self.get_config_name()),"w") as out_file:
393  json.dump(vars(self.options), out_file, indent=4)
394 

◆ get_config_name()

def DTWorkflow.DTWorkflow.get_config_name (   self,
  command = "" 
)
Create the name for the output json file which will be dumped

Definition at line 385 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.dump_options(), and DTWorkflow.DTWorkflow.load_options_command().

385  def get_config_name(self, command= ""):
386  """ Create the name for the output json file which will be dumped"""
387  if not command:
388  command = self.options.command
389  return "config_" + command + ".json"
390 

◆ get_output_files()

def DTWorkflow.DTWorkflow.get_output_files (   self,
  crabtask,
  output_path 
)

Definition at line 270 of file DTWorkflow.py.

References CrabHelper.CrabHelper.crab().

Referenced by CrabHelper.CrabHelper.check_crabtask(), and DTWorkflow.DTWorkflow.prepare_common_write().

270  def get_output_files(self, crabtask, output_path):
271  self.crab.callCrabCommand( ["getoutput",
272  "--outputpath",
273  output_path,
274  crabtask.crabFolder ] )
275 

◆ load_options()

def DTWorkflow.DTWorkflow.load_options (   self,
  config_file_path 
)

Definition at line 395 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.load_options_command(), and DTWorkflow.DTWorkflow.run().

395  def load_options(self, config_file_path):
396  if not os.path.exists(config_file_path):
397  raise IOError("File %s not found" % config_file_path)
398  with open(config_file_path, "r") as input_file: config_json = json.load(input_file)
399  for key, val in config_json.items():
400  if not hasattr(self.options, key) or not getattr(self.options, key):
401  setattr(self.options, key, val)
402 
403 

◆ load_options_command()

def DTWorkflow.DTWorkflow.load_options_command (   self,
  command 
)
Load options for previous command in workflow 

Definition at line 404 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.get_config_name(), DTWorkflow.DTWorkflow.load_options(), DTWorkflow.DTWorkflow.local_path(), DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.prepare_common_write(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_check(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_dump(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_check(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_dump(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_check(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_dump(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_check(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_correction(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_dump(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_check().

404  def load_options_command(self, command ):
405  """Load options for previous command in workflow """
406  if not self.options.config_path:
407  if not self.options.run:
408  raise RuntimeError("Option run is required if no config path specified")
409  if not os.path.exists(self.local_path):
410  raise IOError("Local path %s does not exist" % self.local_path)
411  self.options.config_path = os.path.join(self.local_path,
412  self.get_config_name(command))
413  self.load_options( self.options.config_path )
414 
415 

◆ local_path()

def DTWorkflow.DTWorkflow.local_path (   self)
Output path on local machine 

Definition at line 334 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.outpath_command_tag, DTVdriftWorkflow.DTvdriftWorkflow.outpath_command_tag, DTTtrigWorkflow.DTttrigWorkflow.outpath_command_tag, DTWorkflow.DTWorkflow.outpath_command_tag, and DTWorkflow.DTWorkflow.outpath_workflow_mode_tag().

Referenced by CrabHelper.CrabHelper.check_crabtask(), CrabHelper.CrabHelper.crab_config_filepath(), DTWorkflow.DTWorkflow.dump_options(), DTWorkflow.DTWorkflow.load_options_command(), DTWorkflow.DTWorkflow.prepare_common_write(), DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_write(), DTWorkflow.DTWorkflow.pset_path(), DTWorkflow.DTWorkflow.result_path(), DTWorkflow.DTWorkflow.run(), CrabHelper.CrabHelper.write_crabConfig(), and DTWorkflow.DTWorkflow.write_pset_file().

334  def local_path(self):
335  """ Output path on local machine """
336  if self.options.run and self.options.label:
337  prefix = "Run%d-%s_v%d" % ( self.options.run,
338  self.options.label,
339  self.options.trial)
340  else:
341  prefix = ""
342  if self.outpath_workflow_mode_tag:
343  path = os.path.join( self.options.working_dir,
344  prefix,
345  self.outpath_workflow_mode_tag)
346  else:
347  path = os.path.join( self.options.working_dir,
348  prefix,
349  self.outpath_command_tag )
350  return path
351 

◆ outpath_workflow_mode_tag()

def DTWorkflow.DTWorkflow.outpath_workflow_mode_tag (   self)

Definition at line 314 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.outpath_workflow_mode_dict, DTVdriftWorkflow.DTvdriftWorkflow.outpath_workflow_mode_dict, and DTTtrigWorkflow.DTttrigWorkflow.outpath_workflow_mode_dict.

Referenced by DTWorkflow.DTWorkflow.local_path(), and DTWorkflow.DTWorkflow.remote_out_path().

314  def outpath_workflow_mode_tag(self):
315  if not self.options.workflow_mode in self.outpath_workflow_mode_dict:
316  raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
317  return self.outpath_workflow_mode_dict[self.options.workflow_mode]
318 

◆ prepare_common_dump()

def DTWorkflow.DTWorkflow.prepare_common_dump (   self,
  db_path 
)

Definition at line 229 of file DTWorkflow.py.

Referenced by DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_dump(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_dump(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_dump(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_dump().

229  def prepare_common_dump(self, db_path):
230  self.process = tools.loadCmsProcess(self.pset_template)
231  self.process.calibDB.connect = 'sqlite_file:%s' % db_path
232  try:
233  path = self.result_path
234  except:
235  path = os.getcwd()
236  print("path", path)
237  out_path = os.path.abspath(os.path.join(path,
238  os.path.splitext(db_path)[0] + ".txt"))
239 
240  self.process.dumpToFile.outputFileName = out_path
241 
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def loadCmsProcess(psetPath)
Definition: tools.py:56

◆ prepare_common_submit()

def DTWorkflow.DTWorkflow.prepare_common_submit (   self)
Common operations used in most prepare_[workflow_mode]_submit functions

Definition at line 194 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.add_local_custom_db(), DTWorkflow.DTWorkflow.add_local_t0_db(), DTWorkflow.DTWorkflow.add_local_vdrift_db(), DTWorkflow.DTWorkflow.add_preselection(), DTWorkflow.DTWorkflow.add_raw_option(), DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_submit(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_submit(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_submit().

194  def prepare_common_submit(self):
195  """ Common operations used in most prepare_[workflow_mode]_submit functions"""
196  if not self.options.run:
197  raise ValueError("Option run is required for submission!")
198  if hasattr(self.options, "inputT0DB") and self.options.inputT0DB:
199  self.add_local_t0_db()
200 
201  if hasattr(self.options, "inputVDriftDB") and self.options.inputVDriftDB:
202  self.add_local_vdrift_db()
203 
204  if hasattr(self.options, "inputDBTag") and self.options.inputDBTag:
205  self.add_local_custom_db()
206 
207  if self.options.run_on_RAW:
208  self.add_raw_option()
209  if self.options.preselection:
210  self.add_preselection()
211 

◆ prepare_common_write()

def DTWorkflow.DTWorkflow.prepare_common_write (   self,
  do_hadd = True 
)
Common operations used in most prepare_[workflow_mode]_write functions

Definition at line 212 of file DTWorkflow.py.

References CrabHelper.CrabHelper.crab_config_filepath(), CrabHelper.CrabHelper.crabFunctions, DTWorkflow.DTWorkflow.files_reveived, DTWorkflow.DTWorkflow.get_output_files(), tools.haddLocal(), DTWorkflow.DTWorkflow.load_options_command(), DTWorkflow.DTWorkflow.local_path(), DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.output_file, mergeLHE.BaseLHEMerger.output_file, DTVdriftWorkflow.DTvdriftWorkflow.output_file, DTTtrigWorkflow.DTttrigWorkflow.output_file, HltDiff.output_file, and DTWorkflow.DTWorkflow.result_path().

Referenced by DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_write(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_write(), and DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_write().

212  def prepare_common_write(self, do_hadd=True):
213  """ Common operations used in most prepare_[workflow_mode]_erite functions"""
214  self.load_options_command("submit")
215  output_path = os.path.join( self.local_path, "unmerged_results" )
216  merged_file = os.path.join(self.result_path, self.output_file)
217  crabtask = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
218  initUpdate = False)
219  if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
220  self.get_output_files(crabtask, output_path)
221  log.info("Received files from storage element")
222  log.info("Using hadd to merge output files")
223  if not self.options.no_exec and do_hadd:
224  returncode = tools.haddLocal(output_path, merged_file)
225  if returncode != 0:
226  raise RuntimeError("Failed to merge files with hadd")
227  return crabtask.crabConfig.Data.outputDatasetTag
228 
def haddLocal(localdir, result_file, extension='root')
Definition: tools.py:41

◆ prepare_workflow()

def DTWorkflow.DTWorkflow.prepare_workflow (   self)
Abstract implementation of prepare workflow function

Definition at line 88 of file DTWorkflow.py.

References __class__< T >.__class__().

Referenced by DTWorkflow.DTWorkflow.run().

88  def prepare_workflow(self):
89  """ Abstract implementation of prepare workflow function"""
90  errmsg = "Class `{}` does not implement `{}`"
91  raise NotImplementedError( errmsg.format(self.__class__.__name__,
92  "prepare_workflow"))
93 

◆ pset_path()

def DTWorkflow.DTWorkflow.pset_path (   self)
full path to the pset file 

Definition at line 370 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.local_path(), DTWorkflow.DTWorkflow.pset_name, DTT0WireWorkflow.DTT0WireWorkflow.pset_name, DTVdriftWorkflow.DTvdriftWorkflow.pset_name, and DTTtrigWorkflow.DTttrigWorkflow.pset_name.

Referenced by DTWorkflow.DTWorkflow.runCMSSWtask(), and DTWorkflow.DTWorkflow.write_pset_file().

370  def pset_path(self):
371  """ full path to the pset file """
372  basepath = os.path.join( self.local_path, "psets")
373  if not os.path.exists( basepath ):
374  os.makedirs( basepath )
375  return os.path.join( basepath, self.pset_name )
376 

◆ pset_template_base_bath()

def DTWorkflow.DTWorkflow.pset_template_base_bath (   self)
Base path to folder containing pset files for cmsRun

Definition at line 360 of file DTWorkflow.py.

360  def pset_template_base_bath(self):
361  """ Base path to folder containing pset files for cmsRun"""
362  return os.path.expandvars(os.path.join("$CMSSW_BASE",
363  "src",
364  "CalibMuon",
365  "test",
366  )
367  )
368 

◆ remote_out_path()

def DTWorkflow.DTWorkflow.remote_out_path (   self)
Output path on remote excluding user base path
Returns a dict if crab is used due to crab path setting policy

Definition at line 292 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.outpath_command_tag, DTVdriftWorkflow.DTvdriftWorkflow.outpath_command_tag, DTTtrigWorkflow.DTttrigWorkflow.outpath_command_tag, DTWorkflow.DTWorkflow.outpath_command_tag, DTWorkflow.DTWorkflow.outpath_workflow_mode_tag(), str, FWLite.ElectronMVAID.tag, Entry< T >.tag, tnp::TagProbePair.tag, MeasurementEstimator::OpaquePayload.tag, cond::persistency::IOVEditorData.tag, Inspector.Inspector.tag, BeamSpotFakeConditions.tag, FWLite.WorkingPoints.tag, TrackListMerger::TkEDGetTokenss.tag, MuIsolatorResultProducer< BT >::DepositConf.tag, DQMNet::CoreObject.tag, personalPlayback.FrameworkJob.tag, DTWorkflow.DTWorkflow.tag(), EcalTPGParamReaderFromDB.user, dataset.BaseDataset.user, production_tasks.Task.user, popcon::RPCObPVSSmapData.user, popcon::RpcObGasData.user, popcon::RpcDataT.user, popcon::RpcDataV.user, popcon::RpcDataFebmap.user, popcon::RpcDataS.user, popcon::RpcDataI.user, popcon::RpcDataUXC.user, popcon::RpcDataGasMix.user, EcalDBConnection.user, production_tasks.ParseOptions.user, MatrixInjector.MatrixInjector.user, production_tasks.FindOnCastor.user, production_tasks.CheckForMask.user, production_tasks.CheckForWrite.user, and DTWorkflow.DTWorkflow.user().

292  def remote_out_path(self):
293  """ Output path on remote excluding user base path
294  Returns a dict if crab is used due to crab path setting policy"""
295  if self.options.command =="submit":
296  return {
297  "outLFNDirBase" : os.path.join( "/store",
298  "user",
299  self.user,
300  'DTCalibration/',
301  self.outpath_command_tag,
302  self.outpath_workflow_mode_tag),
303  "outputDatasetTag" : self.tag
304  }
305  else:
306  return os.path.join( 'DTCalibration/',
307  datasetstr,
308  'Run' + str(self.options.run),
309  self.outpath_command_tag,
310  self.outpath_workflow_mode_tag,
311  'v' + str(self.options.trial),
312  )
#define str(s)

◆ result_path()

◆ run()

def DTWorkflow.DTWorkflow.run (   self)
Generalized function to run workflow command

Definition at line 58 of file DTWorkflow.py.

References __class__< T >.__class__(), DTWorkflow.DTWorkflow.check_missing_options(), DTWorkflow.DTWorkflow.dump_options(), DTWorkflow.DTWorkflow.load_options(), DTWorkflow.DTWorkflow.local_path(), DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTT0WireWorkflow.DTT0WireWorkflow.prepare_workflow(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_workflow(), DTTtrigWorkflow.DTttrigWorkflow.prepare_workflow(), DTWorkflow.DTWorkflow.prepare_workflow(), DTWorkflow.DTWorkflow.required_options_dict, and DTWorkflow.DTWorkflow.required_options_prepare_dict.

Referenced by DTWorkflow.DTWorkflow.all(), Types.EventID.cppID(), Types.LuminosityBlockID.cppID(), and o2olib.O2OTool.execute().

def run(self):
    """ Generalized function to run workflow command

    Steps, in order: log what is being prepared, load options from
    self.options.config_path when it is set, check the options required to
    prepare the command, call prepare_workflow(), create self.local_path if
    it does not exist, dump the used options, check the options required to
    run the command and finally call the method named by
    self.options.command.

    Raises NotImplementedError if this object has no attribute named like
    self.options.command.
    """
    msg = "Preparing %s workflow" % self.options.workflow
    if hasattr(self.options, "command"):
        msg += " for command %s" % self.options.command
    log.info(msg)
    if self.options.config_path:
        self.load_options(self.options.config_path)
    # check if all options to prepare the command are used
    self.check_missing_options(self.required_options_prepare_dict)
    self.prepare_workflow()
    # create output folder if they do not exist yet
    if not os.path.exists(self.local_path):
        os.makedirs(self.local_path)
    # dump used options
    self.dump_options()
    # check if all options to run the command are used
    self.check_missing_options(self.required_options_dict)
    try:
        run_function = getattr(self, self.options.command)
    except AttributeError:
        # Build the message in a single %-formatting pass: mixing "%" with a
        # later str.format() breaks as soon as a workflow or mode name
        # contains "{" or "}".
        errmsg = "Class `%s` does not implement `%s` for workflow %s" % (
            self.__class__.__name__,
            self.options.command,
            self.options.workflow)
        if hasattr(self.options, "workflow_mode"):
            # leading space keeps the workflow name and "and" apart
            errmsg += " and workflow mode %s" % self.options.workflow_mode
        raise NotImplementedError(errmsg)
    log.debug("Running command %s" % self.options.command)
    # call chosen function
    run_function()
87 

◆ runCMSSWtask()

def DTWorkflow.DTWorkflow.runCMSSWtask (   self,
  pset_path = "" 
)
Run a cmsRun job locally. The member variable self.pset_path is used
    if pset_path argument is not given

Definition at line 276 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, DTWorkflow.DTWorkflow.pset_name, DTT0WireWorkflow.DTT0WireWorkflow.pset_name, DTVdriftWorkflow.DTvdriftWorkflow.pset_name, DTTtrigWorkflow.DTttrigWorkflow.pset_name, and DTWorkflow.DTWorkflow.pset_path().

Referenced by DTWorkflow.DTWorkflow.correction(), DTWorkflow.DTWorkflow.dump(), DTT0WireWorkflow.DTT0WireWorkflow.submit(), and DTWorkflow.DTWorkflow.write().

def runCMSSWtask(self, pset_path=""):
    """ Run a cmsRun job locally. The member variable self.pset_path is used
    if pset_path argument is not given

    Returns 0 without starting anything when self.options.no_exec is set,
    otherwise the return code of the cmsRun process (0 on success).
    Raises RuntimeError if cmsRun exits with a non-zero return code.
    """
    if self.options.no_exec:
        return 0
    # Honour an explicitly passed pset; only fall back to the member when the
    # argument is empty (previously the argument was silently ignored).
    explicit_pset = bool(pset_path)
    if not explicit_pset:
        pset_path = self.pset_path
    process = subprocess.Popen("cmsRun %s" % pset_path,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,  # merge stderr into the logged output
                               shell=True)
    stdout = process.communicate()[0]
    log.info(stdout)
    if process.returncode != 0:
        # Report the pset that was actually run; keep the historic message
        # (based on self.pset_name) for the default case.
        failed_pset = pset_path if explicit_pset else self.pset_name
        raise RuntimeError("Failed to use cmsRun for pset %s" % failed_pset)
    return process.returncode
290 

◆ submit()

def DTWorkflow.DTWorkflow.submit (   self)

Definition at line 104 of file DTWorkflow.py.

References CrabHelper.CrabHelper.submit_crab_task().

def submit(self):
    """ Run the submit command by delegating to submit_crab_task().

    The result of submit_crab_task() is not returned."""
    self.submit_crab_task()
106 

◆ tag()

def DTWorkflow.DTWorkflow.tag (   self)

Definition at line 320 of file DTWorkflow.py.

References DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, and str.

Referenced by DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_correction(), and DTWorkflow.DTWorkflow.remote_out_path().

def tag(self):
    """ Return the tag string 'Run<run>_v<trial>' built from self.options."""
    run_part = "Run" + str(self.options.run)
    trial_part = "_v" + str(self.options.trial)
    return run_part + trial_part
322 
#define str(s)

◆ user()

def DTWorkflow.DTWorkflow.user (   self)

Definition at line 324 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow._user, CrabHelper.CrabHelper.crab(), DTCalibrationWorker.DTCalibrationWorker.options, and DTWorkflow.DTWorkflow.options.

Referenced by DTWorkflow.DTWorkflow.remote_out_path().

def user(self):
    """ Return the user name, resolving and caching it in self._user on first use.

    A non-empty self.options.user takes precedence; otherwise the name is
    obtained from self.crab.checkusername()."""
    if not self._user:
        # options may not carry a `user` attribute at all
        configured_user = getattr(self.options, "user", None)
        if configured_user:
            self._user = configured_user
        else:
            self._user = self.crab.checkusername()
    return self._user
332 

◆ write()

def DTWorkflow.DTWorkflow.write (   self)

Definition at line 111 of file DTWorkflow.py.

References DTWorkflow.DTWorkflow.runCMSSWtask().

Referenced by pkg.AbstractPkg.generate(), and querying.connection.write_and_commit().

def write(self):
    """ Run the write command by calling runCMSSWtask() without arguments.

    The return code of runCMSSWtask() is not passed on."""
    self.runCMSSWtask()
113 

◆ write_pset_file()

def DTWorkflow.DTWorkflow.write_pset_file (   self)

Member Data Documentation

◆ _user

DTWorkflow.DTWorkflow._user
private

Definition at line 39 of file DTWorkflow.py.

Referenced by DTWorkflow.DTWorkflow.user().

◆ digilabel

DTWorkflow.DTWorkflow.digilabel

Definition at line 24 of file DTWorkflow.py.

Referenced by DTWorkflow.DTWorkflow.add_raw_option().

◆ files_reveived

DTWorkflow.DTWorkflow.files_reveived

Definition at line 38 of file DTWorkflow.py.

Referenced by DTWorkflow.DTWorkflow.prepare_common_write().

◆ input_files

◆ options

DTWorkflow.DTWorkflow.options

Definition at line 22 of file DTWorkflow.py.

Referenced by betterConfigParser.BetterConfigParser.__updateDict(), submitPVValidationJobs.BetterConfigParser.__updateDict(), DTWorkflow.DTWorkflow.add_local_calib_db(), DTWorkflow.DTWorkflow.add_local_custom_db(), DTWorkflow.DTWorkflow.add_local_t0_db(), DTWorkflow.DTWorkflow.add_local_vdrift_db(), DTWorkflow.DTWorkflow.add_preselection(), DTWorkflow.DTWorkflow.all(), confdb.HLTProcess.buildOptions(), CrabHelper.CrabHelper.check_crabtask(), DTWorkflow.DTWorkflow.check_missing_options(), betterConfigParser.BetterConfigParser.checkInput(), CrabHelper.CrabHelper.crab_config_filename(), CrabHelper.CrabHelper.crab_config_filepath(), CrabHelper.CrabHelper.crab_taskname(), DTWorkflow.DTWorkflow.dump_options(), CrabHelper.CrabHelper.fill_options_from_crab_config(), confdb.HLTProcess.fixPrescales(), DTWorkflow.DTWorkflow.get_config_name(), confdb.HLTProcess.getRawConfigurationFromDB(), confdb.HLTProcess.getSetupConfigurationFromDB(), edmIntegrityCheck.IntegrityCheck.listFiles(), DTWorkflow.DTWorkflow.load_options(), DTWorkflow.DTWorkflow.load_options_command(), DTWorkflow.DTWorkflow.local_path(), production_tasks.MonitorJobs.monitor(), DTWorkflow.DTWorkflow.outpath_workflow_mode_tag(), DTWorkflow.DTWorkflow.prepare_common_submit(), DTWorkflow.DTWorkflow.prepare_common_write(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_dump(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_submit(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_meantimer_write(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_correction(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_dump(), DTTtrigWorkflow.DTttrigWorkflow.prepare_residuals_submit(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_dump(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_segment_write(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_correction(), DTTtrigWorkflow.DTttrigWorkflow.prepare_timeboxes_dump(), DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_submit(), 
DTTtrigWorkflow.DTttrigWorkflow.prepare_validation_write(), DTT0WireWorkflow.DTT0WireWorkflow.prepare_workflow(), DTVdriftWorkflow.DTvdriftWorkflow.prepare_workflow(), DTTtrigWorkflow.DTttrigWorkflow.prepare_workflow(), edmIntegrityCheck.IntegrityCheck.query(), production_tasks.BaseDataset.query(), DTWorkflow.DTWorkflow.remote_out_path(), edmIntegrityCheck.IntegrityCheck.report(), cmsswPreprocessor.CmsswPreprocessor.run(), DTWorkflow.DTWorkflow.run(), production_tasks.CheckDatasetExists.run(), production_tasks.BaseDataset.run(), production_tasks.GenerateMask.run(), production_tasks.CreateJobDirectory.run(), production_tasks.SourceCFG.run(), production_tasks.FullCFG.run(), production_tasks.WriteToDatasets.run(), production_tasks.RunCMSBatch.run(), production_tasks.MonitorJobs.run(), production_tasks.CheckJobStatus.run(), production_tasks.WriteJobReport.run(), production_tasks.CleanJobFiles.run(), DTWorkflow.DTWorkflow.runCMSSWtask(), edmIntegrityCheck.IntegrityCheck.structured(), CrabHelper.CrabHelper.submit_crab_task(), DTWorkflow.DTWorkflow.tag(), edmIntegrityCheck.IntegrityCheck.test(), DTWorkflow.DTWorkflow.user(), and CrabHelper.CrabHelper.write_crabConfig().

◆ outpath_command_tag

DTWorkflow.DTWorkflow.outpath_command_tag

◆ output_files

DTWorkflow.DTWorkflow.output_files

Definition at line 34 of file DTWorkflow.py.

◆ process

DTWorkflow.DTWorkflow.process

Definition at line 230 of file DTWorkflow.py.

Referenced by adaptToRunAtMiniAOD.adaptToRunAtMiniAOD.__init__(), DTWorkflow.DTWorkflow.add_local_calib_db(), DTWorkflow.DTWorkflow.add_local_custom_db(), DTWorkflow.DTWorkflow.add_local_t0_db(), DTWorkflow.DTWorkflow.add_local_vdrift_db(), DTWorkflow.DTWorkflow.add_preselection(), DTWorkflow.DTWorkflow.add_raw_option(), ConfigBuilder.ConfigBuilder.addCommon(), ConfigBuilder.ConfigBuilder.addConditions(), ConfigBuilder.ConfigBuilder.addCustomise(), ConfigBuilder.ConfigBuilder.addMaxEvents(), ConfigBuilder.ConfigBuilder.addOutput(), ConfigBuilder.ConfigBuilder.addSource(), ConfigBuilder.ConfigBuilder.addStandardSequences(), ConfigBuilder.ConfigBuilder.build_production_info(), ConfigBuilder.ConfigBuilder.completeInputCommand(), adaptToRunAtMiniAOD.adaptToRunAtMiniAOD.convertModuleToMiniAODInput(), ConfigBuilder.ConfigBuilder.finalizeFastSimHLT(), ConfigBuilder.ConfigBuilder.load(), runTauIdMVA.TauIDEmbedder.load_againstElectronMVA6(), ConfigBuilder.ConfigBuilder.loadAndRemember(), runTauIdMVA.TauIDEmbedder.loadMVA_WPs_run2_2017(), ConfigBuilder.ConfigBuilder.loadPhase2GTMenu(), ConfigBuilder.ConfigBuilder.prepare(), ConfigBuilder.ConfigBuilder.prepare_DIGI(), ConfigBuilder.ConfigBuilder.prepare_DQM(), ConfigBuilder.ConfigBuilder.prepare_GEN(), ConfigBuilder.ConfigBuilder.prepare_HLT(), ConfigBuilder.ConfigBuilder.prepare_PATFILTER(), ConfigBuilder.ConfigBuilder.prepare_SKIM(), ConfigBuilder.ConfigBuilder.prepare_VALIDATION(), runTauIdMVA.TauIDEmbedder.processDeepProducer(), ConfigBuilder.ConfigBuilder.renameHLTprocessInSequence(), ConfigBuilder.ConfigBuilder.renameInputTagsInSequence(), runTauIdMVA.TauIDEmbedder.runTauID(), ConfigBuilder.ConfigBuilder.scheduleSequence(), runTauIdMVA.TauIDEmbedder.tauIDMVAinputs(), and DTWorkflow.DTWorkflow.write_pset_file().

◆ pset_name

DTWorkflow.DTWorkflow.pset_name

◆ required_options_dict

DTWorkflow.DTWorkflow.required_options_dict

◆ required_options_prepare_dict

DTWorkflow.DTWorkflow.required_options_prepare_dict

◆ run_all_command

DTWorkflow.DTWorkflow.run_all_command

Definition at line 37 of file DTWorkflow.py.

Referenced by DTWorkflow.DTWorkflow.all().