CMS 3D CMS Logo

LaunchOnCondor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import urllib
4 import string
5 import os
6 import sys
7 import glob
8 import fnmatch
9 import commands
10 
# Banner text. It is prepended to the generated CMSSW config and shell files,
# to the command file when useLSF is set, and printed after submission.
11 CopyRights = '####################################\n'
12 CopyRights += '# LaunchOnFarm Script #\n'
13 CopyRights += '# Loic.quertenmont@cern.ch #\n'
14 CopyRights += '# April 2010 #\n'
15 CopyRights += '####################################\n'
16 
# Module-level job state. The functions of this module read and update these
# names through "global" statements.
# Farm_Directories is set by CreateDirectoryStructure to
# [root/, root/inputs/, root/logs/, root/outputs/].
17 Farm_Directories = []
# Paths of the files generated for the current job / farm (command file,
# shell script, log prefix, CMSSW config).
18 Path_Cmd = ''
19 Path_Shell = ''
20 Path_Log = ''
21 Path_Cfg = ''
# Jobs_Count: number of jobs pushed so far; Jobs_Index is derived from it as "%04i_".
22 Jobs_Count = 0
23 Jobs_Name = ''
24 Jobs_Index = ''
# Values substituted for the XXX_SEED_XXX (Jobs_Seed + Jobs_Count),
# XXX_NEVENTS_XXX and XXX_SKIP_XXX placeholders of a config template.
25 Jobs_Seed = 0
26 Jobs_NEvent =-1
27 Jobs_Skip = 0
# Queue name passed to "bsub -q".
28 Jobs_Queue = '8nh'
# Jobs_Inputs[k] replaces XXX_INPUT_XXX in the config of job k.
29 Jobs_Inputs = []
# Extra shell lines appended to every generated job script.
30 Jobs_FinalCmds = []
# When 0, the generated job script runs "cd -" before launching the payload.
31 Jobs_RunHere = 0
32 
# Submission mode switches: LSF (bsub) versus Condor, whether bsub keeps its
# log output (otherwise "-o /dev/null"), and whether jobs are run with "sh".
33 useLSF = True
34 LSFlog = True
35 runInteractively = False
36 
# NOTE(review): the "def" line of this function (listing line 37) is missing
# from this extract. The body matches the "def CreateTheConfigFile(argv)" entry
# of the index at the end of the page and the CreateTheConfigFile(argv) call
# in the shell-file writer -- confirm against the repository copy.
#
# Builds the CMSSW configuration of the current job: reads the template file
# argv[1], prepends the CopyRights banner, applies the replacement pairs
# (argv[2] -> argv[3], argv[4] -> argv[5], ...) and then the default
# XXX_*_XXX placeholders, and writes the result to Path_Cfg.
38  global Jobs_Name
39  global Jobs_Index
40  global Jobs_Count
41  global Jobs_Seed
42  global Jobs_Skip
43  global Jobs_NEvent
44  global Jobs_Inputs
45  global Jobs_FinalCmds
46  global Path_Cfg
47  global CopyRights
 # The config is written to the inputs directory as <index><name>_cfg.py.
48  Path_Cfg = Farm_Directories[1]+Jobs_Index+Jobs_Name+'_cfg.py'
49 
50  config_file=open(argv[1],'r')
51  config_txt = '\n\n' + CopyRights + '\n\n'
52  config_txt += config_file.read()
53  config_file.close()
 # User replacements come in (old, new) pairs starting at argv[2]; the loop
 # bound len(argv)-1 means a trailing unpaired argument is ignored.
54  i = 2
55  while i < len(argv)-1:
56  config_txt = config_txt.replace(argv[i],argv[i+1])
57  i+=2
58 
59  #Default Replacements
60  config_txt = config_txt.replace("XXX_I_XXX" ,"%04i"%Jobs_Count)
61  config_txt = config_txt.replace("XXX_PATH_XXX" ,os.getcwd())
62  config_txt = config_txt.replace("XXX_OUTPUT_XXX" ,Jobs_Name)
63  config_txt = config_txt.replace("XXX_NAME_XXX" ,Jobs_Index+Jobs_Name)
64  config_txt = config_txt.replace("XXX_SEED_XXX" ,str(Jobs_Seed+Jobs_Count))
65  config_txt = config_txt.replace("XXX_NEVENTS_XXX" ,str(Jobs_NEvent))
66  config_txt = config_txt.replace("XXX_SKIP_XXX" ,str(Jobs_Skip))
 # XXX_INPUT_XXX is only substituted when an input block exists for this job.
