CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions
production_tasks.MonitorJobs Class Reference
Inheritance diagram for production_tasks.MonitorJobs:
production_tasks.Task

Public Member Functions

def __init__
 
def getjobid
 
def monitor
 
def run
 
- Public Member Functions inherited from production_tasks.Task
def __init__
 
def addOption
 
def getname
 
def run
 

Additional Inherited Members

- Public Attributes inherited from production_tasks.Task
 dataset
 
 instance
 
 name
 
 options
 
 user
 

Detailed Description

Monitor LSF jobs created with cmsBatch.py. Blocks until all jobs are finished.

Definition at line 575 of file production_tasks.py.

Constructor & Destructor Documentation

def production_tasks.MonitorJobs.__init__ (   self,
  dataset,
  user,
  options 
)

Definition at line 577 of file production_tasks.py.

def __init__(self, dataset, user, options):
    """Set up monitoring of the LSF jobs for `dataset`, registering under the task name 'MonitorJobs'."""
    Task.__init__(self, 'MonitorJobs', dataset, user, options)

Member Function Documentation

def production_tasks.MonitorJobs.getjobid (   self,
  job_dir 
)
Parse the LSF output to find the job id

Definition at line 580 of file production_tasks.py.

References mergeVDriftHistosByStation.file, SiPixelLorentzAngle_cfi.read, and split.

Referenced by production_tasks.MonitorJobs.run().

def getjobid(self, job_dir):
    """Parse the LSF submission output stored in `job_dir` to find the job id.

    Reads `<job_dir>/job_id.txt` and scans it for a line of the form
    'Job <NNN> is submitted to queue <QQQ>', returning the id ('NNN') of the
    last such line found, or None if the file is missing or no line matches.
    Parse failures are reported on stderr and do not abort the scan.
    """
    # NOTE(review): renamed the local from `input` (shadowed the builtin) and
    # switched to a context manager so the file handle is always closed —
    # `file(path).read()` leaked the handle until GC.
    id_file = os.path.join(job_dir, 'job_id.txt')
    result = None
    if os.path.exists(id_file):
        with open(id_file) as handle:
            contents = handle.read()
        for line in contents.split('\n'):
            if line and re.match(r'^Job <\d*> is submitted to queue <.*>', line) is not None:
                try:
                    # id is the text between the first '<' '>' pair
                    result = line.split('<')[1].split('>')[0]
                except Exception as e:
                    sys.stderr.write('Job ID parsing error %s %s\n' % (str(e), line))
    return result
double split
Definition: MVATrainer.cc:139
def production_tasks.MonitorJobs.monitor (   self,
  jobs,
  previous 
)

Definition at line 594 of file production_tasks.py.

References gen.parseHeader(), and split.

Referenced by production_tasks.MonitorJobs.run().

def monitor(self, jobs, previous):
    """Query bjobs for the current status of the given jobs.

    Parameters:
        jobs: dict mapping job directory -> LSF job id (or None if unknown).
        previous: dict mapping job id -> last known status, used to carry
            over statuses for jobs bjobs no longer reports.

    Returns a dict mapping job id -> LSF status string (e.g. 'RUN', 'DONE').
    Jobs reported 'is not found' on stderr that were never seen before are
    marked 'FORGOTTEN'.
    """
    # executes bjobs with a list of job IDs
    cmd = ['bjobs', '-u', self.options.batch_user]
    cmd.extend([v for v in jobs.values() if v is not None])  # filter out unknown IDs
    # universal_newlines=True makes communicate() return text on Python 3
    # (and is harmless on Python 2), so the .split('\n') below works on both.
    child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=True)
    stdout, stderr = child.communicate()

    def parseHeader(header):
        """Parse the header from bjobs: map each column name to its index."""
        tokens = [t for t in header.split(' ') if t]
        result = {}
        for i, token in enumerate(tokens):
            result[token] = i
        return result

    result = {}
    if stdout:
        lines = stdout.split('\n')
        if lines:
            header = parseHeader(lines[0])
            if 'STAT' not in header or 'JOBID' not in header:
                sys.stderr.write('Problem parsing bjobs header\n%s\n' % str(lines))
                return result
            for line in lines[1:]:
                # TODO: Unreliable for some fields, e.g. dates
                tokens = [t for t in line.split(' ') if t]
                if len(tokens) < len(header):
                    continue
                id = tokens[header['JOBID']]
                user = tokens[header['USER']]
                status = tokens[header['STAT']]
                result[id] = status

    if stderr:
        # bjobs reports unknown ids on stderr as 'Job <NNN> is not found'
        for line in stderr.split('\n'):
            if line and re.match(r'^Job <\d*> is not found', line) is not None:
                try:
                    id = line.split('<')[1].split('>')[0]
                    if id not in result and id not in previous:
                        result[id] = 'FORGOTTEN'
                except Exception as e:
                    sys.stderr.write('Job ID parsing error in STDERR %s %s\n' % (str(e), line))

    # after one hour the status is no longer available from bjobs;
    # keep the last known status for ids that dropped out of the report
    if result:
        for id in jobs.values():
            if id not in result and id in previous:
                result[id] = previous[id]
    return result
