40 : m_logger(iConfig.getUntrackedParameter<std::
string>(
"jobName",
"DBOutputService")),
43 m_transactionActive(
false),
44 m_dbInitialised(
false),
49 m_autoCommit =
iConfig.getUntrackedParameter<
bool>(
"autoCommit",
true);
50 m_writeTransactionDelay =
iConfig.getUntrackedParameter<
unsigned int>(
"writeTransactionDelay", 0);
52 m_connection.setParameters(connectionPset);
53 m_connection.setLogDestination(m_logger);
54 m_connection.configure();
56 m_session = m_connection.createSession(connectionString,
true);
57 bool saveLogsOnDb =
iConfig.getUntrackedParameter<
bool>(
"saveLogsOnDB",
false);
59 m_logger.setDbDestination(connectionString);
62 typedef std::vector<edm::ParameterSet>
Parameters;
63 Parameters
toPut =
iConfig.getParameter<Parameters>(
"toPut");
64 for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
65 fillRecord(*itToPut, timetypestr);
90 ret = m_connection.createReadOnlySession(connectionString, transactionId);
97 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
101 for (
auto& iR : m_records) {
102 if (iR.second.m_isNewTag ==
false) {
108 doCommitTransaction();
114 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
115 doStartTransaction();
118 for (
auto& iR : m_records) {
119 if (iR.second.m_isNewTag ==
false) {
125 doCommitTransaction();
131 return this->lookUpRecord(recordName).m_tag;
135 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
136 bool doCommit =
false;
137 if (!m_transactionActive) {
138 m_session.transaction().start(
true);
141 bool dbexists =
false;
143 dbexists = initDB(
true);
148 m_session.transaction().commit();
151 auto& myrecord = this->lookUpRecord(recordName);
152 return myrecord.m_isNewTag;
156 if (!m_transactionActive) {
157 m_session.transaction().start(
false);
158 m_transactionActive =
true;
163 if (m_transactionActive) {
164 if (m_writeTransactionDelay) {
165 m_logger.logWarning() <<
"Waiting " << m_writeTransactionDelay <<
"s before commit the changes...";
166 ::sleep(m_writeTransactionDelay);
168 m_session.transaction().commit();
169 m_transactionActive =
false;
174 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
175 doStartTransaction();
179 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
180 doCommitTransaction();
184 if (!m_dbInitialised) {
185 if (!m_session.existsDatabase()) {
188 m_session.createDatabase();
190 for (
auto& iR : m_records) {
191 if (m_session.existsIov(iR.second.m_tag)) {
192 iR.second.m_isNewTag =
false;
196 m_dbInitialised =
true;
198 return m_dbInitialised;
203 std::map<std::string, Record>::iterator it = m_records.find(recordName);
204 if (it == m_records.end()) {
206 "PoolDBOutputService::getRecord");
228 for (
auto& time : m_currentTimes) {
234 for (
auto& time : m_currentTimes) {
242 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
243 doStartTransaction();
248 doCommitTransaction();
268 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
269 doStartTransaction();
273 auto& myrecord = this->getRecord(recordName);
274 if (!myrecord.m_isNewTag) {
275 cond::throwException(myrecord.m_tag +
" is not a new tag",
"PoolDBOutputService::createNewIOV");
277 m_logger.logInfo() <<
"Creating new tag " << myrecord.m_tag <<
", adding iov with since " << firstSinceTime
278 <<
" pointing to payload id " << firstPayloadId;
280 m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype,
cond::SYNCH_ANY);
282 editor.
insert(firstSinceTime, firstPayloadId);
285 myrecord.m_isNewTag =
false;
287 doCommitTransaction();
300 m_logger.logInfo() <<
"Creating new tag " << myrecord.
m_tag <<
" for payload type " << payloadType
301 <<
", adding iov with since " << firstSinceTime;
306 editor.
insert(firstSinceTime, firstPayloadId);
316 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
317 doStartTransaction();
320 bool dbexists = this->initDB();
324 auto& myrecord = this->lookUpRecord(recordName);
325 if (myrecord.m_isNewTag) {
327 "PoolDBOutputService::appendSinceTime");
329 ret = appendSinceTime(payloadId, time, myrecord);
331 doCommitTransaction();
344 m_logger.logInfo() <<
"Updating existing tag " << myrecord.
m_tag <<
", adding iov with since " << time;
347 editor.
insert(time, payloadId);
359 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
360 doStartTransaction();
363 bool dbexists = this->initDB();
367 auto& myrecord = this->lookUpRecord(recordName);
368 if (myrecord.m_isNewTag) {
370 "PoolDBOutputService::appendSinceTime");
372 m_logger.logInfo() <<
"Updating existing tag " << myrecord.m_tag <<
", removing iov with since " << sinceTime
373 <<
" pointing to payload id " << payloadId;
375 editor.
erase(sinceTime, payloadId);
379 doCommitTransaction();
389 std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
390 if (it == m_records.end()) {
392 "PoolDBOutputService::lookUpRecord");
398 std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
399 if (it == m_logheaders.end())
401 " from PoolDBOutputService::lookUpUserLogInfo");
406 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
407 doStartTransaction();
410 bool dbexists = this->initDB();
414 auto& myrecord = lookUpRecord(recordName);
415 if (myrecord.m_isNewTag) {
417 "PoolDBOutputService::closeIOV");
419 m_logger.logInfo() <<
"Updating existing tag " << myrecord.m_tag <<
", closing with end of validity " << lastTill;
422 editor.
flush(
"Tag closed.");
424 doCommitTransaction();
442 auto&
record = lookUpRecord(recordName);
444 m_logger.logDebug() <<
"Fetching tag info for " <<
record.m_tag;
447 if (m_session.existsIov(
record.m_tag)) {
457 std::lock_guard<std::recursive_mutex>
lock(m_mutex);
459 bool doCommit =
false;
460 if (!m_transactionActive) {
461 m_session.transaction().start(
true);
464 bool dbexists =
false;
467 dbexists = initDB(
true);
469 ret = getTagInfo(recordName, result);
475 m_session.transaction().commit();
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
T getUntrackedParameter(std::string const &, T const &) const
tuple ret
prodAgent to be discontinued
Base exception class for the object to relational access.
unsigned int m_refreshTime
void preEventProcessing(edm::StreamContext const &)
void fillRecord(edm::ParameterSet &pset, const std::string &gTimeTypeStr)
const Record & lookUpRecord(const std::string &recordName)
std::map< std::string, cond::UserLogInfo > m_logheaders
cond::persistency::Session session() const
void doStartTransaction()
LuminosityBlockID const & luminosityBlockID() const
void setDescription(const std::string &description)
bool initDB(bool readOnly=false)
void preGlobalBeginLumi(edm::GlobalContext const &)
void throwException(const std::string &message, const std::string &methodName)
std::string tag(const std::string &recordName)
unsigned int maxNumberOfStreams() const
void closeIOV(Time_t lastTill, const std::string &recordName)
unsigned long long Time_t
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
bool isNewTagRequest(const std::string &recordName)
cond::Time_t beginOfTime() const
void erase(cond::Time_t since, const cond::Hash &payloadHash)
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
StreamID const & streamID() const
std::map< std::string, Record > m_records
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
unsigned int value() const
virtual ~PoolDBOutputService()
Record & getRecord(const std::string &recordName)
T getParameter(std::string const &) const
cond::Time_t endOfTime() const
void preGlobalBeginRun(edm::GlobalContext const &)
bool appendSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
bool m_onlyAppendUpdatePolicy
void createNewIOV(const std::string &firstPayloadId, cond::Time_t firstSinceTime, const std::string &recordName)
bool getTagInfo(const std::string &recordName, cond::TagInfo_t &result)
cond::persistency::Session newReadOnlySession(const std::string &connectionString, const std::string &transactionId)
cond::Time_t currentTime() const
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
cond::TimeType m_timetype
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool tagInfo(const std::string &recordName, cond::TagInfo_t &result)
void eraseSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
TimeValue_t value() const
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
Timestamp const & timestamp() const
void doCommitTransaction()