CMS 3D CMS Logo

LaunchOnCondor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 import urllib
5 import string
6 import os
7 import sys
8 import glob
9 import fnmatch
10 import commands
11 
12 CopyRights = '####################################\n'
13 CopyRights += '# LaunchOnFarm Script #\n'
14 CopyRights += '# Loic.quertenmont@cern.ch #\n'
15 CopyRights += '# April 2010 #\n'
16 CopyRights += '####################################\n'
17 
18 Farm_Directories = []
19 Path_Cmd = ''
20 Path_Shell = ''
21 Path_Log = ''
22 Path_Cfg = ''
23 Jobs_Count = 0
24 Jobs_Name = ''
25 Jobs_Index = ''
26 Jobs_Seed = 0
27 Jobs_NEvent =-1
28 Jobs_Skip = 0
29 Jobs_Queue = '8nh'
30 Jobs_Inputs = []
31 Jobs_FinalCmds = []
32 Jobs_RunHere = 0
33 
34 useLSF = True
35 LSFlog = True
36 runInteractively = False
37 
39  global Jobs_Name
40  global Jobs_Index
41  global Jobs_Count
42  global Jobs_Seed
43  global Jobs_Skip
44  global Jobs_NEvent
45  global Jobs_Inputs
46  global Jobs_FinalCmds
47  global Path_Cfg
48  global CopyRights
49  Path_Cfg = Farm_Directories[1]+Jobs_Index+Jobs_Name+'_cfg.py'
50 
51  config_file=open(argv[1],'r')
52  config_txt = '\n\n' + CopyRights + '\n\n'
53  config_txt += config_file.read()
54  config_file.close()
55  i = 2
56  while i < len(argv)-1:
57  config_txt = config_txt.replace(argv[i],argv[i+1])
58  i+=2
59 
60  #Default Replacements
61  config_txt = config_txt.replace("XXX_I_XXX" ,"%04i"%Jobs_Count)
62  config_txt = config_txt.replace("XXX_PATH_XXX" ,os.getcwd())
63  config_txt = config_txt.replace("XXX_OUTPUT_XXX" ,Jobs_Name)
64  config_txt = config_txt.replace("XXX_NAME_XXX" ,Jobs_Index+Jobs_Name)
65  config_txt = config_txt.replace("XXX_SEED_XXX" ,str(Jobs_Seed+Jobs_Count))
66  config_txt = config_txt.replace("XXX_NEVENTS_XXX" ,str(Jobs_NEvent))
67  config_txt = config_txt.replace("XXX_SKIP_XXX" ,str(Jobs_Skip))
68  if Jobs_Count < len(Jobs_Inputs):
69  config_txt = config_txt.replace("XXX_INPUT_XXX" ,Jobs_Inputs[Jobs_Count])
70 
71  config_file=open(Path_Cfg,'w')
72  config_file.write(config_txt)
73  config_file.close()
74 
76  global Path_Shell
77  global Path_Cfg
78  global CopyRights
79  global Jobs_RunHere
80  global Jobs_FinalCmds
81  Path_Shell = Farm_Directories[1]+Jobs_Index+Jobs_Name+'.sh'
82 
83  function_argument='('
84  for i in range(2,len(argv)):
85  function_argument+="%s" % argv[i]
86  if i != len(argv)-1:
87  function_argument+=', '
88  function_argument+=')'
89 
90  shell_file=open(Path_Shell,'w')
91  shell_file.write('#! /bin/sh\n')
92  shell_file.write(CopyRights + '\n')
93  shell_file.write('export SCRAM_ARCH='+os.getenv("SCRAM_ARCH","slc5_amd64_gcc462")+'\n')
94  shell_file.write('export BUILD_ARCH='+os.getenv("BUILD_ARCH","slc5_amd64_gcc462")+'\n')
95  shell_file.write('export VO_CMS_SW_DIR='+os.getenv("VO_CMS_SW_DIR","/nfs/soft/cms")+'\n')
96  #shell_file.write('source /nfs/soft/cms/cmsset_default.sh\n')
97  shell_file.write('cd ' + os.getcwd() + '\n')
98  shell_file.write('eval `scramv1 runtime -sh`\n')
99 
100  if argv[0]=='BASH':
101  if Jobs_RunHere==0:
102  shell_file.write('cd -\n')
103  shell_file.write(argv[1] + " %s\n" % function_argument)
104  elif argv[0]=='ROOT':
105  if Jobs_RunHere==0:
106  shell_file.write('cd -\n')
107  shell_file.write('root -l -b << EOF\n')
108  shell_file.write(' TString makeshared(gSystem->GetMakeSharedLib());\n')
109  shell_file.write(' TString dummy = makeshared.ReplaceAll("-W ", "-Wno-deprecated-declarations -Wno-deprecated -Wno-unused-local-typedefs ");\n')
110  shell_file.write(' TString dummy = makeshared.ReplaceAll("-Wshadow ", " -std=c++0x -D__USE_XOPEN2K8 ");\n')
111  shell_file.write(' cout << "Compilling with the following arguments: " << makeshared << endl;\n')
112  shell_file.write(' gSystem->SetMakeSharedLib(makeshared);\n')
113  shell_file.write(' gSystem->SetIncludePath( "-I$ROOFITSYS/include" );\n')
114  shell_file.write(' .x %s+' % argv[1] + function_argument + '\n')
115  shell_file.write(' .q\n')
116  shell_file.write('EOF\n\n')
117  elif argv[0]=='FWLITE':
118  if Jobs_RunHere==0:
119  shell_file.write('cd -\n')
120  shell_file.write('root -l -b << EOF\n')
121  shell_file.write(' TString makeshared(gSystem->GetMakeSharedLib());\n')
122  shell_file.write(' TString dummy = makeshared.ReplaceAll("-W ", "-Wno-deprecated-declarations -Wno-deprecated ");\n')
123  shell_file.write(' TString dummy = makeshared.ReplaceAll("-Wshadow ", " -std=c++0x -D__USE_XOPEN2K8 ");\n')
124  shell_file.write(' cout << "Compilling with the following arguments: " << makeshared << endl;\n')
125  shell_file.write(' gSystem->SetMakeSharedLib(makeshared);\n')
126  shell_file.write(' gSystem->SetIncludePath("-I$ROOFITSYS/include");\n')
127  shell_file.write(' gSystem->Load("libFWCoreFWLite");\n')
128  shell_file.write(' FWLiteEnabler::enable();\n')
129  shell_file.write(' gSystem->Load("libDataFormatsFWLite.so");\n')
130  shell_file.write(' gSystem->Load("libAnalysisDataFormatsSUSYBSMObjects.so");\n')
131  shell_file.write(' gSystem->Load("libDataFormatsVertexReco.so");\n')
132  shell_file.write(' gSystem->Load("libDataFormatsHepMCCandidate.so");\n')
133  shell_file.write(' gSystem->Load("libPhysicsToolsUtilities.so");\n')
134  shell_file.write(' gSystem->Load("libdcap.so");\n')
135  shell_file.write(' .x %s+' % argv[1] + function_argument + '\n')
136  shell_file.write(' .q\n')
137  shell_file.write('EOF\n\n')
138  elif argv[0]=='CMSSW':
139  CreateTheConfigFile(argv);
140  if Jobs_RunHere==0:
141  shell_file.write('cd -\n')
142  shell_file.write('cmsRun ' + os.getcwd() + '/'+Path_Cfg + '\n')
143  else:
144  print() #Program to use is not specified... Guess it is bash command
145  shell_file.write('#Program to use is not specified... Guess it is bash command\n')
146  shell_file.write(argv[1] + " %s\n" % function_argument)
147 
148  for i in range(len(Jobs_FinalCmds)):
149  shell_file.write(Jobs_FinalCmds[i]+'\n')
150  shell_file.write('mv '+ Jobs_Name+'* '+os.getcwd()+'/'+Farm_Directories[3]+'\n')
151  shell_file.close()
152  os.system("chmod 777 "+Path_Shell)
153 
154 
156  global useLSF
157  global Path_Cmd
158  global CopyRights
159  Path_Cmd = Farm_Directories[1]+Jobs_Name+'.cmd'
160  cmd_file=open(Path_Cmd,'w')
161 
162  if useLSF:
163  cmd_file.write(CopyRights + '\n')
164  else:
165  cmd_file.write('Universe = vanilla\n')
166  cmd_file.write('Environment = CONDORJOBID=$(Process)\n')
167  cmd_file.write('notification = Error\n')
168  #code specific for louvain
169  if(commands.getstatusoutput("uname -n")[1].find("ucl.ac.be")!=-1):
170  cmd_file.write('requirements = (CMSFARM=?=True)&&(Memory > 200)\n')
171  else:
172  cmd_file.write('requirements = (Memory > 200)\n')
173  cmd_file.write('should_transfer_files = YES\n')
174  cmd_file.write('when_to_transfer_output = ON_EXIT\n')
175  cmd_file.close()
176 
178  global runInteractively
179  global useLSF
180  global Path_Shell
181  global Path_Cmd
182  global Path_Log
183  Path_Log = Farm_Directories[2]+Jobs_Index+Jobs_Name
184  cmd_file=open(Path_Cmd,'a')
185 
186  if runInteractively:
187  cmd_file.write("sh "+ os.getcwd() + "/"+Path_Shell + "\n")
188  elif useLSF:
189  if LSFlog :
190  cmd_file.write("bsub -q " + Jobs_Queue + " -J " + Jobs_Name+Jobs_Index + " '" + os.getcwd() + "/"+Path_Shell + " 0 ele'\n")
191  else :
192  cmd_file.write("bsub -o /dev/null -q " + Jobs_Queue + " -J " + Jobs_Name+Jobs_Index + " '" + os.getcwd() + "/"+Path_Shell + " 0 ele'\n")
193  else:
194  cmd_file.write('\n')
195  cmd_file.write('Executable = %s\n' % Path_Shell)
196  cmd_file.write('output = %s.out\n' % Path_Log)
197  cmd_file.write('error = %s.err\n' % Path_Log)
198  cmd_file.write('log = %s.log\n' % Path_Log)
199  cmd_file.write('Queue 1\n')
200  cmd_file.close()
201 
202 def CreateDirectoryStructure(FarmDirectory):
203  global Jobs_Name
204  global Farm_Directories
205  Farm_Directories = [FarmDirectory+'/', FarmDirectory+'/inputs/', FarmDirectory+'/logs/', FarmDirectory+'/outputs/']
206  for i in range(0,len(Farm_Directories)):
207  if os.path.isdir(Farm_Directories[i]) == False:
208  os.system('mkdir ' + Farm_Directories[i])
209 
210 def SendCluster_LoadInputFiles(path, NJobs):
211  global Jobs_Inputs
212  input_file = open(path,'r')
213  input_lines = input_file.readlines()
214  input_file.close()
215  #input_lines.sort()
216 
217  BlockSize = (len(input_lines)/NJobs)
218  LineIndex = 0
219  JobIndex = 0
220  BlockIndex = 0
221  Jobs_Inputs = [""]
222  while LineIndex < len(input_lines):
223  Jobs_Inputs[JobIndex] += input_lines[LineIndex]
224  LineIndex +=1
225  BlockIndex+=1
226  if BlockIndex>BlockSize:
227  BlockIndex = 0
228  JobIndex += 1
229  Jobs_Inputs.append("")
230  return JobIndex+1
231 
232 def SendCluster_Create(FarmDirectory, JobName):
233  global useLSF
234  global Jobs_Name
235  global Jobs_Count
236  global Farm_Directories
237  global runInteractively
238 
239  if(runInteractively): useLSF=True
240 
241  #determine if the submission system is LSF batch or condor
242  command_out = commands.getstatusoutput("bjobs")[1]
243  if(command_out.find("command not found")<0): useLSF = True
244  else: useLSF = False;
245 
246  Jobs_Name = JobName
247  Jobs_Count = 0
248  CreateDirectoryStructure(FarmDirectory)
250 
252  global Farm_Directories
253  global Jobs_Count
254  global Jobs_Index
255  global Path_Shell
256  global Path_Log
257 
258  Jobs_Index = "%04i_" % Jobs_Count
259  if Jobs_Count==0 and (Argv[0]=="ROOT" or Argv[0]=="FWLITE"):
260  #First Need to Compile the macro --> Create a temporary shell path with no arguments
261  print("Compiling the Macro...")
262  CreateTheShellFile([Argv[0],Argv[1]])
263  os.system('sh '+Path_Shell)
264  os.system('rm '+Path_Shell)
265  print("Getting the jobs...")
266  print(Argv)
267  CreateTheShellFile(Argv)
269  Jobs_Count = Jobs_Count+1
270 
272  global useLSF
273  global CopyRights
274  global Jobs_Count
275  global Path_Cmd
276 
277  if useLSF:
278  os.system("sh " + Path_Cmd)
279  else:
280  os.system("condor_submit " + Path_Cmd)
281 
282  print('\n'+CopyRights)
283  print('%i Job(s) has/have been submitted on the Computing Cluster' % Jobs_Count)
284 
285 def SendSingleJob(FarmDirectory, JobName, Argv):
286  SendCluster_Create(FarmDirectory, JobName, Argv)
287  SendCluster_Push(FarmDirectory, JobName, Argv)
288  SendCluster_Submit(FarmDirectory, JobName,Argv)
289 
290 def SendCMSJobs(FarmDirectory, JobName, ConfigFile, InputFiles, NJobs, Argv):
291  SendCluster_Create(FarmDirectory, JobName)
292  NJobs = SendCluster_LoadInputFiles(InputFiles, NJobs)
293  for i in range(NJobs):
294  LaunchOnCondor.SendCluster_Push (["CMSSW", ConfigFile])
296 
297 
298 
299 def GetListOfFiles(Prefix, InputPattern, Suffix):
300  List = []
301 
302  if(InputPattern.find('/store/cmst3')==0) :
303  index = InputPattern.rfind('/')
304  Listtmp = commands.getstatusoutput('cmsLs ' + InputPattern[0:index] + ' | awk \'{print $5}\'')[1].split('\n')
305  pattern = InputPattern[index+1:len(InputPattern)]
306  for file in Listtmp:
307  if fnmatch.fnmatch(file, pattern): List.append(InputPattern[0:index]+'/'+file)
308  elif(InputPattern.find('/castor/')==0):
309  index = InputPattern.rfind('/')
310  Listtmp = commands.getstatusoutput('rfdir ' + InputPattern[0:index] + ' | awk \'{print $9}\'')[1].split('\n')
311  pattern = InputPattern[index+1:len(InputPattern)]
312  for file in Listtmp:
313  if fnmatch.fnmatch(file, pattern): List.append(InputPattern[0:index]+'/'+file)
314  else :
315  List = glob.glob(InputPattern)
316 
317  List = sorted(List)
318  for i in range(len(List)):
319  List[i] = Prefix + List[i] + Suffix
320  return List
321 
322 
323 def ListToString(InputList):
324  outString = ""
325  for i in range(len(InputList)):
326  outString += InputList[i]
327  return outString
328 
329 def ListToFile(InputList, outputFile):
330  out_file=open(outputFile,'w')
331  for i in range(len(InputList)):
332  out_file.write(' ' + InputList[i] + '\n')
333  out_file.close()
334 
335 def FileToList(path):
336  input_file = open(path,'r')
337  input_lines = input_file.readlines()
338  input_file.close()
339  input_lines.sort()
340  return input_lines
341 
342 
343 def SendCMSMergeJob(FarmDirectory, JobName, InputFiles, OutputFile, KeepStatement):
344  SendCluster_Create(FarmDirectory, JobName)
345  Temp_Cfg = Farm_Directories[1]+Jobs_Index+Jobs_Name+'_TEMP_cfg.py'
346 
347  if len(InputFiles)==0:
348  print('Empty InputFile List for Job named "%s", Job will not be submitted' % JobName)
349  return
350 
351  InputFilesString = ""
352  for i in range(len(InputFiles)):
353  InputFilesString += " " + InputFiles[i] + '\n'
354 
355  cfg_file=open(Temp_Cfg,'w')
356  cfg_file.write('import FWCore.ParameterSet.Config as cms\n')
357  cfg_file.write('process = cms.Process("Merge")\n')
358  cfg_file.write('\n')
359  cfg_file.write('process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )\n')
360  cfg_file.write('process.load("FWCore.MessageService.MessageLogger_cfi")\n')
361  cfg_file.write('\n')
362  cfg_file.write('process.MessageLogger.cerr.FwkReport.reportEvery = 50000\n')
363  cfg_file.write('process.source = cms.Source("PoolSource",\n')
364  cfg_file.write(' fileNames = cms.untracked.vstring(\n')
365  cfg_file.write('%s' % InputFilesString)
366  cfg_file.write(' )\n')
367  cfg_file.write(')\n')
368  cfg_file.write('\n')
369  cfg_file.write('process.OUT = cms.OutputModule("PoolOutputModule",\n')
370  cfg_file.write(' outputCommands = cms.untracked.vstring(%s),\n' % KeepStatement)
371  cfg_file.write(' fileName = cms.untracked.string(%s)\n' % OutputFile)
372  cfg_file.write(')\n')
373  cfg_file.write('\n')
374  cfg_file.write('process.endPath = cms.EndPath(process.OUT)\n')
375  cfg_file.close()
376  SendCluster_Push (["CMSSW", Temp_Cfg])
378  os.system('rm '+ Temp_Cfg)
379 
LaunchOnCondor.SendCluster_Create
def SendCluster_Create(FarmDirectory, JobName)
Definition: LaunchOnCondor.py:232
LaunchOnCondor.SendCluster_Submit
def SendCluster_Submit()
Definition: LaunchOnCondor.py:271
FastTimerService_cff.range
range
Definition: FastTimerService_cff.py:34
LaunchOnCondor.SendCMSJobs
def SendCMSJobs(FarmDirectory, JobName, ConfigFile, InputFiles, NJobs, Argv)
Definition: LaunchOnCondor.py:290
LaunchOnCondor.CreateTheConfigFile
def CreateTheConfigFile(argv)
Definition: LaunchOnCondor.py:38
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
str
#define str(s)
Definition: TestProcessor.cc:48
LaunchOnCondor.FileToList
def FileToList(path)
Definition: LaunchOnCondor.py:335
LaunchOnCondor.SendSingleJob
def SendSingleJob(FarmDirectory, JobName, Argv)
Definition: LaunchOnCondor.py:285
LaunchOnCondor.ListToFile
def ListToFile(InputList, outputFile)
Definition: LaunchOnCondor.py:329
LaunchOnCondor.AddJobToCmdFile
def AddJobToCmdFile()
Definition: LaunchOnCondor.py:177
LaunchOnCondor.SendCluster_Push
def SendCluster_Push(Argv)
Definition: LaunchOnCondor.py:251
LaunchOnCondor.CreateDirectoryStructure
def CreateDirectoryStructure(FarmDirectory)
Definition: LaunchOnCondor.py:202
LaunchOnCondor.CreateTheCmdFile
def CreateTheCmdFile()
Definition: LaunchOnCondor.py:155
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
LaunchOnCondor.SendCluster_LoadInputFiles
def SendCluster_LoadInputFiles(path, NJobs)
Definition: LaunchOnCondor.py:210
LaunchOnCondor.GetListOfFiles
def GetListOfFiles(Prefix, InputPattern, Suffix)
Definition: LaunchOnCondor.py:299
LaunchOnCondor.SendCMSMergeJob
def SendCMSMergeJob(FarmDirectory, JobName, InputFiles, OutputFile, KeepStatement)
Definition: LaunchOnCondor.py:343
LaunchOnCondor.CreateTheShellFile
def CreateTheShellFile(argv)
Definition: LaunchOnCondor.py:75
LaunchOnCondor.ListToString
def ListToString(InputList)
Definition: LaunchOnCondor.py:323