static std::map< std::string, std::string > parseHeader(const std::vector< std::string > &header)
double split
Definition: MVATrainer.cc:139
def production_tasks.MonitorJobs.run (   self,
  input 
)

Definition at line 648 of file production_tasks.py.

References python.multivaluedict.append(), crabWrap.checkStatus(), production_tasks.Task.dataset, CalibratedPatElectronProducer.dataset, CalibratedElectronProducer.dataset, edmIntegrityCheck.IntegrityCheck.dataset, genericValidation.GenericValidationData.dataset, mergeVDriftHistosByStation.file, production_tasks.MonitorJobs.getjobid(), join(), CSCDCCUnpacker.monitor, production_tasks.MonitorJobs.monitor(), ElectronMVAID.ElectronMVAID.name, counter.Counter.name, entry.name, average.Average.name, histograms.Histograms.name, cond::persistency::TAG::NAME.name, core.autovars.NTupleVariable.name, TmModule.name, cond::persistency::GLOBAL_TAG::NAME.name, cond::persistency::TAG::TIME_TYPE.name, genericValidation.GenericValidation.name, cond::persistency::GLOBAL_TAG::VALIDITY.name, cond::persistency::TAG::OBJECT_TYPE.name, preexistingValidation.PreexistingValidation.name, cond::persistency::COND_LOG_TABLE::EXECTIME.name, cond::persistency::GLOBAL_TAG::DESCRIPTION.name, cond::persistency::TAG::SYNCHRONIZATION.name, ora::RecordSpecImpl::Item.name, cond::persistency::COND_LOG_TABLE::IOVTAG.name, cond::persistency::GLOBAL_TAG::RELEASE.name, cond::persistency::TAG::END_OF_VALIDITY.name, cond::persistency::COND_LOG_TABLE::USERTEXT.name, cond::persistency::GLOBAL_TAG::SNAPSHOT_TIME.name, cond::persistency::TAG::DESCRIPTION.name, cond::persistency::GTEditorData.name, cond::persistency::GLOBAL_TAG::INSERTION_TIME.name, cond::persistency::TAG::LAST_VALIDATED_TIME.name, FWTGeoRecoGeometry::Info.name, Types._Untracked.name, cond::persistency::TAG::INSERTION_TIME.name, dataset.BaseDataset.name, cond::persistency::TAG::MODIFICATION_TIME.name, personalPlayback.Applet.name, ParameterSet.name, production_tasks.Task.name, PixelDCSObject< class >::Item.name, analyzer.Analyzer.name, DQMRivetClient::LumiOption.name, MagCylinder.name, ParSet.name, alignment.Alignment.name, DQMRivetClient::ScaleFactorOption.name, SingleObjectCondition.name, EgHLTOfflineSummaryClient::SumHistBinData.name, 
core.autovars.NTupleObjectType.name, XMLHTRZeroSuppressionLoader::_loaderBaseConfig.name, XMLRBXPedestalsLoader::_loaderBaseConfig.name, cond::persistency::GTProxyData.name, DQMGenericClient::EfficOption.name, MyWatcher.name, edm::PathTimingSummary.name, cond::TimeTypeSpecs.name, lumi::TriggerInfo.name, edm::PathSummary.name, perftools::EdmEventSize::BranchRecord.name, PixelEndcapLinkMaker::Item.name, cond::persistency::GLOBAL_TAG_MAP::GLOBAL_TAG_NAME.name, FWTableViewManager::TableEntry.name, cond::persistency::GLOBAL_TAG_MAP::RECORD.name, PixelBarrelLinkMaker::Item.name, EcalLogicID.name, Mapper::definition< ScannerT >.name, cond::persistency::GLOBAL_TAG_MAP::LABEL.name, McSelector.name, ExpressionHisto< T >.name, cond::persistency::GLOBAL_TAG_MAP::TAG_NAME.name, RecoSelector.name, XMLProcessor::_loaderBaseConfig.name, cond::persistency::PAYLOAD::HASH.name, DQMGenericClient::ProfileOption.name, TreeCrawler.Package.name, cond::persistency::PAYLOAD::OBJECT_TYPE.name, cond::persistency::PAYLOAD::DATA.name, cond::persistency::PAYLOAD::STREAMER_INFO.name, cond::persistency::PAYLOAD::VERSION.name, MagGeoBuilderFromDDD::volumeHandle.name, cond::persistency::PAYLOAD::INSERTION_TIME.name, DQMGenericClient::NormOption.name, options.ConnectionHLTMenu.name, DQMGenericClient::CDOption.name, FastHFShowerLibrary.name, h4DSegm.name, PhysicsTools::Calibration::Variable.name, cond::TagInfo_t.name, EDMtoMEConverter.name, looper.Looper.name, MEtoEDM< T >::MEtoEDMObject.name, cond::persistency::IOV::TAG_NAME.name, TrackerSectorStruct.name, cond::persistency::IOV::SINCE.name, cond::persistency::IOV::PAYLOAD_HASH.name, cond::persistency::IOV::INSERTION_TIME.name, MuonGeometrySanityCheckPoint.name, config.Analyzer.name, config.Service.name, h2DSegm.name, options.HLTProcessOptions.name, core.autovars.NTupleSubObject.name, DQMNet::WaitObject.name, AlpgenParameterName.name, SiStripMonitorDigi.name, core.autovars.NTupleObject.name, cond::persistency::TAG_MIGRATION::SOURCE_ACCOUNT.name, 
cond::persistency::TAG_MIGRATION::SOURCE_TAG.name, cond::persistency::TAG_MIGRATION::TAG_NAME.name, cond::persistency::TAG_MIGRATION::STATUS_CODE.name, cond::persistency::TAG_MIGRATION::INSERTION_TIME.name, core.autovars.NTupleCollection.name, FastTimerService::LuminosityDescription.name, cond::persistency::PAYLOAD_MIGRATION::SOURCE_ACCOUNT.name, cond::persistency::PAYLOAD_MIGRATION::SOURCE_TOKEN.name, cond::persistency::PAYLOAD_MIGRATION::PAYLOAD_HASH.name, cond::persistency::PAYLOAD_MIGRATION::INSERTION_TIME.name, conddblib.Tag.name, conddblib.GlobalTag.name, personalPlayback.FrameworkJob.name, plotscripts.SawTeethFunction.name, FastTimerService::ProcessDescription.name, hTMaxCell.name, cscdqm::ParHistoDef.name, BeautifulSoup.Tag.name, TiXmlAttribute.name, BeautifulSoup.SoupStrainer.name, FileExportPlugin.FileExportPlugin.options, cmsswPreprocessor.CmsswPreprocessor.options, DOTExport.DotProducer.options, confdb.HLTProcess.options, production_tasks.Task.options, edmIntegrityCheck.IntegrityCheck.options, dataset.BaseDataset.user, EcalTPGParamReaderFromDB.user, production_tasks.Task.user, popcon::RPCObPVSSmapData.user, popcon::RpcDataV.user, popcon::RpcDataT.user, popcon::RpcObGasData.user, popcon::RpcDataGasMix.user, popcon::RpcDataS.user, popcon::RpcDataFebmap.user, popcon::RpcDataI.user, popcon::RpcDataUXC.user, MatrixInjector.MatrixInjector.user, EcalDBConnection.user, and conddblib.TimeType.user.