67  if Jobs_Count < len(Jobs_Inputs):
68  config_txt = config_txt.replace("XXX_INPUT_XXX" ,Jobs_Inputs[Jobs_Count])
69 
70  config_file=open(Path_Cfg,'w')
71  config_file.write(config_txt)
72  config_file.close()
73 
# NOTE(review): the "def" line of this function (listing line 74) is missing
# from this extract. The body matches the "def CreateTheShellFile(argv)" entry
# of the index at the end of the page and the CreateTheShellFile(...) call
# sites -- confirm against the repository copy.
#
# Writes the shell script of the current job to Path_Shell and makes it
# executable. argv[0] selects the runner ('BASH', 'ROOT', 'FWLITE' or 'CMSSW';
# anything else is written as a plain command line) and argv[1] is the
# script / macro / config template to run. For all runners except 'CMSSW',
# argv[2:] are joined into a parenthesised, comma-separated argument string.
75  global Path_Shell
76  global Path_Cfg
77  global CopyRights
78  global Jobs_RunHere
79  global Jobs_FinalCmds
 # The script is written to the inputs directory as <index><name>.sh.
80  Path_Shell = Farm_Directories[1]+Jobs_Index+Jobs_Name+'.sh'
81 
 # Build "(arg2, arg3, ...)" from argv[2:]; with no extra arguments this is "()".
82  function_argument='('
83  for i in range(2,len(argv)):
84  function_argument+="%s" % argv[i]
85  if i != len(argv)-1:
86  function_argument+=', '
87  function_argument+=')'
88 
 # Script preamble: environment variables (taken from the submitting shell,
 # with fallbacks), then the CMSSW runtime environment set up from the
 # current working directory.
89  shell_file=open(Path_Shell,'w')
90  shell_file.write('#! /bin/sh\n')
91  shell_file.write(CopyRights + '\n')
92  shell_file.write('export SCRAM_ARCH='+os.getenv("SCRAM_ARCH","slc5_amd64_gcc462")+'\n')
93  shell_file.write('export BUILD_ARCH='+os.getenv("BUILD_ARCH","slc5_amd64_gcc462")+'\n')
94  shell_file.write('export VO_CMS_SW_DIR='+os.getenv("VO_CMS_SW_DIR","/nfs/soft/cms")+'\n')
95  #shell_file.write('source /nfs/soft/cms/cmsset_default.sh\n')
96  shell_file.write('cd ' + os.getcwd() + '\n')
97  shell_file.write('eval `scramv1 runtime -sh`\n')
98 
 # In every branch below, 'cd -' is emitted when Jobs_RunHere is 0, i.e. the
 # script leaves the submission directory again before running the payload.
99  if argv[0]=='BASH':
100  if Jobs_RunHere==0:
101  shell_file.write('cd -\n')
102  shell_file.write(argv[1] + " %s\n" % function_argument)
103  elif argv[0]=='ROOT':
104  if Jobs_RunHere==0:
105  shell_file.write('cd -\n')
 # The macro argv[1] is compiled ('+') and executed inside a batch ROOT
 # session fed through a here-document.
 # NOTE(review): the two ReplaceAll lines both declare "TString dummy" in the
 # generated ROOT session -- check that ROOT accepts the redeclaration.
106  shell_file.write('root -l -b << EOF\n')
107  shell_file.write(' TString makeshared(gSystem->GetMakeSharedLib());\n')
108  shell_file.write(' TString dummy = makeshared.ReplaceAll("-W ", "-Wno-deprecated-declarations -Wno-deprecated -Wno-unused-local-typedefs ");\n')
109  shell_file.write(' TString dummy = makeshared.ReplaceAll("-Wshadow ", " -std=c++0x -D__USE_XOPEN2K8 ");\n')
110  shell_file.write(' cout << "Compilling with the following arguments: " << makeshared << endl;\n')
111  shell_file.write(' gSystem->SetMakeSharedLib(makeshared);\n')
112  shell_file.write(' gSystem->SetIncludePath( "-I$ROOFITSYS/include" );\n')
113  shell_file.write(' .x %s+' % argv[1] + function_argument + '\n')
114  shell_file.write(' .q\n')
115  shell_file.write('EOF\n\n')
116  elif argv[0]=='FWLITE':
117  if Jobs_RunHere==0:
118  shell_file.write('cd -\n')
 # Same as the ROOT branch, with FWLite enabled and additional libraries
 # loaded before the macro is run.
