42 : m_logger(iConfig.getUntrackedParameter<
std::
string>(
"jobName",
"DBOutputService")),
45 m_transactionActive(
false),
46 m_dbInitialised(
false),
51 m_autoCommit = iConfig.getUntrackedParameter<
bool>(
"autoCommit",
true);
52 m_writeTransactionDelay = iConfig.getUntrackedParameter<
unsigned int>(
"writeTransactionDelay", 0);
54 m_connection.setParameters(connectionPset);
55 m_connection.setLogDestination(m_logger);
56 m_connection.configure();
59 bool saveLogsOnDb = iConfig.getUntrackedParameter<
bool>(
"saveLogsOnDB",
false);
64 typedef std::vector<edm::ParameterSet>
Parameters;
66 for (Parameters::iterator itToPut =
toPut.begin(); itToPut !=
toPut.end(); ++itToPut)
67 fillRecord(*itToPut, timetypestr);
99 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
100 doStartTransaction();
103 for (
auto& iR : m_records) {
104 if (iR.second.m_isNewTag ==
false) {
110 doCommitTransaction();
116 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
117 doStartTransaction();
120 for (
auto& iR : m_records) {
121 if (iR.second.m_isNewTag ==
false) {
127 doCommitTransaction();
137 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
138 bool doCommit =
false;
139 if (!m_transactionActive) {
140 m_session.transaction().start(
true);
143 bool dbexists =
false;
145 dbexists = initDB(
true);
150 m_session.transaction().commit();
153 auto& myrecord = this->lookUpRecord(
recordName);
154 return myrecord.m_isNewTag;
158 if (!m_transactionActive) {
159 m_session.transaction().start(
false);
160 m_transactionActive =
true;
165 if (m_transactionActive) {
166 if (m_writeTransactionDelay) {
167 m_logger.logWarning() <<
"Waiting " << m_writeTransactionDelay <<
"s before commit the changes...";
168 ::sleep(m_writeTransactionDelay);
170 m_session.transaction().commit();
171 m_transactionActive =
false;
176 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
177 doStartTransaction();
181 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
182 doCommitTransaction();
186 if (!m_dbInitialised) {
187 if (!m_session.existsDatabase()) {
190 m_session.createDatabase();
192 for (
auto& iR : m_records) {
193 if (m_session.existsIov(iR.second.m_tag)) {
194 iR.second.m_isNewTag =
false;
198 m_dbInitialised =
true;
200 return m_dbInitialised;
205 std::map<std::string, Record>::iterator it = m_records.find(
recordName);
206 if (it == m_records.end()) {
208 "PoolDBOutputService::getRecord");
230 for (
auto&
time : m_currentTimes) {
236 for (
auto&
time : m_currentTimes) {
244 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
245 doStartTransaction();
250 doCommitTransaction();
270 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
271 doStartTransaction();
276 if (!myrecord.m_isNewTag) {
277 cond::throwException(myrecord.m_tag +
" is not a new tag",
"PoolDBOutputService::createNewIOV");
279 m_logger.logInfo() <<
"Creating new tag " << myrecord.m_tag <<
", adding iov with since " << firstSinceTime
280 <<
" pointing to payload id " << firstPayloadId;
282 m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype,
cond::SYNCH_ANY);
284 editor.
insert(firstSinceTime, firstPayloadId);
287 myrecord.m_isNewTag =
false;
289 doCommitTransaction();
302 m_logger.logInfo() <<
"Creating new tag " << myrecord.
m_tag <<
" for payload type " <<
payloadType 303 <<
", adding iov with since " << firstSinceTime;
308 editor.
insert(firstSinceTime, firstPayloadId);
318 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
319 doStartTransaction();
322 bool dbexists = this->initDB();
326 auto& myrecord = this->lookUpRecord(
recordName);
327 if (myrecord.m_isNewTag) {
329 "PoolDBOutputService::appendSinceTime");
331 ret = appendSinceTime(payloadId,
time, myrecord);
333 doCommitTransaction();
346 m_logger.logInfo() <<
"Updating existing tag " << myrecord.
m_tag <<
", adding iov with since " <<
time;
361 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
362 doStartTransaction();
365 bool dbexists = this->initDB();
369 auto& myrecord = this->lookUpRecord(
recordName);
370 if (myrecord.m_isNewTag) {
372 "PoolDBOutputService::appendSinceTime");
374 m_logger.logInfo() <<
"Updating existing tag " << myrecord.m_tag <<
", removing iov with since " << sinceTime
375 <<
" pointing to payload id " << payloadId;
377 editor.
erase(sinceTime, payloadId);
381 doCommitTransaction();
391 std::map<std::string, Record>::const_iterator it = m_records.find(
recordName);
392 if (it == m_records.end()) {
394 "PoolDBOutputService::lookUpRecord");
400 std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(
recordName);
401 if (it == m_logheaders.end())
403 " from PoolDBOutputService::lookUpUserLogInfo");
408 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
409 doStartTransaction();
412 bool dbexists = this->initDB();
417 if (myrecord.m_isNewTag) {
419 "PoolDBOutputService::closeIOV");
421 m_logger.logInfo() <<
"Updating existing tag " << myrecord.m_tag <<
", closing with end of validity " << lastTill;
424 editor.
flush(
"Tag closed.");
426 doCommitTransaction();
446 m_logger.logDebug() <<
"Fetching tag info for " <<
record.m_tag;
449 if (m_session.existsIov(
record.m_tag)) {
459 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
461 bool doCommit =
false;
462 if (!m_transactionActive) {
463 m_session.transaction().start(
true);
466 bool dbexists =
false;
469 dbexists = initDB(
true);
477 m_session.transaction().commit();
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
T getParameter(std::string const &) const
Base exception class for the object to relational access.
unsigned int m_refreshTime
Timestamp const & timestamp() const
void preEventProcessing(edm::StreamContext const &)
void fillRecord(edm::ParameterSet &pset, const std::string &gTimeTypeStr)
const Record & lookUpRecord(const std::string &recordName)
ret
prodAgent to be discontinued
std::map< std::string, cond::UserLogInfo > m_logheaders
cond::Time_t beginOfTime() const
void doStartTransaction()
unsigned int maxNumberOfStreams() const
void setDescription(const std::string &description)
bool initDB(bool readOnly=false)
void preGlobalBeginLumi(edm::GlobalContext const &)
void throwException(const std::string &message, const std::string &methodName)
std::string tag(const std::string &recordName)
T getUntrackedParameter(std::string const &, T const &) const
void closeIOV(Time_t lastTill, const std::string &recordName)
unsigned long long Time_t
TimeType timeTypeFromName(const std::string &name)
bool isNewTagRequest(const std::string &recordName)
cond::Time_t currentTime() const
StreamID const & streamID() const
LuminosityBlockID const & luminosityBlockID() const
cond::Time_t endOfTime() const
void erase(cond::Time_t since, const cond::Hash &payloadHash)
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
std::map< std::string, Record > m_records
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
virtual ~PoolDBOutputService()
Record & getRecord(const std::string &recordName)
TimeValue_t value() const
void preGlobalBeginRun(edm::GlobalContext const &)
bool appendSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
bool m_onlyAppendUpdatePolicy
void createNewIOV(const std::string &firstPayloadId, cond::Time_t firstSinceTime, const std::string &recordName)
bool getTagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::vector< AlignmentParameters * > Parameters
static const std::string kSharedResource
cond::persistency::Session newReadOnlySession(const std::string &connectionString, const std::string &transactionId)
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
cond::TimeType m_timetype
unsigned int value() const
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool tagInfo(const std::string &recordName, cond::TagInfo_t &result)
void eraseSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
cond::persistency::Session session() const
void doCommitTransaction()