import argparse
import copy
import datetime
import logging
import os
import subprocess
import sys
import h5py
import numpy as np
import sqlalchemy
import multiprocessing as mp
from collections import OrderedDict

from CondCore.CondHDF5ESSource.hdf5Writer import writeH5File
import CondCore.Utilities.conddblib as conddb

def _inserted_before(_IOV, timestamp):
    '''To be used inside filter().
    '''
    if timestamp is None:
        # A trivially-true SQL expression keeps the filter valid on both
        # Oracle and SQLite when no snapshot time is requested.
        return sqlalchemy.literal(True) == sqlalchemy.literal(True)
    return _IOV.insertion_time <= _parse_timestamp(timestamp)

def _parse_timestamp(timestamp):
    try:
        return datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        pass
    try:
        return datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        pass
    try:
        return datetime.datetime.strptime(timestamp, '%Y-%m-%d')
    except ValueError:
        pass
    raise Exception("Could not parse timestamp '%s'" % timestamp)

def _exists(session, primary_key, value):
    ret = None
    try:
        ret = session.query(primary_key).\
            filter(primary_key == value).\
            count() != 0
    except sqlalchemy.exc.OperationalError:
        pass
    return ret

def _connect(db, init, read_only, args, as_admin=False):

    logging.debug('Preparing connection to %s ...', db)

    url = conddb.make_url(db, read_only)
    pretty_url = url
    if url.drivername == 'oracle+frontier':
        ws = url.host.rsplit('%2F')
        pretty_url = 'frontier://%s/%s' % (ws[-1], url.database)
    connTo = '%s [%s]' % (db, pretty_url)
    logging.info('Connecting to %s', connTo)
    logging.debug('DB url: %s', url)
    verbose = 0
    if args.verbose is not None:
        verbose = args.verbose - 1
    connection = conddb.connect(url, args.authPath, verbose, as_admin)

    if not read_only:
        if connection.is_read_only:
            raise Exception('Impossible to edit a read-only database.')

        if connection.is_official:
            if args.force:
                logging.warning('You are going to edit an official database. If you are not one of the Offline DB experts but have access to the password for other reasons, please stop now.')
            else:
                raise Exception('Editing official databases is forbidden. Use the official DropBox to upload conditions. If you need a special intervention on the database, see the contact help: %s' % conddb.contact_help)

        if url.drivername == 'sqlite' and init:
            connection.init()

    if not connection._is_valid:
        raise Exception('No valid schema found in the database.')

    return connection

def connect(args, init=False, read_only=True, as_admin=False):
    args.force = args.force if 'force' in dir(args) else False

    if 'destdb' in args:
        if args.destdb is None:
            args.destdb = args.db
        if args.db == args.destdb:
            conn1 = _connect(args.destdb, init, read_only, args)
            return conn1, conn1
        conn1 = _connect(args.db, init, True, args)
        conn2url = conddb.make_url(args.destdb, False)
        if conn2url.drivername == 'sqlite' and not os.path.exists(args.destdb):
            init = True
        conn2 = _connect(args.destdb, init, False, args)
        return conn1, conn2

    return _connect(args.db, init, read_only, args, as_admin)

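# Illustrative usage (matching the call in the __main__ section below): when
# the argparse namespace has no 'destdb' attribute, connect() returns a single
# read-only connection:
#   connection = connect(args)
#   session = connection.session()
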
def _high(n):
    return int(n) >> 32

def _low(n):
    return int(n) & 0xffffffff

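# Illustrative example (not from the original source): a run/lumi "since" packs
# the run number into the upper 32 bits and the lumi section into the lower 32:
#   since = (316766 << 32) | 123
#   _high(since)  # -> 316766
#   _low(since)   # -> 123
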
def get_payloads_objtype_data(session, payloads):
    Payload = session.get_dbtype(conddb.Payload)
    table = session.query(Payload.hash, Payload.object_type, Payload.data).\
        filter(Payload.hash.in_(payloads)).order_by(Payload.hash).all()
    return table

def external_process_get_payloads_objtype_data(queue, args, payloads):
    connection = connect(args)
    session = connection.session()
    queue.put(get_payloads_objtype_data(session, payloads))

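# Illustrative usage (not from the original source; 'payloadHashes' is a
# hypothetical list of payload hash strings): fetching payload data in a
# separate process so a stuck DB read cannot block the main process:
#   queue = mp.Queue()
#   p = mp.Process(target=external_process_get_payloads_objtype_data,
#                  args=(queue, args, payloadHashes))
#   p.start()
#   table = queue.get()
#   p.join()
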
        raise StopIteration()

        if len(data) < 1000000:

    def __init__(self, ctype, label, payloadHashes, args):

    def __init__(self, session, args, record, productNtags):

        return self._dbtags[0] + "@joined"

        if self._type is None:
        finalIOV = [[i[0], [i[1]]] for i in iovAndPayload]

        finalIOV = mergeIOVs(finalIOV, iovAndPayload)

        firstValues, lastValues = sinceToIOV((x[0] for x in finalIOV), time_type)
        payloadForProducts = []
        payloadForProducts.append(OrderedDict())
        for first, last, payloads in iovs:
            for i, p in enumerate(payloads):
                payloadForProducts[i][p] = None

        # group the (label, tag) pairs of the global tag by record;
        # DBTag here is the record-level class whose __init__(session, args,
        # record, productNtags) signature appears in the index below
        for rcd, label, tag in gt:
            if rcd != lastRcd:
                if lastRcd is not None:
                    self._records.append(DBTag(session, args, lastRcd, tags))
                lastRcd = rcd
                tags = []
            tags.append((label, tag))
        if lastRcd is not None:
            self._records.append(DBTag(session, args, lastRcd, tags))

def timeTypeName(time_type):
    if time_type == conddb.TimeType.Time.value:
        return 'time'
    if time_type == conddb.TimeType.Run.value or time_type == conddb.TimeType.Lumi.value:
        return 'run_lumi'
    raise RuntimeError("unknown since time %s:" % str(time_type))

def parseSince(time_type, since):
    if time_type == conddb.TimeType.Time.value:
        return (_high(since), _low(since))
    if time_type == conddb.TimeType.Run.value:
        return (_high(since), 0)
    if time_type == conddb.TimeType.Lumi.value:
        return (_high(since), _low(since))

def previousSyncValue(syncValue):
    if syncValue[1] == 0:
        return (syncValue[0]-1, 0xffffffff)
    return (syncValue[0], syncValue[1]-1)

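# Illustrative example (not from the original source): the IOV ending just
# before (run 100, lumi 0) closes at (99, 0xffffffff), while the one ending
# just before (100, 5) closes at (100, 4):
#   previousSyncValue((100, 0))  # -> (99, 4294967295)
#   previousSyncValue((100, 5))  # -> (100, 4)
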
def sinceToIOV(sinceList, time_type):
    firstValues = []
    lastValues = []
    for since in sinceList:
        syncValue = parseSince(time_type, since)
        firstValues.append(syncValue)
        if len(firstValues) != 1:
            lastValues.append(previousSyncValue(syncValue))
    lastValues.append((0xFFFFFFFF, 0xFFFFFFFF))
    return [firstValues, lastValues]

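# Illustrative example (hypothetical since values, lumi time type): each IOV
# starts at its own sync value and ends just before the next one, and the
# last IOV is open-ended:
#   sinceToIOV([(1 << 32), (5 << 32) | 2], conddb.TimeType.Lumi.value)
#   # -> [[(1, 0), (5, 2)], [(5, 1), (0xFFFFFFFF, 0xFFFFFFFF)]]
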
def globalTagInfo(session, name):
    GlobalTag = session.get_dbtype(conddb.GlobalTag)
    GlobalTagMap = session.get_dbtype(conddb.GlobalTagMap)
    try:
        is_global_tag = _exists(session, GlobalTag.name, name)
        if is_global_tag:
            return session.query(GlobalTagMap.record, GlobalTagMap.label, GlobalTagMap.tag_name).\
                filter(GlobalTagMap.global_tag_name == name).\
                order_by(GlobalTagMap.record, GlobalTagMap.label).\
                all()
    except sqlalchemy.exc.OperationalError:
        sys.stderr.write("No table for GlobalTags found in DB.\n\n")
    return None

def tagInfo(session, name, snapshot):
    Tag = session.get_dbtype(conddb.Tag)
    IOV = session.get_dbtype(conddb.IOV)
    is_tag = _exists(session, Tag.name, name)
    if is_tag:
        time_type = session.query(Tag.time_type).\
            filter(Tag.name == name).\
            scalar()

        rawTagInfo = session.query(IOV.since, IOV.insertion_time, IOV.payload_hash).\
            filter(
                IOV.tag_name == name,
                _inserted_before(IOV, snapshot)).\
            order_by(IOV.since.desc(), IOV.insertion_time.desc()).\
            from_self().\
            order_by(IOV.since, IOV.insertion_time).\
            all()
        filteredTagInfo = []
        lastSince = -1
        for since, insertion, payload in rawTagInfo:
            if lastSince == since:
                continue
            lastSince = since
            if time_type == conddb.TimeType.Run.value:
                # move the run number into the upper 32 bits so run-based
                # sinces compare consistently with run/lumi sinces
                since = int(since) << 32
            filteredTagInfo.append((since, payload))

        if time_type == conddb.TimeType.Run.value:
            time_type = conddb.TimeType.Lumi.value

        return time_type, filteredTagInfo

def _checkMerge(previousIOV, newIOV, debugCopy, nExistingDataProducts):
    # each merged entry must hold one payload per data product and the
    # sinces must be strictly increasing
    previousSince = -1
    for i, e in enumerate(previousIOV):
        if len(e[1]) != nExistingDataProducts+1:
            raise RuntimeError("entry %i has wrong number of elements %i instead of %i" % (i, len(e[1]), nExistingDataProducts+1))
        if previousSince >= e[0]:
            raise RuntimeError("IOV not in order for index %i" % i)
        previousSince = e[0]

    previousIndex = 0
    debugIndex = 0
    while debugIndex < len(debugCopy) and previousIndex < len(previousIOV):
        previousSince = previousIOV[previousIndex][0]
        debugSince = debugCopy[debugIndex][0]
        if debugSince != previousSince:
            previousIndex += 1
            continue
        if debugCopy[debugIndex][1] != previousIOV[previousIndex][1][:nExistingDataProducts]:
            raise RuntimeError("packages were not properly copied for index %i original:%s new:%s" % (debugIndex, ",".join(debugCopy[debugIndex][1]), ",".join(previousIOV[previousIndex][1][:nExistingDataProducts])))
        debugIndex += 1
        previousIndex += 1
    if debugIndex != len(debugCopy):
        raise RuntimeError("failed to copy forward index %i" % debugIndex)

    newIndex = 0
    previousIndex = 0
    while newIndex < len(newIOV) and previousIndex < len(previousIOV):
        previousSince = previousIOV[previousIndex][0]
        newSince = newIOV[newIndex][0]
        if newSince != previousSince:
            previousIndex += 1
            continue
        if previousIOV[previousIndex][1][-1] != newIOV[newIndex][1]:
            raise RuntimeError("failed to append package at index %i" % newIndex)
        newIndex += 1
        previousIndex += 1
    if newIndex != len(newIOV):
        raise RuntimeError("failed to merge IOV entry %i" % newIndex)

def mergeIOVs(previousIOV, newIOV):
    debugCopy = copy.deepcopy(previousIOV)
    previousSize = len(previousIOV)
    newSize = len(newIOV)
    previousIndex = 0
    newIndex = 0
    nExistingDataProducts = len(previousIOV[0][1])
    while newIndex < newSize and previousIndex < previousSize:
        previousSince = previousIOV[previousIndex][0]
        newSince = newIOV[newIndex][0]
        if previousSince == newSince:
            previousIOV[previousIndex][1].append(newIOV[newIndex][1])
            previousIndex += 1
            newIndex += 1
        elif newSince < previousSince:
            if previousIndex == 0:
                payloads = [None]*nExistingDataProducts
                payloads.append(newIOV[newIndex][1])
                previousIOV.insert(0, [newSince, payloads])
            else:
                payloads = previousIOV[previousIndex-1][1][:nExistingDataProducts]
                payloads.append(newIOV[newIndex][1])
                previousIOV.insert(previousIndex, [newSince, payloads])
            previousSize += 1
            previousIndex += 1
            newIndex += 1
        elif newSince > previousSince:
            if newIndex == 0:
                previousIOV[previousIndex][1].append(None)
            else:
                if len(previousIOV[previousIndex][1]) == nExistingDataProducts:
                    previousIOV[previousIndex][1].append(newIOV[newIndex-1][1])
            previousIndex += 1
    if newIndex != newSize:
        # the new tag extends beyond the last existing IOV
        previousPayloads = previousIOV[-1][1]
        while newIndex != newSize:
            newPayloads = previousPayloads[:]
            newPayloads[nExistingDataProducts] = newIOV[newIndex][1]
            previousIOV.append([newIOV[newIndex][0], newPayloads])
            newIndex += 1
    if previousIndex != previousSize:
        # the existing IOVs extend beyond the last new one; carry the last
        # new payload forward
        while previousIndex < previousSize:
            previousIOV[previousIndex][1].append(newIOV[-1][1])
            previousIndex += 1
    _checkMerge(previousIOV, newIOV, debugCopy, nExistingDataProducts)
    return previousIOV

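# Illustrative worked example (hypothetical payload hashes): merging a second
# data product ('b*') into a one-product IOV list ('a*'):
#   previous = [[1, ['a1']], [10, ['a2']]]
#   new      = [[1, 'b1'], [5, 'b2']]
#   mergeIOVs(previous, new)
#   # -> [[1, ['a1', 'b1']], [5, ['a1', 'b2']], [10, ['a2', 'b2']]]
# The since==5 boundary from the new tag splits the first existing IOV, and
# the last new payload 'b2' is carried forward across since==10.
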
def writeTagImpl(tagsGroup, name, recName, time_type, IOV_payloads, payloadToRefs, originalTagNames):
    tagGroup = tagsGroup.create_group(name)
    tagGroup.attrs["time_type"] = time_type.encode("ascii")
    tagGroup.attrs["db_tags"] = [x.encode("ascii") for x in originalTagNames]
    tagGroup.attrs["record"] = recName.encode("ascii")
    firstValues = [x[0] for x in IOV_payloads]
    lastValues = [x[1] for x in IOV_payloads]
    syncValueType = np.dtype([("high", np.uint32), ("low", np.uint32)])
    first_np = np.empty(shape=(len(IOV_payloads),), dtype=syncValueType)
    first_np['high'] = [x.high for x in firstValues]
    first_np['low'] = [x.low for x in firstValues]
    last_np = np.empty(shape=(len(lastValues),), dtype=syncValueType)
    last_np['high'] = [x.high for x in lastValues]
    last_np['low'] = [x.low for x in lastValues]
    payloads = [[payloadToRefs[y] for y in x[2]] for x in IOV_payloads]
    compressor = None
    if len(first_np) > 100:
        compressor = 'gzip'
    tagGroup.create_dataset("first", data=first_np, compression=compressor)
    tagGroup.create_dataset("last", data=last_np, compression=compressor)
    tagGroup.create_dataset("payload", data=payloads, dtype=h5py.ref_dtype, compression=compressor)
    return tagGroup.ref

def writeTag(tagsGroup, time_type, IOV_payloads, payloadToRefs, originalTagNames, recName):
    name = originalTagNames[0]
    if len(originalTagNames) != 1:
        name = name + "@joined"
    return writeTagImpl(tagsGroup, name, recName, time_type, IOV_payloads, payloadToRefs, originalTagNames)

def recordToType(record):
    return subprocess.run(["condRecordToDataProduct", record],
                          capture_output=True, check=True, text=True).stdout

__typedefs = {b"ESCondObjectContainer<ESPedestal>": "ESPedestals",
              b"ESCondObjectContainer<float>": "ESFloatCondObjectContainer",
              b"ESCondObjectContainer<ESChannelStatusCode>": "ESChannelStatus",
              b"EcalCondObjectContainer<EcalPedestal>": "EcalPedestals",
              b"EcalCondObjectContainer<EcalXtalGroupId>": "EcalWeightXtalGroups",
              b"EcalCondObjectContainer<EcalMGPAGainRatio>": "EcalGainRatios",
              b"EcalCondObjectContainer<float>": "EcalFloatCondObjectContainer",
              b"EcalCondObjectContainer<EcalChannelStatusCode>": "EcalChannelStatus",
              b"EcalCondObjectContainer<EcalMappingElement>": "EcalMappingElectronics",
              b"EcalCondObjectContainer<EcalTPGPedestal>": "EcalTPGPedestals",
              b"EcalCondObjectContainer<EcalTPGLinearizationConstant>": "EcalTPGLinearizationConst",
              b"EcalCondObjectContainer<EcalTPGCrystalStatusCode>": "EcalTPGCrystalStatus",
              b"EcalCondTowerObjectContainer<EcalChannelStatusCode>": "EcalDCSTowerStatus",
              b"EcalCondTowerObjectContainer<EcalDAQStatusCode>": "EcalDAQTowerStatus",
              b"EcalCondObjectContainer<EcalDQMStatusCode>": "EcalDQMChannelStatus",
              b"EcalCondTowerObjectContainer<EcalDQMStatusCode>": "EcalDQMTowerStatus",
              b"EcalCondObjectContainer<EcalPulseShape>": "EcalPulseShapes",
              b"EcalCondObjectContainer<EcalPulseCovariance>": "EcalPulseCovariances",
              b"EcalCondObjectContainer<EcalPulseSymmCovariance>": "EcalPulseSymmCovariances",
              b"HcalItemCollById<HFPhase1PMTData>": "HFPhase1PMTParams",
              b"l1t::CaloParams": "CaloParams",
              b"StorableDoubleMap<AbsOOTPileupCorrection>": "OOTPileupCorrectionMapColl",
              b"PhysicsTools::Calibration::Histogram3D<double,double,double,double>": "PhysicsTools::Calibration::HistogramD3D",
              b"PhysicsTools::Calibration::MVAComputerContainer": "MVAComputerContainer"}

def canonicalProductName(product):
    return __typedefs.get(product, product)

def main():
    parser = argparse.ArgumentParser(description='Read from CMS Condition DB and write to HDF5 file')
    parser.add_argument('--db', '-d', default='pro', help='Database to run the command on. Run the help subcommand for more information: conddb help')
    parser.add_argument('name', nargs='+', help="Name of the global tag.")
    parser.add_argument('--verbose', '-v', action='count', help='Verbosity level. -v prints debugging information of this tool, like tracebacks in case of errors. -vv prints, in addition, all SQL statements issued. -vvv prints, in addition, all results returned by queries.')
    parser.add_argument('--authPath', '-a', default=None, help='Path of the authentication .netrc file. Default: the content of the COND_AUTH_PATH environment variable, when specified.')
    parser.add_argument('--snapshot', '-T', default=None, help="Snapshot time. If provided, the output will represent the state of the IOVs inserted into the database up to the given time. The format of the string must be one of the following: '2013-01-20', '2013-01-20 10:11:12' or '2013-01-20 10:11:12.123123'.")
    parser.add_argument('--exclude', '-e', nargs='*', help='list of records to exclude from the file (can not be used with --include)')
    parser.add_argument('--include', '-i', nargs='*', help='list of the only records that should be included in the file (can not be used with --exclude)')
    parser.add_argument('--output', '-o', default='test.h5cond', help='name of the hdf5 output file to write')
    parser.add_argument('--compressor', '-c', default='zlib', choices=['zlib', 'lzma', 'none'], help="compress data using 'zlib', 'lzma' or 'none'")
    args = parser.parse_args()

    if args.exclude and args.include:
        print("Cannot use --exclude and --include at the same time")
        sys.exit(-1)

    connection = connect(args)
    session = connection.session()

    excludeRecords = set()
    if args.exclude:
        excludeRecords = set(args.exclude)
    includeRecords = set()
    if args.include:
        includeRecords = set(args.include)

    writeH5File(args.output, args.name, excludeRecords, includeRecords,
                lambda x: DBGlobalTag(args, session, x), args.compressor)

if __name__ == '__main__':
    main()

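# Illustrative invocation (hypothetical global tag name; assumes the script is
# installed as conddb2hdf5.py). The '--' separator ends the nargs='*' option
# list before the positional tag name:
#   conddb2hdf5.py --db pro --output mygt.h5cond --exclude BeamSpotObjectsRcd -- MyGlobalTag
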
# Functions and methods defined in or used by this module:
#   def external_process_get_payloads_objtype_data(queue, args, payloads)
#   def _parse_timestamp(timestamp)
#   def __init__(self, args, session, name)
#   def writeH5File(fileName, globalTags, excludeRecords, includeRecords, tagReader, compressorName)
#   def iovsNPayloadNames(self)
#   def __init__(self, ctype, label, payloadHashes, args)
#   def __init__(self, args, payloads)
#   def canonicalProductName(product)
#   def tagInfo(session, name, snapshot)
#   def __init__(self, session, args, record, productNtags)
#   def _inserted_before(_IOV, timestamp)
#   def sinceToIOV(sinceList, time_type)
#   def __init__(self, high, low)
#   def globalTagInfo(session, name)
#   def originalTagNames(self)
#   def parseSince(time_type, since)
#   def _connect(db, init, read_only, args, as_admin=False)
#   def mergeIOVs(previousIOV, newIOV)
#   def writeTag(tagsGroup, time_type, IOV_payloads, payloadToRefs, originalTagNames, recName)
#   def timeTypeName(time_type)
#   def connect(args, init=False, read_only=True, as_admin=False)
#   def previousSyncValue(syncValue)
#   def _checkMerge(previousIOV, newIOV, debugCopy, nExistingDataProducts)
#   def writeTagImpl(tagsGroup, name, recName, time_type, IOV_payloads, payloadToRefs, originalTagNames)
#   def get_payloads_objtype_data(session, payloads)
#   def _exists(session, primary_key, value)
#   def __init__(self, hash_, type_, data)