119  shell_file.write('root -l -b << EOF\n')
120  shell_file.write(' TString makeshared(gSystem->GetMakeSharedLib());\n')
121  shell_file.write(' TString dummy = makeshared.ReplaceAll("-W ", "-Wno-deprecated-declarations -Wno-deprecated ");\n')
122  shell_file.write(' TString dummy = makeshared.ReplaceAll("-Wshadow ", " -std=c++0x -D__USE_XOPEN2K8 ");\n')
123  shell_file.write(' cout << "Compilling with the following arguments: " << makeshared << endl;\n')
124  shell_file.write(' gSystem->SetMakeSharedLib(makeshared);\n')
125  shell_file.write(' gSystem->SetIncludePath("-I$ROOFITSYS/include");\n')
126  shell_file.write(' gSystem->Load("libFWCoreFWLite");\n')
127  shell_file.write(' FWLiteEnabler::enable();\n')
128  shell_file.write(' gSystem->Load("libDataFormatsFWLite.so");\n')
129  shell_file.write(' gSystem->Load("libAnalysisDataFormatsSUSYBSMObjects.so");\n')
130  shell_file.write(' gSystem->Load("libDataFormatsVertexReco.so");\n')
131  shell_file.write(' gSystem->Load("libDataFormatsHepMCCandidate.so");\n')
132  shell_file.write(' gSystem->Load("libPhysicsToolsUtilities.so");\n')
133  shell_file.write(' gSystem->Load("libdcap.so");\n')
134  shell_file.write(' .x %s+' % argv[1] + function_argument + '\n')
135  shell_file.write(' .q\n')
136  shell_file.write('EOF\n\n')
137  elif argv[0]=='CMSSW':
 # The per-job config is generated first; the script then runs cmsRun on it
 # using an absolute path (current working directory + Path_Cfg).
138  CreateTheConfigFile(argv);
139  if Jobs_RunHere==0:
140  shell_file.write('cd -\n')
141  shell_file.write('cmsRun ' + os.getcwd() + '/'+Path_Cfg + '\n')
142  else:
 # NOTE(review): the text after "print" is a comment, so this statement only
 # prints an empty line; the message itself is written into the script below.
143  print #Program to use is not specified... Guess it is bash command
144  shell_file.write('#Program to use is not specified... Guess it is bash command\n')
145  shell_file.write(argv[1] + " %s\n" % function_argument)
146 
 # Epilogue: user-supplied final commands, then every file whose name starts
 # with Jobs_Name is moved into the farm's outputs directory.
147  for i in range(len(Jobs_FinalCmds)):
148  shell_file.write(Jobs_FinalCmds[i]+'\n')
149  shell_file.write('mv '+ Jobs_Name+'* '+os.getcwd()+'/'+Farm_Directories[3]+'\n')
150  shell_file.close()
 # NOTE(review): mode 777 makes the script writable by every user; a narrower
 # mode is probably sufficient -- confirm before changing.
151  os.system("chmod 777 "+Path_Shell)
152 
153 
# NOTE(review): the "def" line of this function (listing line 154) is missing
# from this extract, so its name and parameters are not visible here.
#
# Creates (truncating) the farm's command file Path_Cmd in the inputs
# directory and writes its header: only the CopyRights banner when useLSF is
# set, otherwise the common part of a Condor submit description.
155  global useLSF
156  global Path_Cmd
157  global CopyRights
 # One command file per farm: the name uses Jobs_Name without the job index.
158  Path_Cmd = Farm_Directories[1]+Jobs_Name+'.cmd'
159  cmd_file=open(Path_Cmd,'w')
160 
161  if useLSF:
162  cmd_file.write(CopyRights + '\n')
163  else:
164  cmd_file.write('Universe = vanilla\n')
165  cmd_file.write('Environment = CONDORJOBID=$(Process)\n')
166  cmd_file.write('notification = Error\n')
167  #code specific for louvain
 # The host name reported by "uname -n" decides which requirements line is used.
