CMS 3D CMS Logo

List of all members | Public Member Functions
production_tasks.MonitorJobs Class Reference
Inheritance diagram for production_tasks.MonitorJobs:
production_tasks.Task

Public Member Functions

def __init__ (self, dataset, user, options)
 
def getjobid (self, job_dir)
 
def monitor (self, jobs, previous)
 
def run (self, input)
 
- Public Member Functions inherited from production_tasks.Task
def __init__ (self, name, dataset, user, options, instance=None)
 
def addOption (self, parser)
 
def getname (self)
 
def run (self, input)
 

Additional Inherited Members

- Public Attributes inherited from production_tasks.Task
 dataset
 
 instance
 
 name
 
 options
 
 user
 

Detailed Description

Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished.

Definition at line 579 of file production_tasks.py.

Constructor & Destructor Documentation

def production_tasks.MonitorJobs.__init__ (   self,
  dataset,
  user,
  options 
)

Definition at line 581 of file production_tasks.py.

def __init__(self, dataset, user, options):
    """Construct the task, delegating to Task with the fixed name 'MonitorJobs'."""
    Task.__init__(self,'MonitorJobs', dataset, user, options)
def __init__(self, dataset, user, options)

Member Function Documentation

def production_tasks.MonitorJobs.getjobid (   self,
  job_dir 
)
Parse the LSF output to find the job id

Definition at line 584 of file production_tasks.py.

References FrontierConditions_GlobalTag_cff.file, edm.print(), split, and str.

Referenced by production_tasks.MonitorJobs.run().

def getjobid(self, job_dir):
    """Parse the LSF output to find the job id.

    Reads '<job_dir>/job_id.txt' (the captured output of the bsub
    submission) and extracts the numeric ID from a line of the form
    'Job <1234> is submitted to queue <8nh>'.

    Returns the job ID as a string, or None if the file does not exist
    or no matching line is found.
    """
    job_id_file = os.path.join(job_dir, 'job_id.txt')
    result = None
    if os.path.exists(job_id_file):
        # 'file()' was a Python 2 builtin and is gone in Python 3; use
        # open() in a context manager so the handle is always closed.
        with open(job_id_file) as fh:
            contents = fh.read()
        for c in contents.split('\n'):
            if c and re.match('^Job <\\d*> is submitted to queue <.*>', c) is not None:
                try:
                    result = c.split('<')[1].split('>')[0]
                except Exception as e:
                    print('Job ID parsing error', str(e), c, file=sys.stderr)
    return result
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
#define str(s)
double split
Definition: MVATrainer.cc:139
def production_tasks.MonitorJobs.monitor (   self,
  jobs,
  previous 
)

Definition at line 598 of file production_tasks.py.

References gen.parseHeader(), edm.print(), split, and str.

Referenced by production_tasks.MonitorJobs.run().