def run(self, input):
    """Monitor the LSF jobs found under input['RunCMSBatch']['LSFJobsTopDir'].

    Blocks (polling bjobs once a minute) until no job is left in a
    non-final LSF state.  While running it maintains a kill_jobs.sh script
    in the jobs directory so the whole batch can be cancelled by hand.

    Returns {'LSFJobStatus': {job_dir: final status or path to gzipped
    STDOUT}, 'LSFJobIDs': {job_dir: LSF job id or None}}.

    Raises Exception if the LSF jobs directory does not exist.
    """
    # return #COLIN
    jobsdir = input['RunCMSBatch']['LSFJobsTopDir']
    if not os.path.exists(jobsdir):
        raise Exception("LSF jobs dir does not exist: '%s'" % jobsdir)

    # one sub-directory per submitted job, named Job_<n>
    subjobs = [s for s in glob.glob("%s/Job_[0-9]*" % jobsdir) if os.path.isdir(s)]
    jobs = {}
    for s in subjobs:
        jobs[s] = self.getjobid(s)

    def checkStatus(stat):
        """Map each job dir to a status; gzip STDOUT/STDERR of finished jobs."""
        # gzip files on the fly
        actions = {'FilesToCompress': {'Files': []}}

        result = {}
        for j, id in jobs.items():
            if id is None:
                result[j] = 'UNKNOWN'
            elif id in stat:
                result[j] = stat[id]
                if result[j] in ['DONE', 'EXIT', 'FORGOTTEN']:
                    stdout = os.path.join(j, 'LSFJOB_%s' % id, 'STDOUT')
                    if os.path.exists(stdout):
                        # compress this file; report the gz path as the status
                        actions['FilesToCompress']['Files'].append(stdout)
                        result[j] = '%s.gz' % stdout
                    elif os.path.exists('%s.gz' % stdout):
                        result[j] = '%s.gz' % stdout
                    else:
                        result[j] = 'NOSTDOUT'

                    # also compress the stderr, although this is mostly empty
                    stderr = os.path.join(j, 'LSFJOB_%s' % id, 'STDERR')
                    if os.path.exists(stderr):
                        actions['FilesToCompress']['Files'].append(stderr)

        compress = GZipFiles(self.dataset, self.user, self.options)
        compress.run(actions)
        return result

    def countJobs(stat):
        """Count jobs that are monitorable - i.e. not in a final state"""
        active = ['PEND', 'PSUSP', 'RUN', 'USUSP', 'SSUSP', 'WAIT']
        return [id for id in jobs.values()
                if id is not None and stat.get(id) in active]

    def writeKillScript(mon):
        """Write a shell script to kill the jobs we know about"""
        kill = os.path.join(jobsdir, 'kill_jobs.sh')
        script = """
#!/usr/bin/env bash
echo "Killing jobs"
bkill -u %s %s
""" % (self.options.batch_user, " ".join(mon))
        with open(kill, 'w') as output:
            output.write(script)
        return mon

    # continue monitoring while there are jobs to monitor
    status = self.monitor(jobs, {})
    monitorable = writeKillScript(countJobs(status))
    count = 0

    while monitorable:
        job_status = checkStatus(status)
        time.sleep(60)
        status = self.monitor(jobs, status)
        monitorable = writeKillScript(countJobs(status))
        if not (count % 3):
            # print a heartbeat every third iteration (~3 minutes)
            print('%s: Monitoring %i jobs (%s)' % (self.name, len(monitorable), self.dataset))
        count += 1

    return {'LSFJobStatus': checkStatus(status), 'LSFJobIDs': jobs}
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def checkStatus
Definition: crabWrap.py:205