168  if(commands.getstatusoutput("uname -n")[1].find("ucl.ac.be")!=-1):
169  cmd_file.write('requirements = (CMSFARM=?=True)&&(Memory > 200)\n')
170  else:
171  cmd_file.write('requirements = (Memory > 200)\n')
172  cmd_file.write('should_transfer_files = YES\n')
173  cmd_file.write('when_to_transfer_output = ON_EXIT\n')
174  cmd_file.close()
175 
# NOTE(review): the "def" line of this function (listing line 176) is missing
# from this extract, so its name and parameters are not visible here.
#
# Appends the entry of the current job to the command file Path_Cmd:
#  - runInteractively: an "sh <cwd>/<Path_Shell>" line;
#  - useLSF: a "bsub" line for queue Jobs_Queue with job name
#    Jobs_Name+Jobs_Index ("-o /dev/null" is added when LSFlog is false);
#  - otherwise: a Condor Executable/output/error/log block ending in "Queue 1".
177  global runInteractively
178  global useLSF
179  global Path_Shell
180  global Path_Cmd
181  global Path_Log
 # Log file prefix in the logs directory; only the Condor branch uses it.
182  Path_Log = Farm_Directories[2]+Jobs_Index+Jobs_Name
183  cmd_file=open(Path_Cmd,'a')
184 
185  if runInteractively:
186  cmd_file.write("sh "+ os.getcwd() + "/"+Path_Shell + "\n")
187  elif useLSF:
188  if LSFlog :
189  cmd_file.write("bsub -q " + Jobs_Queue + " -J " + Jobs_Name+Jobs_Index + " '" + os.getcwd() + "/"+Path_Shell + " 0 ele'\n")
190  else :
191  cmd_file.write("bsub -o /dev/null -q " + Jobs_Queue + " -J " + Jobs_Name+Jobs_Index + " '" + os.getcwd() + "/"+Path_Shell + " 0 ele'\n")
192  else:
193  cmd_file.write('\n')
194  cmd_file.write('Executable = %s\n' % Path_Shell)
195  cmd_file.write('output = %s.out\n' % Path_Log)
196  cmd_file.write('error = %s.err\n' % Path_Log)
197  cmd_file.write('log = %s.log\n' % Path_Log)
198  cmd_file.write('Queue 1\n')
199  cmd_file.close()
200 
def CreateDirectoryStructure(FarmDirectory):
    """Create the farm directory and its standard sub-directories.

    Sets the module-level ``Farm_Directories`` list to, in this order, the
    farm root, ``inputs/``, ``logs/`` and ``outputs/`` (every entry ends with
    a slash) and creates each directory that does not exist yet.

    Args:
        FarmDirectory: Path of the farm root directory.  Missing parent
            directories are created as well.

    Raises:
        OSError: If a directory cannot be created.
    """
    global Farm_Directories
    Farm_Directories = [FarmDirectory + '/' + sub_directory
                        for sub_directory in ('', 'inputs/', 'logs/', 'outputs/')]
    for directory in Farm_Directories:
        # os.makedirs instead of a shell "mkdir": paths containing spaces or
        # shell metacharacters work, parents are created, failures are reported.
        if not os.path.isdir(directory):
            os.makedirs(directory)
208 
def SendCluster_LoadInputFiles(path, NJobs):
    """Split the lines of an input list file into per-job blocks.

    The blocks are stored in the module-level ``Jobs_Inputs`` list: entry
    ``k`` is the concatenation (line endings included) of the lines given to
    job ``k``.  Every job receives ``len(lines) // NJobs + 1`` consecutive
    lines, the last job possibly fewer, so at most ``NJobs`` jobs are needed.
    No empty block is ever created; an empty file yields zero jobs.

    Args:
        path: Text file with one input per line.
        NJobs: Requested (maximum) number of jobs; must be at least 1.

    Returns:
        The number of jobs actually needed, i.e. ``len(Jobs_Inputs)``.

    Raises:
        ValueError: If ``NJobs`` is smaller than 1.
    """
    global Jobs_Inputs
    if NJobs < 1:
        raise ValueError('NJobs must be at least 1, got %r' % (NJobs,))

    with open(path, 'r') as input_file:
        input_lines = input_file.readlines()

    # Integer division on purpose: the block length must be an int on every
    # Python version.  The "+ 1" keeps the number of blocks <= NJobs.
    lines_per_job = len(input_lines) // NJobs + 1
    Jobs_Inputs = [''.join(input_lines[start:start + lines_per_job])
                   for start in range(0, len(input_lines), lines_per_job)]
    return len(Jobs_Inputs)