def monitor(self, jobs, previous):
    """Poll 'bjobs' once and return the current status of the given jobs.

    jobs: dict mapping job directory -> LSF job ID (or None if unknown).
    previous: dict mapping job ID -> last known status; used to carry
        forward statuses of jobs that bjobs no longer reports.
    Returns a dict mapping job ID -> status string (e.g. 'RUN', 'DONE',
    or the synthetic 'FORGOTTEN' for jobs LSF says it cannot find).
    """
    #executes bjobs with a list of job IDs
    cmd = ['bjobs','-u',self.options.batch_user]
    cmd.extend([v for v in jobs.values() if v is not None])#filter out unknown IDs
    child = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    # NOTE(review): on Python 3, communicate() returns bytes unless the
    # Popen is opened in text mode; the str-style parsing below assumes
    # text output -- confirm against the interpreter this runs under.
    stdout, stderr = child.communicate()

    def parseHeader(header):
        """Parse the header from bjobs"""
        # Map each column name (e.g. JOBID, USER, STAT) to its index,
        # so rows can be read positionally below.
        tokens = [t for t in header.split(' ') if t]
        result = {}
        for i in range(len(tokens)):
            result[tokens[i]] = i

        return result

    result = {}
    if stdout:
        lines = stdout.split('\n')
        if lines:
            header = parseHeader(lines[0])
            if not 'STAT' in header or not 'JOBID' in header:
                print('Problem parsing bjobs header\n',lines, file=sys.stderr)
                return result
            for line in lines[1:]:
                #TODO: Unreliable for some fields, e.g. dates
                tokens = [t for t in line.split(' ') if t]
                # Skip truncated rows that don't have every column.
                if len(tokens) < len(header): continue
                id = tokens[header['JOBID']]
                user = tokens[header['USER']]
                status = tokens[header['STAT']]

                result[id] = status

    if stderr:
        # bjobs reports unknown job IDs on stderr, one per line.
        lines = stderr.split('\n')
        if lines:
            for line in lines:
                if line and re.match('^Job <\\d*> is not found',line) is not None:
                    try:
                        id = line.split('<')[1].split('>')[0]
                        # Only mark FORGOTTEN if we never saw a status for it.
                        if id not in result and id not in previous:
                            result[id] = 'FORGOTTEN'
                    except Exception as e:
                        print('Job ID parsing error in STDERR',str(e),line, file=sys.stderr)

    #after one hour the status is no longer available
    if result:
        # Carry forward the last known status of jobs bjobs dropped.
        for id in jobs.values():
            if id not in result and id in previous:
                result[id] = previous[id]
    return result
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::map< std::string, std::string > parseHeader(const std::vector< std::string > &header)
def monitor(self, jobs, previous)
#define str(s)
double split
Definition: MVATrainer.cc:139
def production_tasks.MonitorJobs.run (   self,
  input 
)

Definition at line 652 of file production_tasks.py.