230 
def SendCluster_Create(FarmDirectory, JobName):
    """Start a new farm: choose the submission mode and create its directories.

    Resets the module-level ``Jobs_Name`` and ``Jobs_Count`` and sets
    ``useLSF``.  In interactive mode ``useLSF`` is forced to True, which makes
    the command file a plain shell script; otherwise LSF is selected when the
    ``bjobs`` command is available and Condor when it is not.

    Args:
        FarmDirectory: Root directory of the farm (created if missing).
        JobName: Name shared by all jobs of this farm.
    """
    global useLSF
    global Jobs_Name
    global Jobs_Count

    if runInteractively:
        # Interactive runs must not be overridden by the batch-system probe:
        # with useLSF False the command file would get a Condor header and be
        # handed to condor_submit although it contains "sh ..." lines.
        useLSF = True
    else:
        # determine if the submission system is LSF batch or condor
        command_out = commands.getstatusoutput("bjobs")[1]
        useLSF = command_out.find("command not found") < 0

    Jobs_Name = JobName
    Jobs_Count = 0
    CreateDirectoryStructure(FarmDirectory)
    # NOTE(review): the rendered listing this block was taken from skips its
    # line 248 right after this call; check the repository copy for a
    # statement that belongs here (presumably the command-file creation).
249 
# NOTE(review): the "def" line of this function (listing line 250) is missing
# from this extract. The body matches the "def SendCluster_Push(Argv)" entry of
# the index at the end of the page and the SendCluster_Push([...]) call sites
# -- confirm against the repository copy.
#
# Adds one job to the current farm: derives Jobs_Index from Jobs_Count,
# generates the job's shell script from Argv and increments Jobs_Count.
251  global Farm_Directories
252  global Jobs_Count
253  global Jobs_Index
254  global Path_Shell
255  global Path_Log
256 
257  Jobs_Index = "%04i_" % Jobs_Count
 # For the first ROOT/FWLITE job, a throw-away script without macro arguments
 # is generated, run locally with "sh" and deleted again.
258  if Jobs_Count==0 and (Argv[0]=="ROOT" or Argv[0]=="FWLITE"):
259  #First Need to Compile the macro --> Create a temporary shell path with no arguments
260  print "Compiling the Macro..."
261  CreateTheShellFile([Argv[0],Argv[1]])
262  os.system('sh '+Path_Shell)
263  os.system('rm '+Path_Shell)
264  print "Getting the jobs..."
265  print Argv
266  CreateTheShellFile(Argv)
 # NOTE(review): listing line 267 is missing from this extract at this point;
 # presumably the call that appends the job to the command file -- confirm.
268  Jobs_Count = Jobs_Count+1
269 
# NOTE(review): the "def" line of this function (listing line 270) is missing
# from this extract. The body matches the "def SendCluster_Submit()" entry of
# the index at the end of the page -- confirm against the repository copy.
#
# Submits the farm: the command file Path_Cmd is executed with "sh" when
# useLSF is set and handed to condor_submit otherwise; the banner and the
# number of pushed jobs are then printed.
271  global useLSF
272  global CopyRights
273  global Jobs_Count
274  global Path_Cmd
275 
276  if useLSF:
277  os.system("sh " + Path_Cmd)
278  else:
279  os.system("condor_submit " + Path_Cmd)
280 
281  print '\n'+CopyRights
282  print '%i Job(s) has/have been submitted on the Computing Cluster' % Jobs_Count
283 
def SendSingleJob(FarmDirectory, JobName, Argv):
    """Create a farm, push the single job described by ``Argv`` and submit it.

    Args:
        FarmDirectory: Root directory of the farm.
        JobName: Name of the job.
        Argv: Job description handed to ``SendCluster_Push``; ``Argv[0]`` is
            the runner and ``Argv[1]`` the script, macro or config to run.
    """
    # Each helper is called with the arguments it actually declares:
    # SendCluster_Create(FarmDirectory, JobName), SendCluster_Push(Argv) and
    # SendCluster_Submit().  Passing all three values to every helper raised
    # a TypeError on the first call.
    SendCluster_Create(FarmDirectory, JobName)
    SendCluster_Push(Argv)
    SendCluster_Submit()
288 
def SendCMSJobs(FarmDirectory, JobName, ConfigFile, InputFiles, NJobs, Argv):
    """Create a farm of CMSSW jobs that share one configuration template.

    The lines of ``InputFiles`` are distributed over at most ``NJobs`` jobs
    and one ``["CMSSW", ConfigFile]`` job is pushed per input block.

    Args:
        FarmDirectory: Root directory of the farm.
        JobName: Name shared by all jobs.
        ConfigFile: CMSSW configuration template.
        InputFiles: Path of a text file listing the inputs, one per line.
        NJobs: Requested (maximum) number of jobs.
        Argv: Accepted for interface compatibility; not used by this function.
    """
    SendCluster_Create(FarmDirectory, JobName)
    NJobs = SendCluster_LoadInputFiles(InputFiles, NJobs)
    for _ in range(NJobs):
        # Call the sibling function directly: the module never imports itself
        # under the name "LaunchOnCondor", so the qualified call raised a
        # NameError.
        SendCluster_Push(["CMSSW", ConfigFile])
    # NOTE(review): the rendered listing this block was taken from skips its
    # line 294 right after the loop; presumably the SendCluster_Submit() call
    # -- confirm against the repository copy.
295 
296 
297 
def _ListRemoteFiles(Tool, Directory, Column):
    """Return field number Column (1-based) of every line printed by ``Tool Directory``."""
    # Local import: the module header does not import subprocess.
    import subprocess
    try:
        # Argument list and no shell: the directory is passed verbatim, so
        # spaces or shell metacharacters in it cannot alter the command.
        process = subprocess.Popen([Tool, Directory],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
    except OSError:
        # The listing tool is not installed here: nothing can be listed.
        return []
    listing = process.communicate()[0]
    names = []
    for line in listing.splitlines():
        fields = line.split()
        if len(fields) >= Column:
            names.append(fields[Column - 1])
    return names


def GetListOfFiles(Prefix, InputPattern, Suffix):
    """Expand a file pattern into a sorted list of decorated file names.

    Patterns starting with ``/store/cmst3`` are resolved with ``cmsLs`` and
    patterns starting with ``/castor/`` with ``rfdir``: the directory part
    (everything before the last ``/``) is listed and the entries matching the
    last path component (``fnmatch`` syntax) are kept.  Any other pattern is
    expanded locally with ``glob``.

    Args:
        Prefix: String put in front of every file name.
        InputPattern: Path whose last component may contain wildcards.
        Suffix: String appended to every file name.

    Returns:
        The matching paths, sorted, each wrapped as ``Prefix + path + Suffix``.
    """
    if InputPattern.startswith('/store/cmst3'):
        tool, column = 'cmsLs', 5   # file name is the 5th field of cmsLs output
    elif InputPattern.startswith('/castor/'):
        tool, column = 'rfdir', 9   # file name is the 9th field of rfdir output
    else:
        tool, column = None, 0

    if tool is None:
        matches = glob.glob(InputPattern)
    else:
        directory, _, pattern = InputPattern.rpartition('/')
        matches = [directory + '/' + name
                   for name in _ListRemoteFiles(tool, directory, column)
                   if fnmatch.fnmatch(name, pattern)]

    return [Prefix + path + Suffix for path in sorted(matches)]
320 
321 
def ListToString(InputList):
    """Concatenate the strings of ``InputList`` in order, without separator.

    Args:
        InputList: Iterable of strings (a list, tuple or generator).

    Returns:
        The concatenation; an empty string for an empty input.
    """
    # str.join runs in linear time; the former "+=" loop could be quadratic.
    return "".join(InputList)
327 
def ListToFile(InputList, outputFile):
    """Write the entries of ``InputList`` to ``outputFile``, one per line.

    Every entry is written as a single space, the entry itself and a newline.
    An existing file is overwritten.

    Args:
        InputList: Iterable of strings.
        outputFile: Path of the file to write.
    """
    # The with-statement closes the file even when a write fails.
    with open(outputFile, 'w') as out_file:
        out_file.writelines(' ' + entry + '\n' for entry in InputList)
333 
def FileToList(path):
    """Return the lines of the text file ``path`` as a sorted list.

    Line endings are kept, exactly as returned by ``readlines()``.

    Args:
        path: Path of the file to read.

    Returns:
        The lines of the file in ascending string order.
    """
    # The with-statement closes the file even when reading fails.
    with open(path, 'r') as input_file:
        return sorted(input_file.readlines())
340 
341 
def SendCMSMergeJob(FarmDirectory, JobName, InputFiles, OutputFile, KeepStatement):
    """Create a farm holding one CMSSW job that merges ``InputFiles``.

    A temporary configuration with a ``PoolSource`` reading ``InputFiles`` and
    a ``PoolOutputModule`` writing ``OutputFile`` is generated in the farm's
    inputs directory, pushed as a ``CMSSW`` job and deleted afterwards.

    Args:
        FarmDirectory: Root directory of the farm.
        JobName: Name of the job.
        InputFiles: List of input entries; each one is copied verbatim into
            the ``fileNames`` vstring, on a line of its own.
        OutputFile: Text inserted verbatim as the argument of
            ``cms.untracked.string(...)``.
        KeepStatement: Text inserted verbatim as the argument of the
            ``outputCommands`` vstring.
    """
    SendCluster_Create(FarmDirectory, JobName)
    Temp_Cfg = Farm_Directories[1] + Jobs_Index + Jobs_Name + '_TEMP_cfg.py'

    if not InputFiles:
        print('Empty InputFile List for Job named "%s", Job will not be submitted' % JobName)
        return

    InputFilesString = "".join(" " + input_name + '\n' for input_name in InputFiles)

    cfg_lines = [
        'import FWCore.ParameterSet.Config as cms\n',
        'process = cms.Process("Merge")\n',
        '\n',
        'process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )\n',
        'process.load("FWCore.MessageService.MessageLogger_cfi")\n',
        '\n',
        'process.MessageLogger.cerr.FwkReport.reportEvery = 50000\n',
        'process.source = cms.Source("PoolSource",\n',
        ' fileNames = cms.untracked.vstring(\n',
        '%s' % InputFilesString,
        ' )\n',
        ')\n',
        '\n',
        'process.OUT = cms.OutputModule("PoolOutputModule",\n',
        ' outputCommands = cms.untracked.vstring(%s),\n' % KeepStatement,
        ' fileName = cms.untracked.string(%s)\n' % OutputFile,
        ')\n',
        '\n',
        'process.endPath = cms.EndPath(process.OUT)\n',
    ]
    # The with-statement closes the config even when a write fails.
    with open(Temp_Cfg, 'w') as cfg_file:
        cfg_file.writelines(cfg_lines)

    SendCluster_Push(["CMSSW", Temp_Cfg])
    # NOTE(review): the rendered listing this block was taken from skips its
    # line 376 at this point; presumably the SendCluster_Submit() call --
    # confirm against the repository copy.

    # os.remove instead of a shell "rm": works for paths containing spaces or
    # shell metacharacters and reports failures.
    os.remove(Temp_Cfg)
378 
def ListToString(InputList)
def SendCluster_LoadInputFiles(path, NJobs)
def SendCluster_Submit()
def CreateDirectoryStructure(FarmDirectory)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def FileToList(path)
def SendSingleJob(FarmDirectory, JobName, Argv)
def SendCluster_Push(Argv)
def SendCluster_Create(FarmDirectory, JobName)
def SendCMSJobs(FarmDirectory, JobName, ConfigFile, InputFiles, NJobs, Argv)
def CreateTheShellFile(argv)
if(dp >Float(M_PI)) dp-
def SendCMSMergeJob(FarmDirectory, JobName, InputFiles, OutputFile, KeepStatement)
def ListToFile(InputList, outputFile)
def GetListOfFiles(Prefix, InputPattern, Suffix)
def CreateTheConfigFile(argv)
double split
Definition: MVATrainer.cc:139