References mps_setup.append, production_tasks.Task.dataset, edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, FrontierConditions_GlobalTag_cff.file, production_tasks.MonitorJobs.getjobid(), join(), CSCDCCUnpacker.monitor, production_tasks.MonitorJobs.monitor(), ElectronMVAID.ElectronMVAID.name, average.Average.name, counter.Counter.name, AlignableObjectId::entry.name, histograms.Histograms.name, cond::persistency::TAG::NAME.name, cond::persistency::RUN_INFO::RUN_NUMBER.name, TmModule.name, cond::persistency::GLOBAL_TAG::NAME.name, cond::persistency::TAG::TIME_TYPE.name, core.autovars.NTupleVariable.name, cond::persistency::RUN_INFO::START_TIME.name, cond::persistency::GLOBAL_TAG::VALIDITY.name, cond::persistency::TAG::OBJECT_TYPE.name, cond::persistency::RUN_INFO::END_TIME.name, cond::persistency::GLOBAL_TAG::DESCRIPTION.name, cond::persistency::TAG::SYNCHRONIZATION.name, cond::persistency::GLOBAL_TAG::RELEASE.name, cond::persistency::TAG::END_OF_VALIDITY.name, MEPSet.name, cond::persistency::GLOBAL_TAG::SNAPSHOT_TIME.name, cond::persistency::TAG::DESCRIPTION.name, cond::persistency::GTEditorData.name, cond::persistency::GLOBAL_TAG::INSERTION_TIME.name, cond::persistency::TAG::LAST_VALIDATED_TIME.name, cond::persistency::TAG::INSERTION_TIME.name, FWTGeoRecoGeometry::Info.name, cond::persistency::TAG::MODIFICATION_TIME.name, nanoaod::MergeableCounterTable::SingleColumn< T >.name, preexistingValidation.PreexistingValidation.name, OutputMEPSet.name, Types._Untracked.name, ParameterSet.name, dataset.BaseDataset.name, personalPlayback.Applet.name, PixelDCSObject< T >::Item.name, DQMRivetClient::LumiOption.name, MagCylinder.name, analyzer.Analyzer.name, production_tasks.Task.name, ParSet.name, edm::PathTimingSummary.name, DQMRivetClient::ScaleFactorOption.name, EgHLTOfflineSummaryClient::SumHistBinData.name, SingleObjectCondition.name, cond::persistency::GTProxyData.name, core.autovars.NTupleObjectType.name, MyWatcher.name, 
edm::PathSummary.name, cond::TimeTypeSpecs.name, lumi::TriggerInfo.name, alignment.Alignment.name, perftools::EdmEventSize::BranchRecord.name, PixelEndcapLinkMaker::Item.name, cond::persistency::GLOBAL_TAG_MAP::GLOBAL_TAG_NAME.name, FWTableViewManager::TableEntry.name, DQMGenericClient::EfficOption.name, PixelBarrelLinkMaker::Item.name, cond::persistency::GLOBAL_TAG_MAP::RECORD.name, EcalLogicID.name, cond::persistency::GLOBAL_TAG_MAP::LABEL.name, cond::persistency::GLOBAL_TAG_MAP::TAG_NAME.name, MEtoEDM< T >::MEtoEDMObject.name, ExpressionHisto< T >.name, cms::DDAlgoArguments.name, XMLProcessor::_loaderBaseConfig.name, cond::persistency::PAYLOAD::HASH.name, cond::persistency::PAYLOAD::OBJECT_TYPE.name, cond::persistency::PAYLOAD::DATA.name, TreeCrawler.Package.name, cond::persistency::PAYLOAD::STREAMER_INFO.name, options.ConnectionHLTMenu.name, MagGeoBuilderFromDDD::volumeHandle.name, cond::persistency::PAYLOAD::VERSION.name, genericValidation.GenericValidation.name, cond::persistency::PAYLOAD::INSERTION_TIME.name, DQMGenericClient::ProfileOption.name, dqmoffline::l1t::HistDefinition.name, nanoaod::MergeableCounterTable::VectorColumn< T >.name, DQMGenericClient::NormOption.name, emtf::Node.name, h4DSegm.name, FastHFShowerLibrary.name, core.TriggerMatchAnalyzer.TriggerMatchAnalyzer.name, PhysicsTools::Calibration::Variable.name, DQMGenericClient::CDOption.name, CounterChecker.name, TrackerSectorStruct.name, cond::TagInfo_t.name, DQMGenericClient::NoFlowOption.name, looper.Looper.name, cond::persistency::IOV::TAG_NAME.name, cond::persistency::IOV::SINCE.name, EDMtoMEConverter.name, cond::persistency::IOV::PAYLOAD_HASH.name, Mapper::definition< ScannerT >.name, classes.MonitorData.name, cond::persistency::IOV::INSERTION_TIME.name, HistogramManager.name, MuonGeometrySanityCheckPoint.name, classes.OutputData.name, options.HLTProcessOptions.name, h2DSegm.name, core.TriggerBitAnalyzer.TriggerBitAnalyzer.name, nanoaod::FlatTable::Column.name, geometry.Structure.name, 
config.Analyzer.name, core.autovars.NTupleSubObject.name, DQMNet::WaitObject.name, AlpgenParameterName.name, SiStripMonitorDigi.name, core.autovars.NTupleObject.name, config.Service.name, cond::persistency::TAG_LOG::TAG_NAME.name, cond::persistency::TAG_LOG::EVENT_TIME.name, cond::persistency::TAG_LOG::USER_NAME.name, cond::persistency::TAG_LOG::HOST_NAME.name, cond::persistency::TAG_LOG::COMMAND.name, cond::persistency::TAG_LOG::ACTION.name, cond::persistency::TAG_LOG::USER_TEXT.name, core.autovars.NTupleCollection.name, BPHRecoBuilder::BPHRecoSource.name, BPHRecoBuilder::BPHCompSource.name, personalPlayback.FrameworkJob.name, plotscripts.SawTeethFunction.name, crabFunctions.CrabTask.name, hTMaxCell.name, cscdqm::ParHistoDef.name, BeautifulSoup.Tag.name, SummaryOutputProducer::GenericSummary.name, BeautifulSoup.SoupStrainer.name, FileExportPlugin.FileExportPlugin.options, cmsswPreprocessor.CmsswPreprocessor.options, DTCalibrationWorker.DTCalibrationWorker.options, DTWorkflow.DTWorkflow.options, TestProcess.TestProcess.options, DOTExport.DotProducer.options, confdb.HLTProcess.options, production_tasks.Task.options, edmIntegrityCheck.IntegrityCheck.options, validateAlignments.ValidationJobMultiIOV.options, edm.print(), EcalTPGParamReaderFromDB.user, dataset.BaseDataset.user, production_tasks.Task.user, popcon::RpcDataV.user, popcon::RpcDataT.user, popcon::RpcObGasData.user, popcon::RPCObPVSSmapData.user, popcon::RpcDataFebmap.user, popcon::RpcDataS.user, popcon::RpcDataUXC.user, popcon::RpcDataGasMix.user, popcon::RpcDataI.user, MatrixInjector.MatrixInjector.user, and EcalDBConnection.user.

652  def run(self, input):
653 
654  # return #COLIN
655  jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
656  if not os.path.exists(jobsdir):
657  raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)
658 
659  subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
660  jobs = {}
661  for s in subjobs:
662  jobs[s] = self.getjobid(s)
663 
664  def checkStatus(stat):
665 
666  #gzip files on the fly
667  actions = {'FilesToCompress':{'Files':[]}}
668 
669  result = {}
670  for j, id in six.iteritems(jobs):
671  if id is None:
672  result[j] = 'UNKNOWN'
673  else:
674  if id in stat:
675  result[j] = stat[id]
676  if result[j] in ['DONE','EXIT','FORGOTTEN']:
677  stdout = os.path.join(j,'LSFJOB_%s' % id,'STDOUT')
678  if os.path.exists(stdout):
679  #compress this file
680  actions['FilesToCompress']['Files'].append(stdout)
681  result[j] = '%s.gz' % stdout
682  elif os.path.exists('%s.gz' % stdout):
683  result[j] = '%s.gz' % stdout
684  else:
685  result[j] = 'NOSTDOUT'
686 
687  #also compress the stderr, although this is mostly empty
688  stderr = os.path.join(j,'LSFJOB_%s' % id,'STDERR')
689  if os.path.exists(stderr):
690  #compress this file
691  actions['FilesToCompress']['Files'].append(stderr)
692 
693  compress = GZipFiles(self.dataset,self.user,self.options)
694  compress.run(actions)
695  return result
696 
697  def countJobs(stat):
698  """Count jobs that are monitorable - i.e. not in a final state"""
699  result = []
700  for j, id in six.iteritems(jobs):
701  if id is not None and id in stat:
702  st = stat[id]
703  if st in ['PEND','PSUSP','RUN','USUSP','SSUSP','WAIT']:
704  result.append(id)
705  return result
706 
707  def writeKillScript(mon):
708  """Write a shell script to kill the jobs we know about"""
709  kill = os.path.join(jobsdir,'kill_jobs.sh')
710  output = file(kill,'w')
711  script = """
712 #!/usr/bin/env bash
713 echo "Killing jobs"
714 bkill -u %s %s
715  """ % (self.options.batch_user," ".join(mon))
716  output.write(script)
717  output.close()
718  return mon
719 
720  #continue monitoring while there are jobs to monitor
721  status = self.monitor(jobs,{})
722  monitorable = writeKillScript(countJobs(status))
723  count = 0
724 
725  while monitorable:
726  job_status = checkStatus(status)
727  time.sleep(60)
728  status = self.monitor(jobs,status)
729  monitorable = writeKillScript(countJobs(status))
730  if not (count % 3):
731  print('%s: Monitoring %i jobs (%s)' % (self.name,len(monitorable),self.dataset))
732  count += 1
733 
734  return {'LSFJobStatus':checkStatus(status),'LSFJobIDs':jobs}
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def monitor(self, jobs, previous)